Posted to commits@sentry.apache.org by sp...@apache.org on 2018/05/31 03:32:13 UTC

[35/86] sentry git commit: Revert "SENTRY-2208: Refactor out Sentry service into own module from sentry-provider-db (Anthony Young-Garner, reviewed by Sergio Pena, Steve Moist, Na Li)"

http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStoreSchemaInfo.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStoreSchemaInfo.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStoreSchemaInfo.java
new file mode 100644
index 0000000..ce76a46
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStoreSchemaInfo.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.provider.db.service.persistent;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.sentry.core.common.exception.SentryUserException;
+
+public class SentryStoreSchemaInfo {
+  private static final String SQL_FILE_EXTENSION = ".sql";
+  private static final String UPGRADE_FILE_PREFIX = "upgrade-";
+  private static final String INIT_FILE_PREFIX = "sentry-";
+  private static final String VERSION_UPGRADE_LIST = "upgrade.order";
+  private final String dbType;
+  private final String[] sentrySchemaVersions;
+  private final String sentryScriptDir;
+
+  private static final String SENTRY_VERSION = "2.1.0";
+
+  public SentryStoreSchemaInfo(String sentryScriptDir, String dbType)
+      throws SentryUserException {
+    this.sentryScriptDir = sentryScriptDir;
+    this.dbType = dbType;
+    // Load the upgrade order for the given dbType from the upgrade.order.<dbType>
+    // file, which lists one "<from>-to-<to>" version step per line.
+    List<String> upgradeOrderList = new ArrayList<String>();
+    String upgradeListFile = getSentryStoreScriptDir() + File.separator
+        + VERSION_UPGRADE_LIST + "." + dbType;
+    try (BufferedReader bfReader = new BufferedReader(new FileReader(upgradeListFile))) {
+      String currSchemaVersion;
+      while ((currSchemaVersion = bfReader.readLine()) != null) {
+        upgradeOrderList.add(currSchemaVersion.trim());
+      }
+    } catch (FileNotFoundException e) {
+      throw new SentryUserException("File " + upgradeListFile + " not found ", e);
+    } catch (IOException e) {
+      throw new SentryUserException("Error reading " + upgradeListFile, e);
+    }
+    sentrySchemaVersions = upgradeOrderList.toArray(new String[0]);
+  }
+
+  public String getSentrySchemaVersion() {
+    return SENTRY_VERSION;
+  }
+
+  public List<String> getUpgradeScripts(String fromSchemaVer)
+      throws SentryUserException {
+    List<String> upgradeScriptList = new ArrayList<String>();
+
+    // check if we are already at the current schema level
+    if (getSentryVersion().equals(fromSchemaVer)) {
+      return upgradeScriptList;
+    }
+
+    // Find the list of scripts to execute for this upgrade
+    int firstScript = sentrySchemaVersions.length;
+    for (int i = 0; i < sentrySchemaVersions.length; i++) {
+      String fromVersion = sentrySchemaVersions[i].split("-to-")[0];
+      if (fromVersion.equals(fromSchemaVer)) {
+        firstScript = i;
+        break;
+      }
+    }
+    if (firstScript == sentrySchemaVersions.length) {
+      throw new SentryUserException("Unknown version specified for upgrade "
+          + fromSchemaVer + " Metastore schema may be too old or newer");
+    }
+
+    for (int i = firstScript; i < sentrySchemaVersions.length; i++) {
+      String scriptFile = generateUpgradeFileName(sentrySchemaVersions[i]);
+      upgradeScriptList.add(scriptFile);
+    }
+    return upgradeScriptList;
+  }
+
+  /**
+   * Get the name of the script that initializes the schema for the given version.
+   *
+   * @param toVersion
+   *          Target version. If it is null, then the current server version is
+   *          used.
+   * @return name of the schema initialization script
+   * @throws SentryUserException if no initialization script exists for the version
+   */
+  public String generateInitFileName(String toVersion)
+      throws SentryUserException {
+    String version = toVersion;
+    if (version == null) {
+      version = getSentryVersion();
+    }
+    String initScriptName = INIT_FILE_PREFIX + dbType + "-" + version
+        + SQL_FILE_EXTENSION;
+    // check if the file exists
+    if (!(new File(getSentryStoreScriptDir() + File.separatorChar
+        + initScriptName).exists())) {
+      throw new SentryUserException(
+          "Unknown version specified for initialization: " + version);
+    }
+    return initScriptName;
+  }
+
+  /**
+   * Find the directory of the sentry store scripts.
+   *
+   * @return path of the sentry store script directory
+   */
+  public String getSentryStoreScriptDir() {
+    return sentryScriptDir;
+  }
+
+  // Format the upgrade script name, e.g. sentry-upgrade-<dbType>-<x>-to-<y>.sql
+  private String generateUpgradeFileName(String fileVersion) {
+    return INIT_FILE_PREFIX + UPGRADE_FILE_PREFIX + dbType + "-"
+        + fileVersion + SQL_FILE_EXTENSION;
+  }
+
+  // Current Sentry version, in majorVersion.minorVersion.changeVersion format
+  // TODO: store the version using the build script
+  public static String getSentryVersion() {
+    return SENTRY_VERSION;
+  }
+}
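
For reference, a minimal usage sketch of the class above (the script directory,
dbType and starting version are illustrative assumptions, not taken from this
commit):

    // Illustrative sketch: resolve the scripts needed to upgrade a
    // Derby-backed store, then the script that initializes the current schema.
    SentryStoreSchemaInfo info = new SentryStoreSchemaInfo(
        "/opt/sentry/scripts/sentrystore/upgrade/derby", "derby");
    for (String script : info.getUpgradeScripts("1.8.0")) {
      System.out.println(script);   // e.g. sentry-upgrade-derby-1.8.0-to-2.0.0.sql
    }
    String initScript = info.generateInitFileName(null);  // sentry-derby-2.1.0.sql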

http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionBlock.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionBlock.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionBlock.java
new file mode 100644
index 0000000..6ad52a3
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionBlock.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.provider.db.service.persistent;
+
+import javax.jdo.PersistenceManager;
+
+/**
+ * TransactionBlock wraps the code that is executed inside a single
+ * transaction. The {@link #execute(PersistenceManager)} method returns the
+ * result of the transaction.
+ */
+@FunctionalInterface
+public interface TransactionBlock<T> {
+  /**
+   * Execute some code as a single transaction. The code should not start a new
+   * transaction or manipulate transactions with the PersistenceManager.
+   *
+   * @param pm PersistenceManager for the current transaction
+   * @return result of execute()
+   * @throws Exception if the transaction block fails
+   */
+  T execute(PersistenceManager pm) throws Exception;
+}
\ No newline at end of file
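
Because the interface is a @FunctionalInterface, callers can supply the block
as a lambda; a minimal sketch (the block body is an illustrative placeholder):

    // Illustrative sketch: JDO work goes inside the block; the block must not
    // begin, commit or roll back transactions on pm itself.
    TransactionBlock<Long> block = pm -> {
      // ... queries / persists via pm ...
      return 0L;  // placeholder result
    };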

http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionManager.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionManager.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionManager.java
new file mode 100644
index 0000000..ba6e845
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionManager.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.provider.db.service.persistent;
+
+import com.codahale.metrics.Counter;
+import static com.codahale.metrics.MetricRegistry.name;
+import com.codahale.metrics.Timer;
+
+import com.codahale.metrics.Timer.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.core.common.exception.SentryUserException;
+import org.apache.sentry.service.common.ServiceConstants.ServerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jdo.PersistenceManager;
+import javax.jdo.PersistenceManagerFactory;
+import javax.jdo.Transaction;
+
+import org.apache.sentry.api.service.thrift.SentryMetrics;
+
+import java.util.Random;
+import java.util.concurrent.Callable;
+
+/**
+ * TransactionManager is used for executing database transactions. It retries
+ * a transaction on unexpected exceptions, except for
+ * <em>SentryUserExceptions</em> (e.g. <em>SentryNoSuchObjectException</em>,
+ * <em>SentryAlreadyExistsException</em>), which are simply rethrown
+ * without retry.<p>
+ *
+ * The purpose of the class is to separate all transaction housekeeping (opening
+ * transaction, rolling back failed transactions) from the actual transaction
+ * business logic.<p>
+ *
+ * TransactionManager creates an instance of PersistenceManager for each
+ * transaction.<p>
+ *
+ * TransactionManager exposes several metrics:
+ * <ul>
+ *     <li>Timer metric for all transactions</li>
+ *     <li>Counter for failed transactions</li>
+ *     <li>Counter for each exception thrown by transaction</li>
+ * </ul>
+ */
+@SuppressWarnings("NestedTryStatement")
+public final class TransactionManager {
+
+  private static final Logger LOGGER =
+          LoggerFactory.getLogger(TransactionManager.class);
+
+  /** Random number generator for exponential backoff */
+  private static final Random random = new Random();
+
+  private final PersistenceManagerFactory pmf;
+
+  // Maximum number of retries per call
+  private final int transactionRetryMax;
+
+  // Delay (in milliseconds) between retries
+  private final int retryWaitTimeMills;
+
+  /** Name for metrics */
+  private static final String TRANSACTIONS = "transactions";
+
+  // Transaction timer measures time distribution for all transactions
+  private final Timer transactionTimer =
+          SentryMetrics.getInstance().
+                  getTimer(name(TransactionManager.class,
+                           TRANSACTIONS));
+
+  // Counter for failed transactions
+  private final Counter failedTransactionsCount =
+          SentryMetrics.getInstance().
+                  getCounter(name(TransactionManager.class,
+                             TRANSACTIONS, "failed"));
+
+  private final Counter retryCount =
+          SentryMetrics.getInstance().getCounter(name(TransactionManager.class,
+                  TRANSACTIONS, "retry"));
+
+  TransactionManager(PersistenceManagerFactory pmf, Configuration conf) {
+    this.pmf = pmf;
+    transactionRetryMax = conf.getInt(
+        ServerConfig.SENTRY_STORE_TRANSACTION_RETRY,
+        ServerConfig.SENTRY_STORE_TRANSACTION_RETRY_DEFAULT);
+    retryWaitTimeMills = conf.getInt(
+        ServerConfig.SENTRY_STORE_TRANSACTION_RETRY_WAIT_TIME_MILLIS,
+        ServerConfig.SENTRY_STORE_TRANSACTION_RETRY_WAIT_TIME_MILLIS_DEFAULT);
+  }
+
+
+  /**
+   * Execute some code as a single transaction. The code in tb.execute()
+   * should not start a new transaction or manipulate transactions with the
+   * PersistenceManager.
+   *
+   * @param tb transaction block with code to be executed
+   * @return Object with the result of tb.execute()
+   */
+  public <T> T executeTransaction(TransactionBlock<T> tb) throws Exception {
+    try (Context context = transactionTimer.time();
+         PersistenceManager pm = pmf.getPersistenceManager()) {
+      Transaction transaction = pm.currentTransaction();
+      transaction.begin();
+      try {
+        T result = tb.execute(pm);
+        transaction.commit();
+        return result;
+      } catch (Exception e) {
+        // Count total failed transactions
+        failedTransactionsCount.inc();
+        // Count specific exceptions
+        SentryMetrics.getInstance().getCounter(name(TransactionManager.class,
+                "exception", e.getClass().getSimpleName())).inc();
+        // Re-throw the exception
+        throw e;
+      } finally {
+        if (transaction.isActive()) {
+          transaction.rollback();
+        }
+      }
+    }
+  }
+
+  /**
+   * Execute a list of TransactionBlock code as a single transaction.
+   * The code in tb.execute() should not start a new transaction or
+   * manipulate transactions with the PersistenceManager. It returns
+   * the result of the last transaction block execution.
+   *
+   * @param tbs transaction blocks with code to be executed
+   * @return the result of the last tb.execute()
+   */
+  private <T> T executeTransaction(Iterable<TransactionBlock<T>> tbs) throws Exception {
+    try (Context context = transactionTimer.time();
+         PersistenceManager pm = pmf.getPersistenceManager()) {
+      Transaction transaction = pm.currentTransaction();
+      transaction.begin();
+      try {
+        T result = null;
+        for (TransactionBlock<T> tb : tbs) {
+          result = tb.execute(pm);
+        }
+        transaction.commit();
+        return result;
+      } catch (Exception e) {
+        // Count total failed transactions
+        failedTransactionsCount.inc();
+        // Count specific exceptions
+        SentryMetrics.getInstance().getCounter(name(TransactionManager.class,
+            "exception", e.getClass().getSimpleName())).inc();
+        // Re-throw the exception
+        throw e;
+      } finally {
+        if (transaction.isActive()) {
+          transaction.rollback();
+        }
+      }
+    }
+  }
+
+  /**
+   * Execute some code as a single transaction with retry mechanism.
+   *
+   * @param tb transaction block with code to execute
+   * @return Object with the result of tb.execute()
+   */
+  @SuppressWarnings("squid:S00112")
+  public <T> T executeTransactionWithRetry(final TransactionBlock<T> tb)
+          throws Exception {
+    return new ExponentialBackoff().execute(() -> executeTransaction(tb)
+  }
+
+  /**
+   * Execute a list of TransactionBlock code as a single transaction.
+   * If any of the TransactionBlocks fails, all of them are retried.
+   * The result of the last transaction block execution is discarded,
+   * since this method returns void.
+   *
+   * @param tbs a list of transaction blocks with code to be executed.
+   */
+  @SuppressWarnings("squid:S00112")
+  <T> void executeTransactionBlocksWithRetry(final Iterable<TransactionBlock<T>> tbs)
+          throws Exception {
+    new ExponentialBackoff().execute(() -> executeTransaction(tbs)
+  }
+
+  /**
+   * Implementation of exponential backoff with random fuzziness.
+   * On each iteration the backoff time is 1.5 times the previous amount plus
+   * a random fuzziness factor of up to half of the previous amount; e.g. for
+   * a 100 ms base the waits are 100 ms, then 150-199 ms, and so on.
+   */
+  private class ExponentialBackoff {
+
+    @SuppressWarnings("squid:S00112")
+    <T> T execute(Callable<T> arg) throws Exception {
+      Exception ex = null;
+      long sleepTime = retryWaitTimeMills;
+
+      for (int retryNum = 1; retryNum <= transactionRetryMax; retryNum++) {
+        try {
+          return arg.call();
+        } catch (SentryUserException e) {
+          // throw the sentry exception without retry
+          LOGGER.warn("Transaction manager encountered non-retriable exception", e);
+          throw e;
+        } catch (Exception e) {
+          ex = e;
+          retryCount.inc();
+          LOGGER.warn("Transaction execution encountered exception", e);
+          LOGGER.warn("Retrying transaction {}/{} times",
+                  retryNum, transactionRetryMax);
+          // Introduce some randomness in the backoff time.
+          LOGGER.warn("Sleeping for {} milliseconds before retrying", sleepTime);
+          Thread.sleep(sleepTime);
+          int fuzz = random.nextInt((int)sleepTime / 2);
+          sleepTime *= 3;
+          sleepTime /= 2;
+          sleepTime += fuzz;
+        }
+      }
+      assert(ex != null);
+      String message = "The transaction has reached max retry number, "
+              + ex.getMessage();
+      LOGGER.error(message, ex);
+      throw new Exception(message, ex);
+    }
+  }
+}
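
A hedged usage sketch tying the pieces above together (construction of the
PersistenceManagerFactory and Configuration is omitted; the block body is an
illustrative placeholder):

    // Illustrative sketch: run a block under the retry wrapper. A
    // SentryUserException fails immediately; any other exception is retried
    // with the exponential backoff described above.
    TransactionManager tm = new TransactionManager(pmf, conf);
    Long result = tm.executeTransactionWithRetry(pm -> {
      // ... JDO operations via pm ...
      return 0L;  // placeholder result
    });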

http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java
new file mode 100644
index 0000000..992d8ab
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java
@@ -0,0 +1,524 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.service.thrift;
+
+import com.codahale.metrics.Counter;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.sentry.hdfs.PathsUpdate;
+import org.apache.sentry.hdfs.SentryMalformedPathException;
+import org.apache.sentry.hdfs.ServiceConstants.ServerConfig;
+import org.apache.sentry.api.service.thrift.SentryMetrics;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import static com.codahale.metrics.MetricRegistry.name;
+
+/**
+ * Manages fetching the full snapshot from HMS.
+ * The snapshot is represented as a map from the Hive object name to
+ * the set of paths for this object.
+ * The Hive object name is either the Hive database name or the
+ * Hive database name joined with the Hive table name as {@code dbName.tableName}.
+ * All table partitions are stored under the table object.
+ * <p>
+ * Once a {@link FullUpdateInitializer} is constructed, the
+ * {@link FullUpdateInitializer#getFullHMSSnapshot()}
+ * method should be called to get the initial update.
+ * <p>
+ * It is important to close the {@link FullUpdateInitializer} object to prevent resource
+ * leaks.
+ * <p>
+ * The usual way of using {@link FullUpdateInitializer} is
+ * <pre>
+ * {@code
+ * try (FullUpdateInitializer updateInitializer =
+ *     new FullUpdateInitializer(clientFactory, authzConf)) {
+ *   Map<String, Collection<String>> pathsUpdate = updateInitializer.getFullHMSSnapshot();
+ *   return pathsUpdate;
+ * }
+ * }
+ * </pre>
+ */
+public final class FullUpdateInitializer implements AutoCloseable {
+
+  /*
+   * Implementation note.
+   *
+   * The snapshot is obtained using an executor. We follow the map/reduce model.
+   * Each executor thread (mapper) obtains and returns a partial snapshot; the
+   * partial snapshots are then reduced to a single combined snapshot by
+   * getFullHMSSnapshot().
+   *
+   * Synchronization between the getFullHMSSnapshot() and executors is done using the
+   * 'results' queue. The queue holds the futures for each scheduled task.
+   * It is initially populated by getFullHMSSnapshot and each task may add new future
+   * results to it. Only getFullHMSSnapshot() removes entries from the results queue.
+   * This guarantees that once the results queue is empty there are no pending jobs.
+   *
+   * Since no other data is shared, the implementation is safe without
+   * any additional synchronization. It is not thread-safe for concurrent calls
+   * to getFullHMSSnapshot().
+   *
+   */
+
+
+  private static final String FULL_UPDATE_INITIALIZER_THREAD_NAME = "hms-fetch-%d";
+  private final ExecutorService threadPool;
+  private final int maxPartitionsPerCall;
+  private final int maxTablesPerCall;
+  private final Deque<Future<CallResult>> results = new ConcurrentLinkedDeque<>();
+  private final int maxRetries;
+  private final int waitDurationMillis;
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(FullUpdateInitializer.class);
+
+  private static final ObjectMapping emptyObjectMapping =
+          new ObjectMapping(Collections.<String, Set<String>>emptyMap());
+  private final HiveConnectionFactory clientFactory;
+
+  /** Total number of database objects */
+  private final Counter databaseCount = SentryMetrics.getInstance()
+      .getCounter(name(FullUpdateInitializer.class, "total", "db"));
+
+  /** Total number of table objects */
+  private final Counter tableCount = SentryMetrics.getInstance()
+      .getCounter(name(FullUpdateInitializer.class, "total", "tables"));
+
+  /** Total number of partition objects */
+  private final Counter partitionCount = SentryMetrics.getInstance()
+      .getCounter(name(FullUpdateInitializer.class, "total", "partitions"));
+
+  /**
+   * Extract path (not starting with "/") from the full URI
+   * @param uri - resource URI (usually with scheme)
+   * @return path if the URI is valid, or null otherwise
+   */
+  static String pathFromURI(String uri) {
+    try {
+      return PathsUpdate.parsePath(uri);
+    } catch (SentryMalformedPathException e) {
+      LOGGER.warn(String.format("Ignoring invalid uri %s: %s",
+              uri, e.getReason()));
+      return null;
+    }
+  }
+
+  /**
+   * Mapping of object to set of paths.
+   * Used to represent partial results from executor threads. Multiple
+   * ObjectMapping objects are combined in a single mapping
+   * to get the final result.
+   */
+  private static final class ObjectMapping {
+    private final Map<String, Set<String>> objects;
+
+    ObjectMapping(Map<String, Set<String>> objects) {
+      this.objects = objects;
+    }
+
+    ObjectMapping(String authObject, String path) {
+      Set<String> values = Collections.singleton(safeIntern(path));
+      objects = ImmutableMap.of(authObject, values);
+    }
+
+    ObjectMapping(String authObject, Collection<String> paths) {
+      Set<String> values = new HashSet<>(paths);
+      objects = ImmutableMap.of(authObject, values);
+    }
+
+    Map<String, Set<String>> getObjects() {
+      return objects;
+    }
+  }
+
+  private static final class CallResult {
+    private final Exception failure;
+    private final boolean successStatus;
+    private final ObjectMapping objectMapping;
+
+    CallResult(Exception ex) {
+      failure = ex;
+      successStatus = false;
+      objectMapping = emptyObjectMapping;
+    }
+
+    CallResult(ObjectMapping objectMapping) {
+      failure = null;
+      successStatus = true;
+      this.objectMapping = objectMapping;
+    }
+
+    boolean success() {
+      return successStatus;
+    }
+
+    ObjectMapping getObjectMapping() {
+      return objectMapping;
+    }
+
+    public Exception getFailure() {
+      return failure;
+    }
+  }
+
+  private abstract class BaseTask implements Callable<CallResult> {
+
+    /**
+     *  Class represents retry strategy for BaseTask.
+     */
+    private final class RetryStrategy {
+      private int retryStrategyMaxRetries = 0;
+      private final int retryStrategyWaitDurationMillis;
+
+      private RetryStrategy(int retryStrategyMaxRetries, int retryStrategyWaitDurationMillis) {
+        this.retryStrategyMaxRetries = retryStrategyMaxRetries;
+
+        // Assign the default wait duration if a non-positive value is provided.
+        this.retryStrategyWaitDurationMillis = (retryStrategyWaitDurationMillis > 0) ?
+                retryStrategyWaitDurationMillis : 1000;
+      }
+
+      @SuppressWarnings({"squid:S1141", "squid:S2142"})
+      public CallResult exec() {
+        // Retry logic happens inside the callable/task to avoid
+        // synchronous waiting on getting the result.
+        // Retry the failed task until the max retry count is reached,
+        // waiting the configured duration before each retry.
+        //
+        // Only thrift exceptions are retried.
+        // Other exceptions are propagated up the stack.
+        Exception exception = null;
+        try {
+          // We catch all exceptions except Thrift exceptions which are retried
+          for (int i = 0; i < retryStrategyMaxRetries; i++) {
+            //noinspection NestedTryStatement
+            try {
+              return new CallResult(doTask());
+            } catch (TException ex) {
+              LOGGER.debug("Failed to execute task on " + (i + 1) + " attempts." +
+                      " Sleeping for " + retryStrategyWaitDurationMillis + " ms. Exception: " +
+                      ex.toString(), ex);
+              exception = ex;
+
+              try {
+                Thread.sleep(retryStrategyWaitDurationMillis);
+              } catch (InterruptedException ignored) {
+                // Skip the remaining retries if we get an InterruptedException.
+                LOGGER.warn("Interrupted during update fetch on iteration " + (i + 1));
+                break;
+              }
+            }
+          }
+        } catch (Exception ex) {
+          exception = ex;
+        }
+        LOGGER.error("Failed to execute task", exception);
+        // We will fail in the end, so we are shutting down the pool to prevent
+        // new tasks from being scheduled.
+        threadPool.shutdown();
+        return new CallResult(exception);
+      }
+    }
+
+    private final RetryStrategy retryStrategy;
+
+    BaseTask() {
+      retryStrategy = new RetryStrategy(maxRetries, waitDurationMillis);
+    }
+
+    @Override
+    public CallResult call() throws Exception {
+      return retryStrategy.exec();
+    }
+
+    abstract ObjectMapping doTask() throws Exception;
+  }
+
+  private class PartitionTask extends BaseTask {
+    private final String dbName;
+    private final String tblName;
+    private final String authName;
+    private final List<String> partNames;
+
+    PartitionTask(String dbName, String tblName, String authName,
+                  List<String> partNames) {
+      this.dbName = safeIntern(dbName);
+      this.tblName = safeIntern(tblName);
+      this.authName = safeIntern(authName);
+      this.partNames = partNames;
+    }
+
+    @Override
+    ObjectMapping doTask() throws Exception {
+      List<Partition> tblParts;
+      HMSClient c = null;
+      try (HMSClient client = clientFactory.connect()) {
+        c = client;
+        tblParts = client.getClient().getPartitionsByNames(dbName, tblName, partNames);
+      } catch (Exception e) {
+        if (c != null) {
+          c.invalidate();
+        }
+        throw e;
+      }
+
+      LOGGER.debug("Fetched partitions for db = {}, table = {}",
+              dbName, tblName);
+
+      Collection<String> partitionNames = new ArrayList<>(tblParts.size());
+      for (Partition part : tblParts) {
+        String partPath = pathFromURI(part.getSd().getLocation());
+        if (partPath != null) {
+          partitionNames.add(partPath.intern());
+        }
+      }
+      return new ObjectMapping(authName, partitionNames);
+    }
+  }
+
+  private class TableTask extends BaseTask {
+    private final String dbName;
+    private final List<String> tableNames;
+
+    TableTask(Database db, List<String> tableNames) {
+      dbName = safeIntern(db.getName());
+      this.tableNames = tableNames;
+    }
+
+    @Override
+    @SuppressWarnings({"squid:S2629", "squid:S135"})
+    ObjectMapping doTask() throws Exception {
+      HMSClient c = null;
+      try (HMSClient client = clientFactory.connect()) {
+        c = client;
+        List<Table> tables = client.getClient().getTableObjectsByName(dbName, tableNames);
+
+        LOGGER.debug("Fetching tables for db = {}, tables = {}", dbName, tableNames);
+
+        Map<String, Set<String>> objectMapping = new HashMap<>(tables.size());
+        for (Table tbl : tables) {
+          // Table names are case insensitive
+          if (!tbl.getDbName().equalsIgnoreCase(dbName)) {
+            // Inconsistency in HMS data
+            LOGGER.warn(String.format("DB name %s for table %s does not match %s",
+                    tbl.getDbName(), tbl.getTableName(), dbName));
+            continue;
+          }
+
+          String tableName = safeIntern(tbl.getTableName().toLowerCase());
+          String authzObject = (dbName + "." + tableName).intern();
+          List<String> tblPartNames =
+              client.getClient().listPartitionNames(dbName, tableName, (short) -1);
+          // Count total number of partitions
+          partitionCount.inc(tblPartNames.size());
+          for (int i = 0; i < tblPartNames.size(); i += maxPartitionsPerCall) {
+            List<String> partsToFetch = tblPartNames.subList(i,
+                    Math.min(i + maxPartitionsPerCall, tblPartNames.size()));
+            Callable<CallResult> partTask = new PartitionTask(dbName,
+                    tableName, authzObject, partsToFetch);
+            results.add(threadPool.submit(partTask));
+          }
+          String tblPath = safeIntern(pathFromURI(tbl.getSd().getLocation()));
+          if (tblPath == null) {
+            continue;
+          }
+          Set<String> paths = objectMapping.get(authzObject);
+          if (paths == null) {
+            paths = new HashSet<>(1);
+            objectMapping.put(authzObject, paths);
+          }
+          paths.add(tblPath);
+        }
+        return new ObjectMapping(Collections.unmodifiableMap(objectMapping));
+      } catch (Exception e) {
+        if (c != null) {
+          c.invalidate();
+        }
+        throw e;
+      }
+    }
+  }
+
+  private class DbTask extends BaseTask {
+
+    private final String dbName;
+
+    DbTask(String dbName) {
+      // Database names are case insensitive
+      this.dbName = safeIntern(dbName.toLowerCase());
+      databaseCount.inc();
+    }
+
+    @Override
+    ObjectMapping doTask() throws Exception {
+      HMSClient c = null;
+      try (HMSClient client = clientFactory.connect()) {
+        c = client;
+        Database db = client.getClient().getDatabase(dbName);
+        if (!dbName.equalsIgnoreCase(db.getName())) {
+          LOGGER.warn("Database name {} does not match {}", db.getName(), dbName);
+          return emptyObjectMapping;
+        }
+        List<String> allTblStr = client.getClient().getAllTables(dbName);
+        // Count total number of tables
+        tableCount.inc(allTblStr.size());
+        for (int i = 0; i < allTblStr.size(); i += maxTablesPerCall) {
+          List<String> tablesToFetch = allTblStr.subList(i,
+                  Math.min(i + maxTablesPerCall, allTblStr.size()));
+          Callable<CallResult> tableTask = new TableTask(db, tablesToFetch);
+          results.add(threadPool.submit(tableTask));
+        }
+        String dbPath = safeIntern(pathFromURI(db.getLocationUri()));
+        return (dbPath != null) ? new ObjectMapping(dbName, dbPath) :
+                emptyObjectMapping;
+      } catch (Exception e) {
+        if (c != null) {
+          c.invalidate();
+        }
+        throw e;
+      }
+    }
+  }
+
+  FullUpdateInitializer(HiveConnectionFactory clientFactory, Configuration conf) {
+    this.clientFactory = clientFactory;
+    maxPartitionsPerCall = conf.getInt(
+        ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC,
+        ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC_DEFAULT);
+    maxTablesPerCall = conf.getInt(
+        ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC,
+        ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC_DEFAULT);
+    maxRetries = conf.getInt(
+            ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM,
+            ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM_DEFAULT);
+    waitDurationMillis = conf.getInt(
+            ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS,
+            ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS_DEFAULT);
+
+    ThreadFactory fullUpdateInitThreadFactory = new ThreadFactoryBuilder()
+        .setNameFormat(FULL_UPDATE_INITIALIZER_THREAD_NAME)
+        .setDaemon(false)
+        .build();
+    threadPool = Executors.newFixedThreadPool(conf.getInt(
+        ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS,
+        ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS_DEFAULT),
+            fullUpdateInitThreadFactory);
+  }
+
+  /**
+   * Get Full HMS snapshot.
+   * @return Full snapshot of HMS objects.
+   * @throws TException if a Thrift error occurred
+   * @throws ExecutionException if there was a scheduling error
+   * @throws InterruptedException if processing was interrupted
+   */
+  @SuppressWarnings("squid:S00112")
+  Map<String, Collection<String>> getFullHMSSnapshot() throws Exception {
+    // Get list of all HMS databases
+    List<String> allDbStr;
+    HMSClient c = null;
+    try (HMSClient client = clientFactory.connect()) {
+      c = client;
+      allDbStr = client.getClient().getAllDatabases();
+    } catch (Exception e) {
+      if (c != null) {
+        c.invalidate();
+      }
+      throw e;
+    }
+
+    // Schedule async task for each database responsible for fetching per-database
+    // objects.
+    for (String dbName : allDbStr) {
+      results.add(threadPool.submit(new DbTask(dbName)));
+    }
+
+    // Resulting full snapshot
+    Map<String, Collection<String>> fullSnapshot = new HashMap<>();
+
+    // As async tasks complete, merge their results into full snapshot.
+    while (!results.isEmpty()) {
+      // This is the only thread that takes elements off the results list - all other threads
+      // only add to it. Once the list is empty it can't become non-empty again.
+      // This means that if we check that results is non-empty we can safely call pop() and
+      // know that its result is not null.
+      Future<CallResult> result = results.pop();
+      // Wait for the task to complete
+      CallResult callResult = result.get();
+      // Fail if we got errors
+      if (!callResult.success()) {
+        throw callResult.getFailure();
+      }
+      // Merge values into fullSnapshot
+      Map<String, Set<String>> objectMapping =
+              callResult.getObjectMapping().getObjects();
+      for (Map.Entry<String, Set<String>> entry: objectMapping.entrySet()) {
+        String key = entry.getKey();
+        Set<String> val = entry.getValue();
+        Collection<String> existingSet = fullSnapshot.get(key);
+        if (existingSet == null) {
+          fullSnapshot.put(key, val);
+          continue;
+        }
+        existingSet.addAll(val);
+      }
+    }
+    return fullSnapshot;
+  }
+
+  @Override
+  public void close() {
+    threadPool.shutdownNow();
+    try {
+      threadPool.awaitTermination(1, TimeUnit.SECONDS);
+    } catch (InterruptedException ignored) {
+      LOGGER.warn("Interrupted shutdown");
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Intern a string but only if it is not null
+   * @param arg String to be interned, may be null
+   * @return interned string or null
+   */
+  static String safeIntern(String arg) {
+    return (arg != null) ? arg.intern() : null;
+  }
+}
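
To make the snapshot layout described in the class javadoc concrete, here is a
hedged sketch of the structure getFullHMSSnapshot() returns (database, table
and path names are illustrative):

    // Illustrative shape only: auth object -> set of paths. Paths have the
    // scheme and leading "/" stripped by pathFromURI(); partition paths are
    // folded into the owning table's entry.
    Map<String, Collection<String>> snapshot = new HashMap<>();
    snapshot.put("db1", Collections.singleton("user/hive/warehouse/db1.db"));
    snapshot.put("db1.tbl1", new HashSet<>(Arrays.asList(
        "user/hive/warehouse/db1.db/tbl1",
        "user/hive/warehouse/db1.db/tbl1/part=1")));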

http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateModifier.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateModifier.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateModifier.java
new file mode 100644
index 0000000..c30d982
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateModifier.java
@@ -0,0 +1,606 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+
+package org.apache.sentry.service.thrift;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
+import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAddPartitionMessage;
+import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAlterPartitionMessage;
+import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAlterTableMessage;
+import org.apache.sentry.binding.metastore.messaging.json.SentryJSONCreateDatabaseMessage;
+import org.apache.sentry.binding.metastore.messaging.json.SentryJSONCreateTableMessage;
+import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropDatabaseMessage;
+import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropPartitionMessage;
+import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropTableMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Apply newer events to the full update.
+ *
+ * <p>The process of obtaining a full snapshot from HMS is not atomic.
+ * While we read information from HMS it may change - some new objects can be created,
+ * or some can be removed or modified. This class is used to reconcile changes to
+ * the full snapshot.
+ */
+final class FullUpdateModifier {
+  private static final Logger LOGGER = LoggerFactory.getLogger(FullUpdateModifier.class);
+
+  // Prevent creation of class instances
+  private FullUpdateModifier() {
+  }
+
+  /**
+   * Take a full snapshot and apply an HMS event to it.
+   *
+   * <p>We pass the deserializer as a parameter to simplify testing.
+   *
+   * @param image Full snapshot
+   * @param event HMS notification event
+   * @param deserializer Message deserializer -
+   *                     should produce Sentry JSON message types.
+   */
+  // NOTE: we pass the deserializer here instead of using a built-in one to simplify testing.
+  // Tests use mock deserializers and thus we do not have to construct proper events.
+  static void applyEvent(Map<String, Collection<String>> image, NotificationEvent event,
+                         MessageDeserializer deserializer) {
+    EventMessage.EventType eventType =
+            EventMessage.EventType.valueOf(event.getEventType());
+
+    switch (eventType) {
+      case CREATE_DATABASE:
+        createDatabase(image, event, deserializer);
+        break;
+      case DROP_DATABASE:
+        dropDatabase(image, event, deserializer);
+        break;
+      case CREATE_TABLE:
+        createTable(image, event, deserializer);
+        break;
+      case DROP_TABLE:
+        dropTable(image, event, deserializer);
+        break;
+      case ALTER_TABLE:
+        alterTable(image, event, deserializer);
+        break;
+      case ADD_PARTITION:
+        addPartition(image, event, deserializer);
+        break;
+      case DROP_PARTITION:
+        dropPartition(image, event, deserializer);
+        break;
+      case ALTER_PARTITION:
+        alterPartition(image, event, deserializer);
+        break;
+      default:
+        LOGGER.error("Notification with ID:{} has invalid event type: {}", event.getEventId(),
+                event.getEventType());
+        break;
+    }
+  }
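+
+  // Illustrative calling pattern (a hedged sketch, not part of this commit):
+  // replay the events that arrived after the snapshot was taken, e.g.
+  //
+  //   for (NotificationEvent event : eventsSinceSnapshot) {
+  //     FullUpdateModifier.applyEvent(image, event, deserializer);
+  //   }
+  //
+  // where the deserializer produces the Sentry JSON message types cast below.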
+
+  /**
+   * Add mapping from the new database name to location {dbname: {location}}.
+   */
+  private static void createDatabase(Map<String, Collection<String>> image, NotificationEvent event,
+                                     MessageDeserializer deserializer) {
+    SentryJSONCreateDatabaseMessage message =
+            (SentryJSONCreateDatabaseMessage) deserializer
+                    .getCreateDatabaseMessage(event.getMessage());
+
+    String dbName = message.getDB();
+    if ((dbName == null) || dbName.isEmpty()) {
+      LOGGER.error("Create database event is missing database name");
+      return;
+    }
+    dbName = dbName.toLowerCase();
+
+    String location = message.getLocation();
+    if ((location == null) || location.isEmpty()) {
+      LOGGER.error("Create database event is missing database location");
+      return;
+    }
+
+    String path = FullUpdateInitializer.pathFromURI(location);
+    if (path == null) {
+      return;
+    }
+
+    // Add new database if it doesn't exist yet
+    if (!image.containsKey(dbName)) {
+      LOGGER.debug("create database {} with location {}", dbName, location);
+      image.put(dbName.intern(), Collections.singleton(path));
+    } else {
+      // Sanity check the information and print warnings if database exists but
+      // with a different location
+      Set<String> oldLocations = (Set<String>)image.get(dbName);
+      LOGGER.debug("database {} already exists, ignored", dbName);
+      if (!oldLocations.contains(path)) {
+        LOGGER.warn("database {} exists but location is different from {}", dbName, location);
+      }
+    }
+  }
+
+  /**
+   * Remove the mapping for the database name and remove all mappings of the form
+   * dbName.tableName where dbName matches the dropped database.
+   */
+  private static void dropDatabase(Map<String, Collection<String>> image, NotificationEvent event,
+                                   MessageDeserializer deserializer) {
+    SentryJSONDropDatabaseMessage message =
+            (SentryJSONDropDatabaseMessage) deserializer.getDropDatabaseMessage(event.getMessage());
+
+    String dbName = message.getDB();
+    if ((dbName == null) || dbName.isEmpty()) {
+      LOGGER.error("Drop database event is missing database name");
+      return;
+    }
+    dbName = dbName.toLowerCase();
+    String location = message.getLocation();
+    if ((location == null) || location.isEmpty()) {
+      LOGGER.error("Drop database event is missing database location");
+      return;
+    }
+
+    String path = FullUpdateInitializer.pathFromURI(location);
+    if (path == null) {
+      return;
+    }
+
+    // If the database is already deleted, we have nothing to do
+    Set<String> locations = (Set<String>)image.get(dbName);
+    if (locations == null) {
+      LOGGER.debug("database {} is already deleted", dbName);
+      return;
+    }
+
+    if (!locations.contains(path)) {
+      LOGGER.warn("Database {} location does not match {}", dbName, path);
+      return;
+    }
+
+    LOGGER.debug("drop database {} with location {}", dbName, location);
+
+    // Drop information about the database
+    image.remove(dbName);
+
+    String dbPrefix = dbName + ".";
+
+    // Remove all objects for this database
+    for (Iterator<Map.Entry<String, Collection<String>>> it = image.entrySet().iterator();
+         it.hasNext(); ) {
+      Map.Entry<String, Collection<String>> entry = it.next();
+      String key = entry.getKey();
+      if (key.startsWith(dbPrefix)) {
+        LOGGER.debug("Removing {}", key);
+        it.remove();
+      }
+    }
+  }
+
+  /**
+   * Add mapping for dbName.tableName.
+   */
+  private static void createTable(Map<String, Collection<String>> image, NotificationEvent event,
+                                  MessageDeserializer deserializer) {
+    SentryJSONCreateTableMessage message = (SentryJSONCreateTableMessage) deserializer
+            .getCreateTableMessage(event.getMessage());
+
+    String dbName = message.getDB();
+    if ((dbName == null) || dbName.isEmpty()) {
+      LOGGER.error("Create table event is missing database name");
+      return;
+    }
+    String tableName = message.getTable();
+    if ((tableName == null) || tableName.isEmpty()) {
+      LOGGER.error("Create table event is missing table name");
+      return;
+    }
+
+    String location = message.getLocation();
+    if ((location == null) || location.isEmpty()) {
+      LOGGER.error("Create table event is missing table location");
+      return;
+    }
+
+    String path = FullUpdateInitializer.pathFromURI(location);
+    if (path == null) {
+      return;
+    }
+
+    String authName = dbName.toLowerCase() + "." + tableName.toLowerCase();
+    // Add new table if it doesn't exist yet
+    if (!image.containsKey(authName)) {
+      LOGGER.debug("create table {} with location {}", authName, location);
+      Set<String> locations = new HashSet<>(1);
+      locations.add(path);
+      image.put(authName.intern(), locations);
+    } else {
+      // Sanity check the information and print warnings if table exists but
+      // with a different location
+      Set<String> oldLocations = (Set<String>)image.get(authName);
+      LOGGER.debug("Table {} already exists, ignored", authName);
+      if (!oldLocations.contains(path)) {
+        LOGGER.warn("Table {} exists but location is different from {}", authName, location);
+      }
+    }
+  }
+
+  /**
+   * Drop the mapping for dbName.tableName.
+   */
+  private static void dropTable(Map<String, Collection<String>> image, NotificationEvent event,
+                                MessageDeserializer deserializer) {
+    SentryJSONDropTableMessage message = (SentryJSONDropTableMessage) deserializer
+            .getDropTableMessage(event.getMessage());
+
+    String dbName = message.getDB();
+    if ((dbName == null) || dbName.isEmpty()) {
+      LOGGER.error("Drop table event is missing database name");
+      return;
+    }
+    String tableName = message.getTable();
+    if ((tableName == null) || tableName.isEmpty()) {
+      LOGGER.error("Drop table event is missing table name");
+      return;
+    }
+
+    String location = message.getLocation();
+    if ((location == null) || location.isEmpty()) {
+      LOGGER.error("Drop table event is missing table location");
+      return;
+    }
+
+    String path = FullUpdateInitializer.pathFromURI(location);
+    if (path == null) {
+      return;
+    }
+
+    String authName = dbName.toLowerCase() + "." + tableName.toLowerCase();
+    Set<String> locations = (Set<String>)image.get(authName);
+    if (locations != null && locations.contains(path)) {
+      LOGGER.debug("Removing {}", authName);
+      image.remove(authName);
+    } else {
+      LOGGER.warn("can't find matching table {} with location {}", authName, location);
+    }
+  }
+
+  /**
+   * ALTER TABLE is a complicated function that can alter multiple things.
+   *
+   * <p>We take care of the following cases:
+   * <ul>
+   *   <li>Change database name. This is the most complicated one.
+   *   We need to change the actual database name and change all mappings
+   *   that look like "dbName.tableName" to the new dbName</li>
+   *  <li>Change table name</li>
+   *  <li>Change location</li>
+   * </ul>
+   *
+   */
+  private static void alterTable(Map<String, Collection<String>> image, NotificationEvent event,
+                                 MessageDeserializer deserializer) {
+    SentryJSONAlterTableMessage message =
+            (SentryJSONAlterTableMessage) deserializer.getAlterTableMessage(event.getMessage());
+    String prevDbName = message.getDB();
+    if ((prevDbName == null) || prevDbName.isEmpty()) {
+      LOGGER.error("Alter table event is missing old database name");
+      return;
+    }
+    prevDbName = prevDbName.toLowerCase();
+    String prevTableName = message.getTable();
+    if ((prevTableName == null) || prevTableName.isEmpty()) {
+      LOGGER.error("Alter table event is missing old table name");
+      return;
+    }
+    prevTableName = prevTableName.toLowerCase();
+
+    String newDbName = event.getDbName();
+    if ((newDbName == null) || newDbName.isEmpty()) {
+      LOGGER.error("Alter table event is missing new database name");
+      return;
+    }
+    newDbName = newDbName.toLowerCase();
+
+    String newTableName = event.getTableName();
+    if ((newTableName == null) || newTableName.isEmpty()) {
+      LOGGER.error("Alter table event is missing new table name");
+      return;
+    }
+    newTableName = newTableName.toLowerCase();
+
+    String prevLocation = message.getOldLocation();
+    if ((prevLocation == null) || prevLocation.isEmpty()) {
+      LOGGER.error("Alter table event is missing old location");
+      return;
+    }
+    String prevPath = FullUpdateInitializer.pathFromURI(prevLocation);
+    if (prevPath == null) {
+      return;
+    }
+
+    String newLocation = message.getNewLocation();
+    if ((newLocation == null) || newLocation.isEmpty()) {
+      LOGGER.error("Alter table event is missing new location");
+      return;
+    }
+    String newPath = FullUpdateInitializer.pathFromURI(newLocation);
+    if (newPath == null) {
+      return;
+    }
+
+    String prevAuthName = prevDbName + "." + prevTableName;
+    String newAuthName = newDbName + "." + newTableName;
+
+    if (!prevDbName.equals(newDbName)) {
+      // Database name change
+      LOGGER.debug("Changing database name: {} -> {}", prevDbName, newDbName);
+      Set<String> locations = (Set<String>)image.get(prevDbName);
+      if (locations != null) {
+        // Rename database if it is not renamed yet
+        if (!image.containsKey(newDbName)) {
+          image.put(newDbName, locations);
+          image.remove(prevDbName);
+          // Walk through all tables and rename DB part of the AUTH name
+          // AUTH name is "dbName.TableName" so we need to replace dbName with the new name
+          String prevDbPrefix = prevDbName + ".";
+          String newDbPrefix = newDbName + ".";
+          renamePrefixKeys(image, prevDbPrefix, newDbPrefix);
+        } else {
+          LOGGER.warn("database {} rename: found existing database {}", prevDbName, newDbName);
+        }
+      } else {
+        LOGGER.debug("database {} not found", prevDbName);
+      }
+    }
+
+    if (!prevAuthName.equals(newAuthName)) {
+      // Either the database name or table name changed, rename objects
+      Set<String> locations = (Set<String>)image.get(prevAuthName);
+      if (locations != null) {
+        // Rename if it is not renamed yet
+        if (!image.containsKey(newAuthName)) {
+          LOGGER.debug("rename {} -> {}", prevAuthName, newAuthName);
+          image.put(newAuthName, locations);
+          image.remove(prevAuthName);
+        } else {
+          LOGGER.warn("auth {} rename: found existing object {}", prevAuthName, newAuthName);
+        }
+      } else {
+        LOGGER.debug("auth {} not found", prevAuthName);
+      }
+    }
+
+    if (!prevPath.equals(newPath)) {
+      LOGGER.debug("Location change: {} -> {}", prevPath, newPath);
+      // Location change
+      Set<String> locations = (Set<String>) image.get(newAuthName);
+      if (locations != null && locations.contains(prevPath) && !locations.contains(newPath)) {
+        locations.remove(prevPath);
+        locations.add(newPath);
+      } else {
+        LOGGER.warn("can not process location change for {}", newAuthName);
+        LOGGER.warn("old locatio = {}, new location = {}", prevPath, newPath);
+      }
+    }
+  }
+
+  /**
+   * Add partition just adds a new location to the existing table.
+   */
+  private static void addPartition(Map<String, Collection<String>> image, NotificationEvent event,
+                                   MessageDeserializer deserializer) {
+    SentryJSONAddPartitionMessage message =
+            (SentryJSONAddPartitionMessage) deserializer.getAddPartitionMessage(event.getMessage());
+
+    String dbName = message.getDB();
+    if ((dbName == null) || dbName.isEmpty()) {
+      LOGGER.error("Add partition event is missing database name");
+      return;
+    }
+    String tableName = message.getTable();
+    if ((tableName == null) || tableName.isEmpty()) {
+      LOGGER.error("Add partition event for {} is missing table name", dbName);
+      return;
+    }
+
+    String authName = dbName.toLowerCase() + "." + tableName.toLowerCase();
+
+    List<String> locations = message.getLocations();
+    if (locations == null || locations.isEmpty()) {
+      LOGGER.error("Add partition event for {} is missing partition locations", authName);
+      return;
+    }
+
+    Set<String> oldLocations = (Set<String>) image.get(authName);
+    if (oldLocations == null) {
+      LOGGER.warn("Add partition for {}: missing table locations",authName);
+      return;
+    }
+
+    // Add each partition
+    for (String location: locations) {
+      String path = FullUpdateInitializer.pathFromURI(location);
+      if (path != null) {
+        LOGGER.debug("Adding partition {}:{}", authName, path);
+        oldLocations.add(path);
+      }
+    }
+  }
+
+  /**
+   * Drop partition removes location from the existing table.
+   */
+  private static void dropPartition(Map<String, Collection<String>> image, NotificationEvent event,
+                                    MessageDeserializer deserializer) {
+    SentryJSONDropPartitionMessage message =
+            (SentryJSONDropPartitionMessage) deserializer
+                    .getDropPartitionMessage(event.getMessage());
+    String dbName = message.getDB();
+    if ((dbName == null) || dbName.isEmpty()) {
+      LOGGER.error("Drop partition event is missing database name");
+      return;
+    }
+    String tableName = message.getTable();
+    if ((tableName == null) || tableName.isEmpty()) {
+      LOGGER.error("Drop partition event for {} is missing table name", dbName);
+      return;
+    }
+
+    String authName = dbName.toLowerCase() + "." + tableName.toLowerCase();
+
+    List<String> locations = message.getLocations();
+    if (locations == null || locations.isEmpty()) {
+      LOGGER.error("Drop partition event for {} is missing partition locations", authName);
+      return;
+    }
+
+    Set<String> oldLocations = (Set<String>) image.get(authName);
+    if (oldLocations == null) {
+      LOGGER.warn("Add partition for {}: missing table locations",authName);
+      return;
+    }
+
+    // Drop each partition
+    for (String location: locations) {
+      String path = FullUpdateInitializer.pathFromURI(location);
+      if (path != null) {
+        oldLocations.remove(path);
+      }
+    }
+  }
+
+  private static void alterPartition(Map<String, Collection<String>> image, NotificationEvent event,
+                                     MessageDeserializer deserializer) {
+    SentryJSONAlterPartitionMessage message =
+            (SentryJSONAlterPartitionMessage) deserializer
+                    .getAlterPartitionMessage(event.getMessage());
+
+    String dbName = message.getDB();
+    if ((dbName == null) || dbName.isEmpty()) {
+      LOGGER.error("Alter partition event is missing database name");
+      return;
+    }
+    String tableName = message.getTable();
+    if ((tableName == null) || tableName.isEmpty()) {
+      LOGGER.error("Alter partition event for {} is missing table name", dbName);
+      return;
+    }
+
+    String authName = dbName.toLowerCase() + "." + tableName.toLowerCase();
+
+    String prevLocation = message.getOldLocation();
+    if (prevLocation == null || prevLocation.isEmpty()) {
+      LOGGER.error("Alter partition event for {} is missing old location", authName);
+    }
+
+    String prevPath = FullUpdateInitializer.pathFromURI(prevLocation);
+    if (prevPath == null) {
+      return;
+    }
+
+    String newLocation = message.getNewLocation();
+    if (newLocation == null || newLocation.isEmpty()) {
+      LOGGER.error("Alter partition event for {} is missing new location", authName);
+    }
+
+    String newPath = FullUpdateInitializer.pathFromURI(newLocation);
+    if (newPath == null) {
+      return;
+    }
+
+    if (prevPath.equals(newPath)) {
+      LOGGER.warn("Alter partition event for {} has the same old and new path {}",
+              authName, prevPath);
+      return;
+    }
+
+    Set<String> locations = (Set<String>) image.get(authName);
+    if (locations == null) {
+      LOGGER.warn("Missing partition locations for {}", authName);
+      return;
+    }
+
+    // Rename partition
+    if (locations.remove(prevPath)) {
+      LOGGER.debug("Renaming {} to {}", prevPath, newPath);
+      locations.add(newPath);
+    }
+  }
+
+  /**
+   * Walk through the map and rename all keys that start with the prefix oldKey
+   * so that they start with newKey instead.
+   */
+  @VisibleForTesting
+  protected static void renamePrefixKeys(Map<String, Collection<String>> image,
+                                         String oldKey, String newKey) {
+    // The trick is that we can't just iterate through the map, remove old values and
+    // insert new values. While we can remove old values with iterators,
+    // we can't insert new ones while we walk. So we collect the keys to be added in
+    // a new map and merge them in the end.
+    Map<String, Set<String>> replacement = new HashMap<>();
+
+    for (Iterator<Map.Entry<String, Collection<String>>> it = image.entrySet().iterator();
+         it.hasNext(); ) {
+      Map.Entry<String, Collection<String>> entry = it.next();
+      String key = entry.getKey();
+      if (key.startsWith(oldKey)) {
+        // Plain prefix substitution; avoids treating characters in oldKey
+        // (such as '.') as regex metacharacters
+        String updatedKey = newKey + key.substring(oldKey.length());
+        if (!image.containsKey(updatedKey)) {
+          LOGGER.debug("Rename {} to {}", key, updatedKey);
+          replacement.put(updatedKey, (Set<String>) entry.getValue());
+          it.remove();
+        } else {
+          LOGGER.warn("skipping key {} - already present", updatedKey);
+        }
+      }
+    }
+
+    mergeMaps(image, replacement);
+  }
+
+  /**
+   * Merge replacement values into the original map but only if they are not
+   * already there.
+   *
+   * @param m1 destination map that receives the replacement values
+   * @param m2 map with replacement values
+   */
+  private static void mergeMaps(Map<String, Collection<String>> m1, Map<String, Set<String>> m2) {
+    // Merge replacement values into the original map but only if they are not
+    // already there
+    for (Map.Entry<String, Set<String>> entry : m2.entrySet()) {
+      if (!m1.containsKey(entry.getKey())) {
+        m1.put(entry.getKey(), entry.getValue());
+      }
+    }
+  }
+}
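For illustration, a minimal self-contained sketch of the prefix-rename semantics used by renamePrefixKeys above. The map contents are made-up sample data, and the helper mirrors the iterate-then-merge trick rather than calling the production method:

    import java.util.Collection;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Iterator;
    import java.util.Map;

    public class RenamePrefixKeysSketch {
      public static void main(String[] args) {
        // Hypothetical path image: AUTH names ("dbName.tableName") mapped to locations
        Map<String, Collection<String>> image = new HashMap<>();
        image.put("db1.t1", new HashSet<>(java.util.Arrays.asList("/warehouse/db1/t1")));
        image.put("db1.t2", new HashSet<>(java.util.Arrays.asList("/warehouse/db1/t2")));
        image.put("other.t1", new HashSet<>(java.util.Arrays.asList("/warehouse/other/t1")));

        // Same trick as above: collect renamed entries separately because new keys
        // cannot be inserted while iterating, then merge them at the end
        Map<String, Collection<String>> replacement = new HashMap<>();
        for (Iterator<Map.Entry<String, Collection<String>>> it = image.entrySet().iterator();
             it.hasNext(); ) {
          Map.Entry<String, Collection<String>> entry = it.next();
          if (entry.getKey().startsWith("db1.")) {
            replacement.put("db2." + entry.getKey().substring("db1.".length()), entry.getValue());
            it.remove();
          }
        }
        replacement.forEach(image::putIfAbsent);

        // Prints keys db2.t1, db2.t2 and other.t1 (iteration order may vary)
        System.out.println(image.keySet());
      }
    }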

http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/GSSCallback.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/GSSCallback.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/GSSCallback.java
new file mode 100644
index 0000000..d2d85d3
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/GSSCallback.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.service.thrift;
+
+import java.util.Arrays;
+import java.util.List;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.sentry.core.common.exception.ConnectionDeniedException;
+import org.apache.sentry.service.common.ServiceConstants.ServerConfig;
+
+public class GSSCallback extends SaslRpcServer.SaslGssCallbackHandler {
+
+  private final Configuration conf;
+  public GSSCallback(Configuration conf) {
+    super();
+    this.conf = conf;
+  }
+
+  boolean comparePrincipals(String principal1, String principal2) {
+    String[] principalParts1 = SaslRpcServer.splitKerberosName(principal1);
+    String[] principalParts2 = SaslRpcServer.splitKerberosName(principal2);
+    if (principalParts1.length == 0 || principalParts2.length == 0) {
+      return false;
+    }
+    // Principals match only when all of their corresponding parts match
+    return Arrays.equals(principalParts1, principalParts2);
+  }
+
+  boolean allowConnect(String principal) {
+    String allowedPrincipals = conf.get(ServerConfig.ALLOW_CONNECT);
+    if (allowedPrincipals == null) {
+      return false;
+    }
+    String principalShortName = getShortName(principal);
+    List<String> items = Arrays.asList(allowedPrincipals.split("\\s*,\\s*"));
+    for (String item : items) {
+      if (comparePrincipals(item, principalShortName)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private String getShortName(String principal) {
+    String[] parts = SaslRpcServer.splitKerberosName(principal);
+    return parts[0];
+  }
+
+  @Override
+  public void handle(Callback[] callbacks)
+  throws UnsupportedCallbackException, ConnectionDeniedException {
+    AuthorizeCallback ac = null;
+    for (Callback callback : callbacks) {
+      if (callback instanceof AuthorizeCallback) {
+        ac = (AuthorizeCallback) callback;
+      } else {
+        throw new UnsupportedCallbackException(callback,
+            "Unrecognized SASL GSSAPI Callback");
+      }
+    }
+    if (ac != null) {
+      String authid = ac.getAuthenticationID();
+      String authzid = ac.getAuthorizationID();
+
+      if (allowConnect(authid)) {
+        ac.setAuthorized(authid.equals(authzid));
+        if (ac.isAuthorized()) {
+          ac.setAuthorizedID(authzid);
+        }
+      } else {
+        throw new ConnectionDeniedException(ac,
+            "Connection to sentry service denied due to lack of client credentials",
+            authid);
+      }
+    }
+  }
+}
\ No newline at end of file
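A hedged usage sketch for the callback above: the principal names are invented, and it assumes ServerConfig.ALLOW_CONNECT holds a comma-separated list of allowed short principal names, which is how allowConnect parses it:

    import javax.security.auth.callback.Callback;
    import javax.security.sasl.AuthorizeCallback;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.sentry.service.common.ServiceConstants.ServerConfig;
    import org.apache.sentry.service.thrift.GSSCallback;

    public class GSSCallbackSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set(ServerConfig.ALLOW_CONNECT, "hive,impala");  // allowed short names

        GSSCallback callback = new GSSCallback(conf);
        // "hive/host@EXAMPLE.COM" has the short name "hive", which is on the allow list
        AuthorizeCallback ac =
            new AuthorizeCallback("hive/host@EXAMPLE.COM", "hive/host@EXAMPLE.COM");
        callback.handle(new Callback[] { ac });
        System.out.println(ac.isAuthorized());  // true: authid equals authzid
      }
    }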

http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSClient.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSClient.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSClient.java
new file mode 100644
index 0000000..7831430
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSClient.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.service.thrift;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+
+/**
+ * AutoCloseable wrapper around HiveMetaStoreClient.
+ * It is only used to provide try-with-resource semantics for
+ * {@link HiveMetaStoreClient}.
+ */
+public class HMSClient implements AutoCloseable {
+  private final HiveMetaStoreClient client;
+  private boolean valid;
+
+  public HMSClient(HiveMetaStoreClient client) {
+    this.client = Preconditions.checkNotNull(client);
+    valid = true;
+  }
+
+  public HiveMetaStoreClient getClient() {
+    return client;
+  }
+
+  public void invalidate() {
+    if (valid) {
+      client.close();
+      valid = false;
+    }
+  }
+
+  @Override
+  public void close() {
+    // Delegates to invalidate() so both paths close the underlying client exactly once
+    invalidate();
+  }
+}
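A short sketch of the try-with-resources pattern this wrapper enables; it assumes a HiveConf that points at a reachable metastore:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
    import org.apache.sentry.service.thrift.HMSClient;

    public class HMSClientSketch {
      public static void main(String[] args) throws Exception {
        HiveConf hiveConf = new HiveConf();  // metastore URIs assumed to be configured
        // close() runs automatically when the block exits, even on exceptions
        try (HMSClient wrapper = new HMSClient(new HiveMetaStoreClient(hiveConf))) {
          System.out.println(wrapper.getClient().getAllDatabases());
        }
      }
    }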

http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollowerState.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollowerState.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollowerState.java
new file mode 100644
index 0000000..74268f7
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollowerState.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership.  The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with the License.  You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.sentry.service.thrift;
+
+/**
+ * States for the HMSFollower
+ */
+public enum HMSFollowerState implements SentryState {
+  /**
+   * Whether the HMSFollower has been started.
+   */
+  STARTED,
+
+  /**
+   * Whether the HMSFollower is connected to the HMS.
+   */
+  CONNECTED;
+
+  /**
+   * The component name this state is for.
+   */
+  public static final String COMPONENT = "HMSFollower";
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public long getValue() {
+    return 1 << this.ordinal();
+  }
+}
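Each constant contributes one distinct bit (STARTED is 1 << 0 = 1, CONNECTED is 1 << 1 = 2), so several states can be packed into a single long bitmask. A small sketch of that arithmetic:

    import org.apache.sentry.service.thrift.HMSFollowerState;

    public class StateBitmaskSketch {
      public static void main(String[] args) {
        long state = 0;
        state |= HMSFollowerState.STARTED.getValue();    // state == 1 (bit 0)
        state |= HMSFollowerState.CONNECTED.getValue();  // state == 3 (bits 0 and 1)
        boolean connected = (state & HMSFollowerState.CONNECTED.getValue()) != 0;
        System.out.println(connected);  // true
      }
    }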

http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveConnectionFactory.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveConnectionFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveConnectionFactory.java
new file mode 100644
index 0000000..62542c3
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveConnectionFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.service.thrift;
+
+import org.apache.hadoop.hive.metastore.api.MetaException;
+
+import java.io.IOException;
+
+public interface HiveConnectionFactory extends AutoCloseable {
+  /**
+   * Open a new connection to HMS.
+   *
+   * @return connection to HMS.
+   * @throws IOException          if connection establishment failed.
+   * @throws InterruptedException if connection establishment was interrupted.
+   * @throws MetaException        if connection establishment failed.
+   */
+  HMSClient connect() throws IOException, InterruptedException, MetaException;
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java
new file mode 100644
index 0000000..93cc34f
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java
@@ -0,0 +1,211 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+  <p>
+  http://www.apache.org/licenses/LICENSE-2.0
+  <p>
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+
+package org.apache.sentry.service.thrift;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.sentry.hdfs.UniquePathsUpdate;
+import org.apache.sentry.provider.db.service.persistent.SentryStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper class used to fetch Hive MetaStore notifications.
+ */
+public final class HiveNotificationFetcher implements AutoCloseable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(HiveNotificationFetcher.class);
+
+  private final SentryStore sentryStore;
+  private final HiveConnectionFactory hmsConnectionFactory;
+  private HiveMetaStoreClient hmsClient;
+
+  /* The following cache and last filtered ID help us avoid unnecessary calls to the DB */
+  private long lastIdFiltered = 0;
+  private Set<String> cache = new HashSet<>();
+
+  public HiveNotificationFetcher(SentryStore sentryStore, HiveConnectionFactory hmsConnectionFactory) {
+    this.sentryStore = sentryStore;
+    this.hmsConnectionFactory = hmsConnectionFactory;
+  }
+
+  /**
+   * Fetch new HMS notifications that have appeared since a specified event ID. The returned list may
+   * include notifications with the same specified ID if they were not seen by Sentry.
+   *
+   * @param lastEventId The event ID to use to request notifications.
+   * @return A list of newer notifications unseen by Sentry.
+   * @throws Exception If an error occurs on the HMS communication.
+   */
+  public List<NotificationEvent> fetchNotifications(long lastEventId) throws Exception {
+    return fetchNotifications(lastEventId, Integer.MAX_VALUE);
+  }
+
+  /**
+   * Fetch new HMS notifications that have appeared since a specified event ID. The returned list may
+   * include notifications with the same specified ID if they were not seen by Sentry.
+   *
+   * @param lastEventId The event ID to use to request notifications.
+   * @param maxEvents The maximum number of events to fetch.
+   * @return A list of newer notifications unseen by Sentry.
+   * @throws Exception If an error occurs on the HMS communication.
+   */
+  List<NotificationEvent> fetchNotifications(long lastEventId, int maxEvents) throws Exception {
+    NotificationFilter filter = null;
+
+    /*
+     * HMS may deliver duplicated events that were committed later than the previous request. To
+     * pick up those newer duplicated events, we request notifications starting from the last
+     * seen ID - 1.
+     *
+     * We could still miss duplicates committed much later but, since HMS does not guarantee
+     * their order, it is safer to avoid processing them.
+     *
+     * TODO: We can avoid doing this once HIVE-16886 is fixed.
+     */
+    if (lastEventId > 0) {
+      filter = createNotificationFilterFor(lastEventId);
+      lastEventId--;
+    }
+
+    LOGGER.debug("Requesting HMS notifications since ID = {}", lastEventId);
+
+    NotificationEventResponse response;
+    try {
+      response = getHmsClient().getNextNotification(lastEventId, maxEvents, filter);
+    } catch (Exception e) {
+      close();
+      throw e;
+    }
+
+    if (response != null && response.isSetEvents()) {
+      LOGGER.debug("Fetched {} new HMS notification(s)", response.getEventsSize());
+      return response.getEvents();
+    }
+
+    return Collections.emptyList();
+  }
+
+  /**
+   * Returns a HMS notification filter for a specific notification ID. HMS notifications may
+   * have duplicated IDs, so the filter uses a SHA-1 hash to check for a unique notification.
+   *
+   * @param id the notification ID to filter
+   * @return the HMS notification filter
+   */
+  private NotificationFilter createNotificationFilterFor(final long id) {
+    /*
+     * A SHA-1 hex value is persisted in the Sentry DB for each notification already processed.
+     * To avoid unnecessary calls to the DB, we keep a cache of the hashes already seen for the
+     * specified ID. If a new filter ID is used, then we clean up the cache.
+     */
+
+    if (lastIdFiltered != id) {
+      lastIdFiltered = id;
+      cache.clear();
+    }
+
+    return new NotificationFilter() {
+      @Override
+      public boolean accept(NotificationEvent notificationEvent) {
+        if (notificationEvent.getEventId() == id) {
+          String hash = UniquePathsUpdate.sha1(notificationEvent);
+
+          try {
+            if (cache.contains(hash) || sentryStore.isNotificationProcessed(hash)) {
+              cache.add(hash);
+
+              LOGGER.debug("Ignoring HMS notification already processed: ID = {}", id);
+              return false;
+            }
+          } catch (Exception e) {
+            LOGGER.error("An error occurred while checking if notification {} is already "
+                + "processed: {}", id, e.getMessage());
+
+            // We cannot throw an exception on this filter, so we return false assuming this
+            // notification is already processed
+            return false;
+          }
+        }
+
+        return true;
+      }
+    };
+  }
+
+  /**
+   * Gets the HMS client connection object.
+   * It will create a new connection if no connection object exists.
+   *
+   * @return The HMS client used to communicate with the Hive MetaStore.
+   * @throws Exception If it cannot connect to the HMS service.
+   */
+  private HiveMetaStoreClient getHmsClient() throws Exception {
+    if (hmsClient == null) {
+      try {
+        hmsClient = hmsConnectionFactory.connect().getClient();
+      } catch (Exception e) {
+        LOGGER.error("Fail to connect to the HMS service: {}", e.getMessage());
+        throw e;
+      }
+    }
+
+    return hmsClient;
+  }
+
+  /**
+   * @return the latest notification Id logged by the HMS
+   * @throws Exception when an error occurs when talking to the HMS client
+   */
+  public long getCurrentNotificationId() throws Exception {
+    CurrentNotificationEventId eventId;
+    try {
+      eventId = getHmsClient().getCurrentNotificationEventId();
+    } catch (Exception e) {
+      close();
+      throw e;
+    }
+
+    if (eventId != null && eventId.isSetEventId()) {
+      return eventId.getEventId();
+    }
+
+    return SentryStore.EMPTY_NOTIFICATION_ID;
+  }
+
+  /* AutoCloseable implementations */
+
+  @Override
+  public void close() {
+    try {
+      if (hmsClient != null) {
+        hmsClient.close();
+      }
+
+      cache.clear();
+    } finally {
+      hmsClient = null;
+    }
+  }
+}
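A hedged sketch of how the fetcher might be driven; the store and factory are assumed to be fully configured elsewhere, and the starting ID is a placeholder (in the real service it comes from the Sentry store):

    import java.util.List;
    import org.apache.hadoop.hive.metastore.api.NotificationEvent;
    import org.apache.sentry.provider.db.service.persistent.SentryStore;
    import org.apache.sentry.service.thrift.HiveConnectionFactory;
    import org.apache.sentry.service.thrift.HiveNotificationFetcher;

    public class FetcherSketch {
      // Sketch only: both arguments are assumed to be fully configured elsewhere
      static void poll(SentryStore store, HiveConnectionFactory factory) throws Exception {
        long lastProcessedId = 0L;  // placeholder; normally read back from the Sentry store
        try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, factory)) {
          List<NotificationEvent> events = fetcher.fetchNotifications(lastProcessedId);
          for (NotificationEvent event : events) {
            // Each event is unseen by Sentry; persisting its hash lets the filter skip it later
            System.out.println(event.getEventId() + ": " + event.getEventType());
          }
        }
      }
    }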

http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveSimpleConnectionFactory.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveSimpleConnectionFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveSimpleConnectionFactory.java
new file mode 100644
index 0000000..31e58fd
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveSimpleConnectionFactory.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.service.thrift;
+
+import com.google.common.base.Preconditions;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import javax.security.auth.login.LoginException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.sentry.service.common.ServiceConstants.ServerConfig;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Factory used to generate Hive connections.
+ * Supports insecure and Kerberos connections.
+ * For Kerberos connections starts the ticket renewal thread and sets
+ * up Kerberos credentials.
+ */
+public final class HiveSimpleConnectionFactory implements HiveConnectionFactory {
+  private static final Logger LOGGER = LoggerFactory.getLogger(HiveSimpleConnectionFactory.class);
+
+  /** Sentry configuration */
+  private final Configuration conf;
+  /** Hive configuration */
+  private final HiveConf hiveConf;
+  private final boolean insecure;
+  private SentryKerberosContext kerberosContext = null;
+
+  public HiveSimpleConnectionFactory(Configuration sentryConf, HiveConf hiveConf) {
+    this.conf = sentryConf;
+    this.hiveConf = hiveConf;
+    insecure = !ServerConfig.SECURITY_MODE_KERBEROS.equalsIgnoreCase(
+        sentryConf.get(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_NONE).trim());
+  }
+
+  /**
+   * Initialize the factory.
+   * For insecure connections there is nothing to initialize.
+   * For Kerberos connections it sets up the ticket renewal thread.
+   * @throws IOException if the server principal cannot be resolved
+   * @throws LoginException if the Kerberos login fails
+   */
+  public void init() throws IOException, LoginException {
+    if (insecure) {
+      LOGGER.info("Using insecure connection to HMS");
+      return;
+    }
+
+    int port = conf.getInt(ServerConfig.RPC_PORT, ServerConfig.RPC_PORT_DEFAULT);
+    String rawPrincipal = Preconditions.checkNotNull(conf.get(ServerConfig.PRINCIPAL),
+        "%s is required", ServerConfig.PRINCIPAL);
+    String principal = SecurityUtil.getServerPrincipal(rawPrincipal, NetUtils.createSocketAddr(
+        conf.get(ServerConfig.RPC_ADDRESS, ServerConfig.RPC_ADDRESS_DEFAULT),
+        port).getAddress());
+    LOGGER.debug("Opening kerberos connection to HMS using kerberos principal {}", principal);
+    String[] principalParts = SaslRpcServer.splitKerberosName(principal);
+    Preconditions.checkArgument(principalParts.length == 3,
+        "Kerberos principal %s should have 3 parts", principal);
+    String keytab = Preconditions.checkNotNull(conf.get(ServerConfig.KEY_TAB),
+        "Configuration is missing required %s paraeter", ServerConfig.KEY_TAB);
+    File keytabFile = new File(keytab);
+    Preconditions.checkState(keytabFile.isFile() && keytabFile.canRead(),
+        "Keytab %s does not exist or is not readable", keytab);
+    // Instantiating SentryKerberosContext in non-server mode handles the ticket renewal.
+    kerberosContext = new SentryKerberosContext(principal, keytab, false);
+    UserGroupInformation.setConfiguration(conf);
+    LOGGER.info("Using secure connection to HMS");
+  }
+
+  /**
+   * Connect to HMS in insecure mode or in Kerberos mode according to config.
+   *
+   * @return HMS connection
+   * @throws IOException          if the connection could not be established
+   * @throws InterruptedException if the connection attempt was interrupted
+   * @throws MetaException        if another error occurred
+   */
+  @Override
+  public HMSClient connect() throws IOException, InterruptedException, MetaException {
+    if (insecure) {
+      return new HMSClient(new HiveMetaStoreClient(hiveConf));
+    }
+    UserGroupInformation clientUGI =
+        UserGroupInformation.getUGIFromSubject(kerberosContext.getSubject());
+    return new HMSClient(clientUGI.doAs(new PrivilegedExceptionAction<HiveMetaStoreClient>() {
+      @Override
+      public HiveMetaStoreClient run() throws MetaException {
+        return new HiveMetaStoreClient(hiveConf);
+      }
+    }));
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (kerberosContext != null) {
+      kerberosContext.shutDown();
+      kerberosContext = null;
+    }
+  }
+}
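A minimal lifecycle sketch for this factory, assuming the default insecure mode (so init() has nothing to set up) and a HiveConf with the metastore URIs already configured:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.sentry.service.thrift.HMSClient;
    import org.apache.sentry.service.thrift.HiveSimpleConnectionFactory;

    public class FactorySketch {
      public static void main(String[] args) throws Exception {
        Configuration sentryConf = new Configuration();  // no SECURITY_MODE set: insecure
        HiveConf hiveConf = new HiveConf();

        HiveSimpleConnectionFactory factory =
            new HiveSimpleConnectionFactory(sentryConf, hiveConf);
        factory.init();  // in Kerberos mode this also starts the ticket renewal thread
        try (HMSClient client = factory.connect()) {
          System.out.println(client.getClient().getAllDatabases());
        } finally {
          factory.close();  // stops the Kerberos ticket renewal thread, if any
        }
      }
    }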