You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by sp...@apache.org on 2018/05/31 03:32:13 UTC
[35/86] sentry git commit: Revert "SENTRY-2208: Refactor out Sentry
service into own module from sentry-provider-db (Anthony Young-Garner,
reviewed by Sergio Pena, Steve Moist, Na Li)"
http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStoreSchemaInfo.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStoreSchemaInfo.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStoreSchemaInfo.java
new file mode 100644
index 0000000..ce76a46
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStoreSchemaInfo.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.provider.db.service.persistent;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.sentry.core.common.exception.SentryUserException;
+
+/**
+ * Describes the Sentry store schema scripts available in a script directory
+ * for one database type.
+ * <p>
+ * The upgrade path is read from {@code upgrade.order.<dbType>} in the script
+ * directory. Each non-blank line is one upgrade step; the text before
+ * {@code -to-} is the schema version that step upgrades from. Initialization
+ * scripts are named {@code sentry-<dbType>-<version>.sql} and upgrade scripts
+ * {@code sentry-upgrade-<dbType>-<step>.sql}.
+ */
+public class SentryStoreSchemaInfo {
+  private static final String SQL_FILE_EXTENSION = ".sql";
+  private static final String UPGRADE_FILE_PREFIX = "upgrade-";
+  private static final String INIT_FILE_PREFIX = "sentry-";
+  private static final String VERSION_UPGRADE_LIST = "upgrade.order";
+
+  /** Schema version expected by this code base; see {@link #getSentryVersion()}. */
+  private static final String SENTRY_VERSION = "2.1.0";
+
+  private final String dbType;
+  /** Upgrade steps in file order, blank lines removed. */
+  private final String[] sentrySchemaVersions;
+  private final String sentryScriptDir;
+
+  /**
+   * Loads the upgrade order for {@code dbType} from {@code sentryScriptDir}.
+   *
+   * @param sentryScriptDir directory holding the schema scripts and upgrade order files
+   * @param dbType database type, used as a suffix/infix in script file names
+   * @throws SentryUserException if the upgrade order file is missing or cannot be read
+   */
+  public SentryStoreSchemaInfo(String sentryScriptDir, String dbType)
+      throws SentryUserException {
+    this.sentryScriptDir = sentryScriptDir;
+    this.dbType = dbType;
+    // load upgrade order for the given dbType
+    List<String> upgradeOrderList = new ArrayList<>();
+    String upgradeListFile = getSentryStoreScriptDir() + File.separator
+        + VERSION_UPGRADE_LIST + "." + dbType;
+    try (BufferedReader bfReader = new BufferedReader(new FileReader(upgradeListFile))) {
+      String currSchemaVersion;
+      while ((currSchemaVersion = bfReader.readLine()) != null) {
+        String step = currSchemaVersion.trim();
+        // Blank lines (for example at the end of the file) are not upgrade steps;
+        // keeping them would yield bogus script names like "sentry-upgrade-<db>-.sql".
+        if (!step.isEmpty()) {
+          upgradeOrderList.add(step);
+        }
+      }
+    } catch (FileNotFoundException e) {
+      throw new SentryUserException("File " + upgradeListFile + " not found ", e);
+    } catch (IOException e) {
+      throw new SentryUserException("Error reading " + upgradeListFile, e);
+    }
+    sentrySchemaVersions = upgradeOrderList.toArray(new String[0]);
+  }
+
+  /**
+   * @return the schema version expected by this code base
+   */
+  public String getSentrySchemaVersion() {
+    return SENTRY_VERSION;
+  }
+
+  /**
+   * Lists the upgrade scripts needed to move from {@code fromSchemaVer} to the
+   * current schema version, in execution order.
+   *
+   * @param fromSchemaVer schema version currently installed
+   * @return script file names; empty if already at the current version
+   * @throws SentryUserException if no upgrade step starts at {@code fromSchemaVer}
+   */
+  public List<String> getUpgradeScripts(String fromSchemaVer)
+      throws SentryUserException {
+    List<String> upgradeScriptList = new ArrayList<>();
+
+    // check if we are already at current schema level
+    if (getSentryVersion().equals(fromSchemaVer)) {
+      return upgradeScriptList;
+    }
+
+    // Find the first upgrade step that starts at fromSchemaVer
+    int firstScript = sentrySchemaVersions.length;
+    for (int i = 0; i < sentrySchemaVersions.length; i++) {
+      String fromVersion = sentrySchemaVersions[i].split("-to-")[0];
+      if (fromVersion.equals(fromSchemaVer)) {
+        firstScript = i;
+        break;
+      }
+    }
+    if (firstScript == sentrySchemaVersions.length) {
+      throw new SentryUserException("Unknown version specified for upgrade "
+          + fromSchemaVer + " Metastore schema may be too old or newer");
+    }
+
+    // Every step from there on is applied in order
+    for (int i = firstScript; i < sentrySchemaVersions.length; i++) {
+      upgradeScriptList.add(generateUpgradeFileName(sentrySchemaVersions[i]));
+    }
+    return upgradeScriptList;
+  }
+
+  /**
+   * Get the name of the script to initialize the schema for given version.
+   *
+   * @param toVersion target version; if {@code null}, the current schema version is used
+   * @return the init script file name (relative to the script directory)
+   * @throws SentryUserException if no init script exists for that version
+   */
+  public String generateInitFileName(String toVersion)
+      throws SentryUserException {
+    String version = toVersion;
+    if (version == null) {
+      version = getSentryVersion();
+    }
+    String initScriptName = INIT_FILE_PREFIX + dbType + "-" + version
+        + SQL_FILE_EXTENSION;
+    // check if the file exists
+    if (!(new File(getSentryStoreScriptDir() + File.separatorChar
+        + initScriptName).exists())) {
+      throw new SentryUserException(
+          "Unknown version specified for initialization: " + version);
+    }
+    return initScriptName;
+  }
+
+  /**
+   * Find the directory of sentry store scripts.
+   *
+   * @return the script directory passed to the constructor
+   */
+  public String getSentryStoreScriptDir() {
+    return sentryScriptDir;
+  }
+
+  // Format the upgrade script name: sentry-upgrade-<dbType>-<from>-to-<to>.sql
+  private String generateUpgradeFileName(String fileVersion) {
+    return INIT_FILE_PREFIX + UPGRADE_FILE_PREFIX + dbType + "-"
+        + fileVersion + SQL_FILE_EXTENSION;
+  }
+
+  // Current Sentry schema version, in majorVersion.minorVersion.changeVersion format
+  // TODO: store the version using the build script
+  public static String getSentryVersion() {
+    return SENTRY_VERSION;
+  }
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionBlock.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionBlock.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionBlock.java
new file mode 100644
index 0000000..6ad52a3
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionBlock.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.provider.db.service.persistent;
+
+import javax.jdo.PersistenceManager;
+
+/**
+ * A unit of work that runs inside a single transaction.
+ * <p>
+ * The result of the work is returned from {@link #execute(PersistenceManager)}.
+ *
+ * @param <T> type of the value produced by the block
+ */
+@FunctionalInterface
+public interface TransactionBlock<T> {
+
+  /**
+   * Runs the body of the transaction.
+   * <p>
+   * Implementations must not start a new transaction, and must not otherwise
+   * manipulate transactions through the supplied {@link PersistenceManager}.
+   *
+   * @param persistenceManager PersistenceManager of the current transaction
+   * @return the value produced by the transaction body
+   * @throws Exception if the work fails
+   */
+  T execute(PersistenceManager persistenceManager) throws Exception;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionManager.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionManager.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionManager.java
new file mode 100644
index 0000000..ba6e845
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionManager.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.provider.db.service.persistent;
+
+import com.codahale.metrics.Counter;
+import static com.codahale.metrics.MetricRegistry.name;
+import com.codahale.metrics.Timer;
+
+import com.codahale.metrics.Timer.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.core.common.exception.SentryUserException;
+import org.apache.sentry.service.common.ServiceConstants.ServerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jdo.PersistenceManager;
+import javax.jdo.PersistenceManagerFactory;
+import javax.jdo.Transaction;
+
+import org.apache.sentry.api.service.thrift.SentryMetrics;
+
+import java.util.Random;
+import java.util.concurrent.Callable;
+
+/**
+ * TransactionManager executes database transactions. It supports retrying a
+ * transaction on unexpected exceptions, except for <em>SentryUserExceptions</em>,
+ * e.g. <em>SentryNoSuchObjectException</em> or <em>SentryAlreadyExistsException</em>.
+ * Those are rethrown immediately without retrying.<p>
+ *
+ * The purpose of the class is to separate all transaction housekeeping (opening
+ * transactions, rolling back failed transactions) from the actual transaction
+ * business logic.<p>
+ *
+ * TransactionManager obtains a new PersistenceManager for each transaction.<p>
+ *
+ * Retries use exponential backoff. The delay starts at the configured wait time.
+ * After every failed attempt it grows by a factor of 1.5, plus a random fuzz of
+ * less than half of the previous delay.<p>
+ *
+ * TransactionManager exposes several metrics:
+ * <ul>
+ * <li>Timer metric for all transactions</li>
+ * <li>Counter for failed transactions</li>
+ * <li>Counter for failed attempts made by the retry mechanism</li>
+ * <li>Counter for each exception thrown by transaction</li>
+ * </ul>
+ */
+@SuppressWarnings("NestedTryStatement")
+public final class TransactionManager {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(TransactionManager.class);
+
+  /** Random number generator for exponential backoff fuzziness. */
+  private static final Random random = new Random();
+
+  /** Name for metrics. */
+  private static final String TRANSACTIONS = "transactions";
+
+  private final PersistenceManagerFactory pmf;
+
+  /** Maximum number of attempts per call; values below 1 are treated as 1. */
+  private final int transactionRetryMax;
+
+  /** Initial delay (in milliseconds) between attempts; negative values act as 0. */
+  private final int retryWaitTimeMills;
+
+  // Transaction timer measures time distribution for all transactions
+  private final Timer transactionTimer =
+      SentryMetrics.getInstance().
+          getTimer(name(TransactionManager.class,
+              TRANSACTIONS));
+
+  // Counter for failed transactions
+  private final Counter failedTransactionsCount =
+      SentryMetrics.getInstance().
+          getCounter(name(TransactionManager.class,
+              TRANSACTIONS, "failed"));
+
+  // Counter for failed attempts made under the retry mechanism
+  private final Counter retryCount =
+      SentryMetrics.getInstance().getCounter(name(TransactionManager.class,
+          TRANSACTIONS, "retry"));
+
+  TransactionManager(PersistenceManagerFactory pmf, Configuration conf) {
+    this.pmf = pmf;
+    transactionRetryMax = conf.getInt(
+        ServerConfig.SENTRY_STORE_TRANSACTION_RETRY,
+        ServerConfig.SENTRY_STORE_TRANSACTION_RETRY_DEFAULT);
+    retryWaitTimeMills = conf.getInt(
+        ServerConfig.SENTRY_STORE_TRANSACTION_RETRY_WAIT_TIME_MILLIS,
+        ServerConfig.SENTRY_STORE_TRANSACTION_RETRY_WAIT_TIME_MILLIS_DEFAULT);
+  }
+
+  /**
+   * Execute some code as a single transaction. The code in tb.execute()
+   * should not start a new transaction or manipulate transactions with the
+   * PersistenceManager.
+   * <p>
+   * On any exception the transaction is rolled back, the failure counters are
+   * incremented and the exception is rethrown.
+   *
+   * @param tb transaction block with code to be executed
+   * @return Object with the result of tb.execute()
+   * @throws Exception whatever tb.execute() or the commit throws
+   */
+  public <T> T executeTransaction(TransactionBlock<T> tb) throws Exception {
+    try (Context context = transactionTimer.time();
+         PersistenceManager pm = pmf.getPersistenceManager()) {
+      Transaction transaction = pm.currentTransaction();
+      transaction.begin();
+      try {
+        T result = tb.execute(pm);
+        transaction.commit();
+        return result;
+      } catch (Exception e) {
+        // Count total failed transactions
+        failedTransactionsCount.inc();
+        // Count specific exceptions
+        SentryMetrics.getInstance().getCounter(name(TransactionManager.class,
+            "exception", e.getClass().getSimpleName())).inc();
+        // Re-throw the exception
+        throw e;
+      } finally {
+        // Anything not committed (failure in execute or commit) is rolled back
+        if (transaction.isActive()) {
+          transaction.rollback();
+        }
+      }
+    }
+  }
+
+  /**
+   * Execute a list of TransactionBlock code as a single transaction.
+   * The code in tb.execute() should not start a new transaction or
+   * manipulate transactions with the PersistenceManager.
+   * <p>
+   * The blocks are combined into one block, so that transaction handling and
+   * metrics are shared with {@link #executeTransaction(TransactionBlock)}.
+   *
+   * @param tbs transaction blocks with code to be executed, run in iteration order
+   * @return the result of the last tb.execute(), or null if {@code tbs} is empty
+   */
+  private <T> T executeTransaction(final Iterable<TransactionBlock<T>> tbs) throws Exception {
+    TransactionBlock<T> combined = pm -> {
+      T result = null;
+      for (TransactionBlock<T> tb : tbs) {
+        result = tb.execute(pm);
+      }
+      return result;
+    };
+    return executeTransaction(combined);
+  }
+
+  /**
+   * Execute some code as a single transaction with retry mechanism.
+   *
+   * @param tb transaction block with code to execute
+   * @return Object with the result of tb.execute()
+   */
+  @SuppressWarnings("squid:S00112")
+  public <T> T executeTransactionWithRetry(final TransactionBlock<T> tb)
+      throws Exception {
+    return new ExponentialBackoff().execute(() -> executeTransaction(tb));
+  }
+
+  /**
+   * Execute a list of TransactionBlock code as a single transaction.
+   * If any of the TransactionBlocks fail, all of the TransactionBlocks are
+   * retried together.
+   *
+   * @param tbs a list of transaction blocks with code to be executed.
+   */
+  @SuppressWarnings("squid:S00112")
+  <T> void executeTransactionBlocksWithRetry(final Iterable<TransactionBlock<T>> tbs)
+      throws Exception {
+    new ExponentialBackoff().execute(() -> executeTransaction(tbs));
+  }
+
+  /**
+   * Implementation of exponential backoff with random fuzziness.
+   * After each failed attempt the backoff time becomes 1.5 times the previous
+   * amount, plus a random fuzz of less than half of the previous amount.
+   */
+  private class ExponentialBackoff {
+
+    @SuppressWarnings("squid:S00112")
+    <T> T execute(Callable<T> arg) throws Exception {
+      // Always make at least one attempt. With a non-positive retry setting there
+      // would otherwise be no attempt at all and no exception to report below.
+      final int maxAttempts = Math.max(1, transactionRetryMax);
+      // Thread.sleep rejects negative delays
+      long sleepTime = Math.max(0, retryWaitTimeMills);
+      Exception ex = null;
+
+      for (int retryNum = 1; retryNum <= maxAttempts; retryNum++) {
+        try {
+          return arg.call();
+        } catch (SentryUserException e) {
+          // throw the sentry exception without retry
+          LOGGER.warn("Transaction manager encountered non-retriable exception", e);
+          throw e;
+        } catch (Exception e) {
+          ex = e;
+          retryCount.inc();
+          LOGGER.warn("Transaction execution encountered exception", e);
+          if (retryNum == maxAttempts) {
+            // No attempts left: give up without a pointless sleep
+            break;
+          }
+          LOGGER.warn("Retrying transaction {}/{} times",
+              retryNum, maxAttempts);
+          LOGGER.warn("Sleeping for {} milliseconds before retrying", sleepTime);
+          Thread.sleep(sleepTime);
+          sleepTime = nextSleepTime(sleepTime);
+        }
+      }
+      // The loop exits only via break, after ex has been assigned
+      assert ex != null;
+      String message = "The transaction has reached max retry number, "
+          + ex.getMessage();
+      LOGGER.error(message, ex);
+      throw new Exception(message, ex);
+    }
+
+    /**
+     * Computes the next backoff delay: 1.5 times {@code current} plus a random
+     * fuzz in {@code [0, current / 2)}.
+     */
+    private long nextSleepTime(long current) {
+      long half = current / 2;
+      // Random#nextInt requires a positive bound. Skip the fuzz for tiny delays,
+      // and clamp huge delays so the int conversion cannot wrap negative.
+      int fuzz = half > 0 ? random.nextInt((int) Math.min(half, Integer.MAX_VALUE)) : 0;
+      return current + half + fuzz;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java
new file mode 100644
index 0000000..992d8ab
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java
@@ -0,0 +1,524 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.service.thrift;
+
+import com.codahale.metrics.Counter;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.sentry.hdfs.PathsUpdate;
+import org.apache.sentry.hdfs.SentryMalformedPathException;
+import org.apache.sentry.hdfs.ServiceConstants.ServerConfig;
+import org.apache.sentry.api.service.thrift.SentryMetrics;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import static com.codahale.metrics.MetricRegistry.name;
+
+/**
+ * Manages fetching a full snapshot from HMS.
+ * The snapshot is represented as a map from the Hive object name to
+ * the set of paths for this object.
+ * The Hive object name is either the Hive database name or
+ * the Hive database name joined with the Hive table name as {@code dbName.tableName}.
+ * All table partitions are stored under the table object.
+ * <p>
+ * Once a {@link FullUpdateInitializer} is created, its
+ * {@link FullUpdateInitializer#getFullHMSSnapshot()} method should be called to get
+ * the initial update.
+ * <p>
+ * It is important to close the {@link FullUpdateInitializer} object to prevent resource
+ * leaks.
+ * <p>
+ * The usual way of using {@link FullUpdateInitializer} is
+ * <pre>
+ * {@code
+ * try (FullUpdateInitializer updateInitializer =
+ *          new FullUpdateInitializer(clientFactory, authzConf)) {
+ *   Map<String, Collection<String>> pathsUpdate = updateInitializer.getFullHMSSnapshot();
+ *   return pathsUpdate;
+ * }
+ * }
+ * </pre>
+ */
+public final class FullUpdateInitializer implements AutoCloseable {
+
+ /*
+ * Implementation note.
+ *
+ * The snapshot is obtained using an executor. We follow the map/reduce model.
+ * Each executor thread (mapper) obtains and returns a partial snapshot; the partial
+ * snapshots are then reduced to a single combined snapshot by getFullHMSSnapshot().
+ *
+ * Synchronization between the getFullHMSSnapshot() and executors is done using the
+ * 'results' queue. The queue holds the futures for each scheduled task.
+ * It is initially populated by getFullHMSSnapshot and each task may add new future
+ * results to it. Only getFullHMSSnapshot() removes entries from the results queue.
+ * This guarantees that once the results queue is empty there are no pending jobs.
+ *
+ * Since there is no other data sharing, the implementation is safe without
+ * any other synchronization. It is not thread-safe for concurrent calls
+ * to getFullHMSSnapshot().
+ *
+ */
+
+
+ /** Name format for the threads created for the fetch thread pool. */
+ private static final String FULL_UPDATE_INITIALIZER_THREAD_NAME = "hms-fetch-%d";
+ /** Executor running the fetch tasks; a task that finally fails shuts it down. */
+ private final ExecutorService threadPool;
+ /** Maximum number of partition names passed to one PartitionTask. */
+ private final int maxPartitionsPerCall;
+ /** Maximum number of table names passed to one TableTask. */
+ private final int maxTablesPerCall;
+ /** Futures of all scheduled tasks; see the implementation note above. */
+ private final Deque<Future<CallResult>> results = new ConcurrentLinkedDeque<>();
+ /** Number of attempts each task makes when it fails with a TException. */
+ private final int maxRetries;
+ /** Delay between task attempts; non-positive values are replaced by 1000 ms. */
+ private final int waitDurationMillis;
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(FullUpdateInitializer.class);
+
+ /** Shared empty result, used by failed CallResults and by tasks that find no path. */
+ private static final ObjectMapping emptyObjectMapping =
+ new ObjectMapping(Collections.<String, Set<String>>emptyMap());
+ /** Source of HMS client connections used by all tasks. */
+ private final HiveConnectionFactory clientFactory;
+
+ /** Total number of database objects (incremented once per DbTask created). */
+ private final Counter databaseCount = SentryMetrics.getInstance()
+ .getCounter(name(FullUpdateInitializer.class, "total", "db"));
+
+ /** Total number of table objects (incremented by the table count of each database). */
+ private final Counter tableCount = SentryMetrics.getInstance()
+ .getCounter(name(FullUpdateInitializer.class, "total", "tables"));
+
+ /** Total number of partition objects (incremented by the partition count of each table). */
+ private final Counter partitionCount = SentryMetrics.getInstance()
+ .getCounter(name(FullUpdateInitializer.class, "total", "partitions"));
+
+  /**
+   * Extract path (not starting with "/") from the full URI.
+   * <p>
+   * Invalid URIs are logged and skipped rather than failing the snapshot.
+   *
+   * @param uri resource URI (usually with scheme)
+   * @return path if uri is valid or null
+   */
+  static String pathFromURI(String uri) {
+    try {
+      return PathsUpdate.parsePath(uri);
+    } catch (SentryMalformedPathException e) {
+      // Parameterized logging: the message is only formatted if WARN is enabled
+      LOGGER.warn("Ignoring invalid uri {}: {}", uri, e.getReason());
+      return null;
+    }
+  }
+
+  /**
+   * Mapping from an authorizable object name to the set of its paths.
+   * Executor threads return these as partial results. The partial mappings
+   * are then combined into a single mapping to produce the final result.
+   */
+  private static final class ObjectMapping {
+    private final Map<String, Set<String>> objects;
+
+    /** Wraps the given mapping as-is (no copy is made). */
+    ObjectMapping(Map<String, Set<String>> objects) {
+      this.objects = objects;
+    }
+
+    /** Maps {@code authObject} to a single interned path. */
+    ObjectMapping(String authObject, String path) {
+      this(ImmutableMap.<String, Set<String>>of(authObject,
+          Collections.singleton(safeIntern(path))));
+    }
+
+    /** Maps {@code authObject} to a copy of {@code paths}. */
+    ObjectMapping(String authObject, Collection<String> paths) {
+      this(ImmutableMap.<String, Set<String>>of(authObject, new HashSet<>(paths)));
+    }
+
+    /** @return the underlying object-to-paths map */
+    Map<String, Set<String>> getObjects() {
+      return objects;
+    }
+  }
+
+  /**
+   * Outcome of a single executor task. It holds either the task's
+   * {@link ObjectMapping} on success, or the exception that made it fail
+   * (together with the shared empty mapping).
+   */
+  private static final class CallResult {
+    private final Exception failure;
+    private final boolean successStatus;
+    private final ObjectMapping objectMapping;
+
+    /** Failed result carrying {@code ex} (which may be null). */
+    CallResult(Exception ex) {
+      this(ex, false, emptyObjectMapping);
+    }
+
+    /** Successful result carrying {@code objectMapping}. */
+    CallResult(ObjectMapping objectMapping) {
+      this(null, true, objectMapping);
+    }
+
+    private CallResult(Exception failure, boolean successStatus,
+                       ObjectMapping objectMapping) {
+      this.failure = failure;
+      this.successStatus = successStatus;
+      this.objectMapping = objectMapping;
+    }
+
+    /** @return true if the task completed successfully */
+    boolean success() {
+      return successStatus;
+    }
+
+    /** @return the task's mapping (empty for failed tasks) */
+    ObjectMapping getObjectMapping() {
+      return objectMapping;
+    }
+
+    /** @return the failure cause, or null for successful tasks */
+    public Exception getFailure() {
+      return failure;
+    }
+  }
+
+ private abstract class BaseTask implements Callable<CallResult> {
+
+ /**
+ * Class represents retry strategy for BaseTask.
+ */
+ private final class RetryStrategy {
+ private int retryStrategyMaxRetries = 0;
+ private final int retryStrategyWaitDurationMillis;
+
+ private RetryStrategy(int retryStrategyMaxRetries, int retryStrategyWaitDurationMillis) {
+ this.retryStrategyMaxRetries = retryStrategyMaxRetries;
+
+ // Assign default wait duration if negative value is provided.
+ this.retryStrategyWaitDurationMillis = (retryStrategyWaitDurationMillis > 0) ?
+ retryStrategyWaitDurationMillis : 1000;
+ }
+
+ @SuppressWarnings({"squid:S1141", "squid:S2142"})
+ public CallResult exec() {
+ // Retry logic is happening inside callable/task to avoid
+ // synchronous waiting on getting the result.
+ // Retry the failure task until reach the max retry number.
+ // Wait configurable duration for next retry.
+ //
+ // Only thrift exceptions are retried.
+ // Other exceptions are propagated up the stack.
+ Exception exception = null;
+ try {
+ // We catch all exceptions except Thrift exceptions which are retried
+ for (int i = 0; i < retryStrategyMaxRetries; i++) {
+ //noinspection NestedTryStatement
+ try {
+ return new CallResult(doTask());
+ } catch (TException ex) {
+ LOGGER.debug("Failed to execute task on " + (i + 1) + " attempts." +
+ " Sleeping for " + retryStrategyWaitDurationMillis + " ms. Exception: " +
+ ex.toString(), ex);
+ exception = ex;
+
+ try {
+ Thread.sleep(retryStrategyWaitDurationMillis);
+ } catch (InterruptedException ignored) {
+ // Skip the rest retries if get InterruptedException.
+ // And set the corresponding retries number.
+ LOGGER.warn("Interrupted during update fetch during iteration " + (i + 1));
+ break;
+ }
+ }
+ }
+ } catch (Exception ex) {
+ exception = ex;
+ }
+ LOGGER.error("Failed to execute task", exception);
+ // We will fail in the end, so we are shutting down the pool to prevent
+ // new tasks from being scheduled.
+ threadPool.shutdown();
+ return new CallResult(exception);
+ }
+ }
+
+ private final RetryStrategy retryStrategy;
+
+ BaseTask() {
+ retryStrategy = new RetryStrategy(maxRetries, waitDurationMillis);
+ }
+
+ @Override
+ public CallResult call() throws Exception {
+ return retryStrategy.exec();
+ }
+
+ abstract ObjectMapping doTask() throws Exception;
+ }
+
+  /**
+   * Fetches one batch of a table's partitions by name. It maps the table's
+   * authorizable name to the partitions' paths.
+   */
+  private class PartitionTask extends BaseTask {
+    private final String dbName;
+    private final String tblName;
+    private final String authName;
+    private final List<String> partNames;
+
+    PartitionTask(String dbName, String tblName, String authName,
+                  List<String> partNames) {
+      this.dbName = safeIntern(dbName);
+      this.tblName = safeIntern(tblName);
+      this.authName = safeIntern(authName);
+      this.partNames = partNames;
+    }
+
+    @Override
+    ObjectMapping doTask() throws Exception {
+      List<Partition> fetched = fetchPartitions();
+
+      LOGGER.debug("Fetched partitions for db = {}, table = {}",
+          dbName, tblName);
+
+      Collection<String> locations = new ArrayList<>(fetched.size());
+      for (Partition partition : fetched) {
+        String location = pathFromURI(partition.getSd().getLocation());
+        if (location == null) {
+          // invalid location was already logged by pathFromURI
+          continue;
+        }
+        locations.add(location.intern());
+      }
+      return new ObjectMapping(authName, locations);
+    }
+
+    /** Reads this batch's partitions, invalidating the client if the call fails. */
+    private List<Partition> fetchPartitions() throws Exception {
+      HMSClient used = null;
+      try (HMSClient client = clientFactory.connect()) {
+        used = client;
+        return client.getClient().getPartitionsByNames(dbName, tblName, partNames);
+      } catch (Exception e) {
+        if (used != null) {
+          used.invalidate();
+        }
+        throw e;
+      }
+    }
+  }
+
+ private class TableTask extends BaseTask {
+ private final String dbName;
+ private final List<String> tableNames;
+
+ TableTask(Database db, List<String> tableNames) {
+ dbName = safeIntern(db.getName());
+ this.tableNames = tableNames;
+ }
+
+ @Override
+ @SuppressWarnings({"squid:S2629", "squid:S135"})
+ ObjectMapping doTask() throws Exception {
+ HMSClient c = null;
+ try (HMSClient client = clientFactory.connect()) {
+ c = client;
+ List<Table> tables = client.getClient().getTableObjectsByName(dbName, tableNames);
+
+ LOGGER.debug("Fetching tables for db = {}, tables = {}", dbName, tableNames);
+
+ Map<String, Set<String>> objectMapping = new HashMap<>(tables.size());
+ for (Table tbl : tables) {
+ // Table names are case insensitive
+ if (!tbl.getDbName().equalsIgnoreCase(dbName)) {
+ // Inconsistency in HMS data
+ LOGGER.warn(String.format("DB name %s for table %s does not match %s",
+ tbl.getDbName(), tbl.getTableName(), dbName));
+ continue;
+ }
+
+ String tableName = safeIntern(tbl.getTableName().toLowerCase());
+ String authzObject = (dbName + "." + tableName).intern();
+ List<String> tblPartNames =
+ client.getClient().listPartitionNames(dbName, tableName, (short) -1);
+ // Count total number of partitions
+ partitionCount.inc(tblPartNames.size());
+ for (int i = 0; i < tblPartNames.size(); i += maxPartitionsPerCall) {
+ List<String> partsToFetch = tblPartNames.subList(i,
+ Math.min(i + maxPartitionsPerCall, tblPartNames.size()));
+ Callable<CallResult> partTask = new PartitionTask(dbName,
+ tableName, authzObject, partsToFetch);
+ results.add(threadPool.submit(partTask));
+ }
+ String tblPath = safeIntern(pathFromURI(tbl.getSd().getLocation()));
+ if (tblPath == null) {
+ continue;
+ }
+ Set<String> paths = objectMapping.get(authzObject);
+ if (paths == null) {
+ paths = new HashSet<>(1);
+ objectMapping.put(authzObject, paths);
+ }
+ paths.add(tblPath);
+ }
+ return new ObjectMapping(Collections.unmodifiableMap(objectMapping));
+ } catch (Exception e) {
+ if (c != null) {
+ c.invalidate();
+ }
+ throw e;
+ }
+ }
+ }
+
+ private class DbTask extends BaseTask {
+
+ private final String dbName;
+
+ DbTask(String dbName) {
+ //Database names are case insensitive
+ this.dbName = safeIntern(dbName.toLowerCase());
+ databaseCount.inc();
+ }
+
+ @Override
+ ObjectMapping doTask() throws Exception {
+ HMSClient c = null;
+ try (HMSClient client = clientFactory.connect()) {
+ c = client;
+ Database db = client.getClient().getDatabase(dbName);
+ if (!dbName.equalsIgnoreCase(db.getName())) {
+ LOGGER.warn("Database name {} does not match {}", db.getName(), dbName);
+ return emptyObjectMapping;
+ }
+ List<String> allTblStr = client.getClient().getAllTables(dbName);
+ // Count total number of tables
+ tableCount.inc(allTblStr.size());
+ for (int i = 0; i < allTblStr.size(); i += maxTablesPerCall) {
+ List<String> tablesToFetch = allTblStr.subList(i,
+ Math.min(i + maxTablesPerCall, allTblStr.size()));
+ Callable<CallResult> tableTask = new TableTask(db, tablesToFetch);
+ results.add(threadPool.submit(tableTask));
+ }
+ String dbPath = safeIntern(pathFromURI(db.getLocationUri()));
+ return (dbPath != null) ? new ObjectMapping(dbName, dbPath) :
+ emptyObjectMapping;
+ } catch (Exception e) {
+ if (c != null) {
+ c.invalidate();
+ }
+ throw e;
+ }
+ }
+ }
+
+ FullUpdateInitializer(HiveConnectionFactory clientFactory, Configuration conf) {
+ this.clientFactory = clientFactory;
+ maxPartitionsPerCall = conf.getInt(
+ ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC,
+ ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC_DEFAULT);
+ maxTablesPerCall = conf.getInt(
+ ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC,
+ ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC_DEFAULT);
+ maxRetries = conf.getInt(
+ ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM,
+ ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM_DEFAULT);
+ waitDurationMillis = conf.getInt(
+ ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS,
+ ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS_DEFAULT);
+
+ ThreadFactory fullUpdateInitThreadFactory = new ThreadFactoryBuilder()
+ .setNameFormat(FULL_UPDATE_INITIALIZER_THREAD_NAME)
+ .setDaemon(false)
+ .build();
+ threadPool = Executors.newFixedThreadPool(conf.getInt(
+ ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS,
+ ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS_DEFAULT),
+ fullUpdateInitThreadFactory);
+ }
+
+ /**
+ * Get Full HMS snapshot.
+ * @return Full snapshot of HMS objects.
+ * @throws TException if Thrift error occured
+ * @throws ExecutionException if there was a scheduling error
+ * @throws InterruptedException if processing was interrupted
+ */
+ @SuppressWarnings("squid:S00112")
+ Map<String, Collection<String>> getFullHMSSnapshot() throws Exception {
+ // Get list of all HMS databases
+ List<String> allDbStr;
+ HMSClient c = null;
+ try (HMSClient client = clientFactory.connect()) {
+ c = client;
+ allDbStr = client.getClient().getAllDatabases();
+ } catch (Exception e) {
+ if (c != null) {
+ c.invalidate();
+ }
+ throw e;
+ }
+
+ // Schedule async task for each database responsible for fetching per-database
+ // objects.
+ for (String dbName : allDbStr) {
+ results.add(threadPool.submit(new DbTask(dbName)));
+ }
+
+ // Resulting full snapshot
+ Map<String, Collection<String>> fullSnapshot = new HashMap<>();
+
+ // As async tasks complete, merge their results into full snapshot.
+ while (!results.isEmpty()) {
+ // This is the only thread that takes elements off the results list - all other threads
+ // only add to it. Once the list is empty it can't become non-empty
+ // This means that if we check that results is non-empty we can safely call pop() and
+ // know that the result of poll() is not null.
+ Future<CallResult> result = results.pop();
+ // Wait for the task to complete
+ CallResult callResult = result.get();
+ // Fail if we got errors
+ if (!callResult.success()) {
+ throw callResult.getFailure();
+ }
+ // Merge values into fullUpdate
+ Map<String, Set<String>> objectMapping =
+ callResult.getObjectMapping().getObjects();
+ for (Map.Entry<String, Set<String>> entry: objectMapping.entrySet()) {
+ String key = entry.getKey();
+ Set<String> val = entry.getValue();
+ Set<String> existingSet = (Set<String>)fullSnapshot.get(key);
+ if (existingSet == null) {
+ fullSnapshot.put(key, val);
+ continue;
+ }
+ existingSet.addAll(val);
+ }
+ }
+ return fullSnapshot;
+ }
+
+ @Override
+ public void close() {
+ threadPool.shutdownNow();
+ try {
+ threadPool.awaitTermination(1, TimeUnit.SECONDS);
+ } catch (InterruptedException ignored) {
+ LOGGER.warn("Interrupted shutdown");
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Intern a string but only if it is not null
+ * @param arg String to be interned, may be null
+ * @return interned string or null
+ */
+ static String safeIntern(String arg) {
+ return (arg != null) ? arg.intern() : null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateModifier.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateModifier.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateModifier.java
new file mode 100644
index 0000000..c30d982
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateModifier.java
@@ -0,0 +1,606 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.sentry.service.thrift;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
+import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAddPartitionMessage;
+import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAlterPartitionMessage;
+import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAlterTableMessage;
+import org.apache.sentry.binding.metastore.messaging.json.SentryJSONCreateDatabaseMessage;
+import org.apache.sentry.binding.metastore.messaging.json.SentryJSONCreateTableMessage;
+import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropDatabaseMessage;
+import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropPartitionMessage;
+import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropTableMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Apply newer events to the full update.
+ *
+ * <p>The process of obtaining ful snapshot from HMS is not atomic.
+ * While we read information from HMS it may change - some new objects can be created,
+ * or some can be removed or modified. This class is used to reconsile changes to
+ * the full snapshot.
+ */
+final class FullUpdateModifier {
+ private static final Logger LOGGER = LoggerFactory.getLogger(FullUpdateModifier.class);
+
+ // Prevent creation of class instances
+ private FullUpdateModifier() {
+ }
+
+ /**
+ * Take a full snapshot and apply an MS event to it.
+ *
+ * <p>We pass serializer as a parameter to simplify testing.
+ *
+ * @param image Full snapshot
+ * @param event HMS notificatin event
+ * @param deserializer Message deserializer -
+ * should produce Sentry JSON serializer type messages.
+ */
+ // NOTE: we pass deserializer here instead of using built-in one to simplify testing.
+ // Tests use mock serializers and thus we do not have to construct proper events.
+ static void applyEvent(Map<String, Collection<String>> image, NotificationEvent event,
+ MessageDeserializer deserializer) {
+ EventMessage.EventType eventType =
+ EventMessage.EventType.valueOf(event.getEventType());
+
+ switch (eventType) {
+ case CREATE_DATABASE:
+ createDatabase(image, event, deserializer);
+ break;
+ case DROP_DATABASE:
+ dropDatabase(image, event, deserializer);
+ break;
+ case CREATE_TABLE:
+ createTable(image, event, deserializer);
+ break;
+ case DROP_TABLE:
+ dropTable(image, event, deserializer);
+ break;
+ case ALTER_TABLE:
+ alterTable(image, event, deserializer);
+ break;
+ case ADD_PARTITION:
+ addPartition(image, event, deserializer);
+ break;
+ case DROP_PARTITION:
+ dropPartition(image, event, deserializer);
+ break;
+ case ALTER_PARTITION:
+ alterPartition(image, event, deserializer);
+ break;
+ default:
+ LOGGER.error("Notification with ID:{} has invalid event type: {}", event.getEventId(),
+ event.getEventType());
+ break;
+ }
+ }
+
+ /**
+ * Add mapping from the new database name to location {dbname: {location}}.
+ */
+ private static void createDatabase(Map<String, Collection<String>> image, NotificationEvent event,
+ MessageDeserializer deserializer) {
+ SentryJSONCreateDatabaseMessage message =
+ (SentryJSONCreateDatabaseMessage) deserializer
+ .getCreateDatabaseMessage(event.getMessage());
+
+ String dbName = message.getDB();
+ if ((dbName == null) || dbName.isEmpty()) {
+ LOGGER.error("Create database event is missing database name");
+ return;
+ }
+ dbName = dbName.toLowerCase();
+
+ String location = message.getLocation();
+ if ((location == null) || location.isEmpty()) {
+ LOGGER.error("Create database event is missing database location");
+ return;
+ }
+
+ String path = FullUpdateInitializer.pathFromURI(location);
+ if (path == null) {
+ return;
+ }
+
+ // Add new database if it doesn't exist yet
+ if (!image.containsKey(dbName)) {
+ LOGGER.debug("create database {} with location {}", dbName, location);
+ image.put(dbName.intern(), Collections.singleton(path));
+ } else {
+ // Sanity check the information and print warnings if database exists but
+ // with a different location
+ Set<String> oldLocations = (Set<String>)image.get(dbName);
+ LOGGER.debug("database {} already exists, ignored", dbName);
+ if (!oldLocations.contains(location)) {
+ LOGGER.warn("database {} exists but location is different from {}", dbName, location);
+ }
+ }
+ }
+
+ /**
+ * Remove a mapping from database name and remove all mappings which look like dbName.tableName
+ * where dbName matches database name.
+ */
+ private static void dropDatabase(Map<String, Collection<String>> image, NotificationEvent event,
+ MessageDeserializer deserializer) {
+ SentryJSONDropDatabaseMessage message =
+ (SentryJSONDropDatabaseMessage) deserializer.getDropDatabaseMessage(event.getMessage());
+
+ String dbName = message.getDB();
+ if ((dbName == null) || dbName.isEmpty()) {
+ LOGGER.error("Drop database event is missing database name");
+ return;
+ }
+ dbName = dbName.toLowerCase();
+ String location = message.getLocation();
+ if ((location == null) || location.isEmpty()) {
+ LOGGER.error("Drop database event is missing database location");
+ return;
+ }
+
+ String path = FullUpdateInitializer.pathFromURI(location);
+ if (path == null) {
+ return;
+ }
+
+ // If the database is alreday deleted, we have nothing to do
+ Set<String> locations = (Set<String>)image.get(dbName);
+ if (locations == null) {
+ LOGGER.debug("database {} is already deleted", dbName);
+ return;
+ }
+
+ if (!locations.contains(path)) {
+ LOGGER.warn("Database {} location does not match {}", dbName, path);
+ return;
+ }
+
+ LOGGER.debug("drop database {} with location {}", dbName, location);
+
+ // Drop information about the database
+ image.remove(dbName);
+
+ String dbPrefix = dbName + ".";
+
+ // Remove all objects for this database
+ for (Iterator<Map.Entry<String, Collection<String>>> it = image.entrySet().iterator();
+ it.hasNext(); ) {
+ Map.Entry<String, Collection<String>> entry = it.next();
+ String key = entry.getKey();
+ if (key.startsWith(dbPrefix)) {
+ LOGGER.debug("Removing {}", key);
+ it.remove();
+ }
+ }
+ }
+
+ /**
+ * Add mapping for dbName.tableName.
+ */
+ private static void createTable(Map<String, Collection<String>> image, NotificationEvent event,
+ MessageDeserializer deserializer) {
+ SentryJSONCreateTableMessage message = (SentryJSONCreateTableMessage) deserializer
+ .getCreateTableMessage(event.getMessage());
+
+ String dbName = message.getDB();
+ if ((dbName == null) || dbName.isEmpty()) {
+ LOGGER.error("Create table event is missing database name");
+ return;
+ }
+ String tableName = message.getTable();
+ if ((tableName == null) || tableName.isEmpty()) {
+ LOGGER.error("Create table event is missing table name");
+ return;
+ }
+
+ String location = message.getLocation();
+ if ((location == null) || location.isEmpty()) {
+ LOGGER.error("Create table event is missing table location");
+ return;
+ }
+
+ String path = FullUpdateInitializer.pathFromURI(location);
+ if (path == null) {
+ return;
+ }
+
+ String authName = dbName.toLowerCase() + "." + tableName.toLowerCase();
+ // Add new table if it doesn't exist yet
+ if (!image.containsKey(authName)) {
+ LOGGER.debug("create table {} with location {}", authName, location);
+ Set<String> locations = new HashSet<>(1);
+ locations.add(path);
+ image.put(authName.intern(), locations);
+ } else {
+ // Sanity check the information and print warnings if table exists but
+ // with a different location
+ Set<String> oldLocations = (Set<String>)image.get(authName);
+ LOGGER.debug("Table {} already exists, ignored", authName);
+ if (!oldLocations.contains(location)) {
+ LOGGER.warn("Table {} exists but location is different from {}", authName, location);
+ }
+ }
+ }
+
+ /**
+ * Drop mapping from dbName.tableName
+ */
+ private static void dropTable(Map<String, Collection<String>> image, NotificationEvent event,
+ MessageDeserializer deserializer) {
+ SentryJSONDropTableMessage message = (SentryJSONDropTableMessage) deserializer
+ .getDropTableMessage(event.getMessage());
+
+ String dbName = message.getDB();
+ if ((dbName == null) || dbName.isEmpty()) {
+ LOGGER.error("Drop table event is missing database name");
+ return;
+ }
+ String tableName = message.getTable();
+ if ((tableName == null) || tableName.isEmpty()) {
+ LOGGER.error("Drop table event is missing table name");
+ return;
+ }
+
+ String location = message.getLocation();
+ if ((location == null) || location.isEmpty()) {
+ LOGGER.error("Drop table event is missing table location");
+ return;
+ }
+
+ String path = FullUpdateInitializer.pathFromURI(location);
+ if (path == null) {
+ return;
+ }
+
+ String authName = dbName.toLowerCase() + "." + tableName.toLowerCase();
+ Set<String> locations = (Set<String>)image.get(authName);
+ if (locations != null && locations.contains(path)) {
+ LOGGER.debug("Removing {}", authName);
+ image.remove(authName);
+ } else {
+ LOGGER.warn("can't find matching table {} with location {}", authName, location);
+ }
+ }
+
+ /**
+ * ALTER TABLE is a complicated function that can alter multiple things.
+ *
+ * <p>We take care iof the following cases:
+ * <ul>
+ * <li>Change database name. This is the most complicated one.
+ * We need to change the actual database name and change all mappings
+ * that look like "dbName.tableName" to the new dbName</li>
+ * <li>Change table name</li>
+ * <li>Change location</li>
+ * </ul>
+ *
+ */
+ private static void alterTable(Map<String, Collection<String>> image, NotificationEvent event,
+ MessageDeserializer deserializer) {
+ SentryJSONAlterTableMessage message =
+ (SentryJSONAlterTableMessage) deserializer.getAlterTableMessage(event.getMessage());
+ String prevDbName = message.getDB();
+ if ((prevDbName == null) || prevDbName.isEmpty()) {
+ LOGGER.error("Alter table event is missing old database name");
+ return;
+ }
+ prevDbName = prevDbName.toLowerCase();
+ String prevTableName = message.getTable();
+ if ((prevTableName == null) || prevTableName.isEmpty()) {
+ LOGGER.error("Alter table event is missing old table name");
+ return;
+ }
+ prevTableName = prevTableName.toLowerCase();
+
+ String newDbName = event.getDbName();
+ if ((newDbName == null) || newDbName.isEmpty()) {
+ LOGGER.error("Alter table event is missing new database name");
+ return;
+ }
+ newDbName = newDbName.toLowerCase();
+
+ String newTableName = event.getTableName();
+ if ((newTableName == null) || newTableName.isEmpty()) {
+ LOGGER.error("Alter table event is missing new table name");
+ return;
+ }
+ newTableName = newTableName.toLowerCase();
+
+ String prevLocation = message.getOldLocation();
+ if ((prevLocation == null) || prevLocation.isEmpty()) {
+ LOGGER.error("Alter table event is missing old location");
+ return;
+ }
+ String prevPath = FullUpdateInitializer.pathFromURI(prevLocation);
+ if (prevPath == null) {
+ return;
+ }
+
+ String newLocation = message.getNewLocation();
+ if ((newLocation == null) || newLocation.isEmpty()) {
+ LOGGER.error("Alter table event is missing new location");
+ return;
+ }
+ String newPath = FullUpdateInitializer.pathFromURI(newLocation);
+ if (newPath == null) {
+ return;
+ }
+
+ String prevAuthName = prevDbName + "." + prevTableName;
+ String newAuthName = newDbName + "." + newTableName;
+
+ if (!prevDbName.equals(newDbName)) {
+ // Database name change
+ LOGGER.debug("Changing database name: {} -> {}", prevDbName, newDbName);
+ Set<String> locations = (Set<String>)image.get(prevDbName);
+ if (locations != null) {
+ // Rename database if it is not renamed yet
+ if (!image.containsKey(newDbName)) {
+ image.put(newDbName, locations);
+ image.remove(prevDbName);
+ // Walk through all tables and rename DB part of the AUTH name
+ // AUTH name is "dbName.TableName" so we need to replace dbName with the new name
+ String prevDbPrefix = prevDbName + ".";
+ String newDbPrefix = newDbName + ".";
+ renamePrefixKeys(image, prevDbPrefix, newDbPrefix);
+ } else {
+ LOGGER.warn("database {} rename: found existing database {}", prevDbName, newDbName);
+ }
+ } else {
+ LOGGER.debug("database {} not found", prevDbName);
+ }
+ }
+
+ if (!prevAuthName.equals(newAuthName)) {
+ // Either the database name or table name changed, rename objects
+ Set<String> locations = (Set<String>)image.get(prevAuthName);
+ if (locations != null) {
+ // Rename if it is not renamed yet
+ if (!image.containsKey(newAuthName)) {
+ LOGGER.debug("rename {} -> {}", prevAuthName, newAuthName);
+ image.put(newAuthName, locations);
+ image.remove(prevAuthName);
+ } else {
+ LOGGER.warn("auth {} rename: found existing object {}", prevAuthName, newAuthName);
+ }
+ } else {
+ LOGGER.debug("auth {} not found", prevAuthName);
+ }
+ }
+
+ if (!prevPath.equals(newPath)) {
+ LOGGER.debug("Location change: {} -> {}", prevPath, newPath);
+ // Location change
+ Set<String> locations = (Set<String>) image.get(newAuthName);
+ if (locations != null && locations.contains(prevPath) && !locations.contains(newPath)) {
+ locations.remove(prevPath);
+ locations.add(newPath);
+ } else {
+ LOGGER.warn("can not process location change for {}", newAuthName);
+ LOGGER.warn("old locatio = {}, new location = {}", prevPath, newPath);
+ }
+ }
+ }
+
+ /**
+ * Add partition just adds a new location to the existing table.
+ */
+ private static void addPartition(Map<String, Collection<String>> image, NotificationEvent event,
+ MessageDeserializer deserializer) {
+ SentryJSONAddPartitionMessage message =
+ (SentryJSONAddPartitionMessage) deserializer.getAddPartitionMessage(event.getMessage());
+
+ String dbName = message.getDB();
+ if ((dbName == null) || dbName.isEmpty()) {
+ LOGGER.error("Add partition event is missing database name");
+ return;
+ }
+ String tableName = message.getTable();
+ if ((tableName == null) || tableName.isEmpty()) {
+ LOGGER.error("Add partition event for {} is missing table name", dbName);
+ return;
+ }
+
+ String authName = dbName.toLowerCase() + "." + tableName.toLowerCase();
+
+ List<String> locations = message.getLocations();
+ if (locations == null || locations.isEmpty()) {
+ LOGGER.error("Add partition event for {} is missing partition locations", authName);
+ return;
+ }
+
+ Set<String> oldLocations = (Set<String>) image.get(authName);
+ if (oldLocations == null) {
+ LOGGER.warn("Add partition for {}: missing table locations",authName);
+ return;
+ }
+
+ // Add each partition
+ for (String location: locations) {
+ String path = FullUpdateInitializer.pathFromURI(location);
+ if (path != null) {
+ LOGGER.debug("Adding partition {}:{}", authName, path);
+ oldLocations.add(path);
+ }
+ }
+ }
+
+ /**
+ * Drop partition removes location from the existing table.
+ */
+ private static void dropPartition(Map<String, Collection<String>> image, NotificationEvent event,
+ MessageDeserializer deserializer) {
+ SentryJSONDropPartitionMessage message =
+ (SentryJSONDropPartitionMessage) deserializer
+ .getDropPartitionMessage(event.getMessage());
+ String dbName = message.getDB();
+ if ((dbName == null) || dbName.isEmpty()) {
+ LOGGER.error("Drop partition event is missing database name");
+ return;
+ }
+ String tableName = message.getTable();
+ if ((tableName == null) || tableName.isEmpty()) {
+ LOGGER.error("Drop partition event for {} is missing table name", dbName);
+ return;
+ }
+
+ String authName = dbName.toLowerCase() + "." + tableName.toLowerCase();
+
+ List<String> locations = message.getLocations();
+ if (locations == null || locations.isEmpty()) {
+ LOGGER.error("Drop partition event for {} is missing partition locations", authName);
+ return;
+ }
+
+ Set<String> oldLocations = (Set<String>) image.get(authName);
+ if (oldLocations == null) {
+ LOGGER.warn("Add partition for {}: missing table locations",authName);
+ return;
+ }
+
+ // Drop each partition
+ for (String location: locations) {
+ String path = FullUpdateInitializer.pathFromURI(location);
+ if (path != null) {
+ oldLocations.remove(path);
+ }
+ }
+ }
+
+ private static void alterPartition(Map<String, Collection<String>> image, NotificationEvent event,
+ MessageDeserializer deserializer) {
+ SentryJSONAlterPartitionMessage message =
+ (SentryJSONAlterPartitionMessage) deserializer
+ .getAlterPartitionMessage(event.getMessage());
+
+ String dbName = message.getDB();
+ if ((dbName == null) || dbName.isEmpty()) {
+ LOGGER.error("Alter partition event is missing database name");
+ return;
+ }
+ String tableName = message.getTable();
+ if ((tableName == null) || tableName.isEmpty()) {
+ LOGGER.error("Alter partition event for {} is missing table name", dbName);
+ return;
+ }
+
+ String authName = dbName.toLowerCase() + "." + tableName.toLowerCase();
+
+ String prevLocation = message.getOldLocation();
+ if (prevLocation == null || prevLocation.isEmpty()) {
+ LOGGER.error("Alter partition event for {} is missing old location", authName);
+ }
+
+ String prevPath = FullUpdateInitializer.pathFromURI(prevLocation);
+ if (prevPath == null) {
+ return;
+ }
+
+ String newLocation = message.getNewLocation();
+ if (newLocation == null || newLocation.isEmpty()) {
+ LOGGER.error("Alter partition event for {} is missing new location", authName);
+ }
+
+ String newPath = FullUpdateInitializer.pathFromURI(newLocation);
+ if (newPath == null) {
+ return;
+ }
+
+ if (prevPath.equals(newPath)) {
+ LOGGER.warn("Alter partition event for {} has the same old and new path {}",
+ authName, prevPath);
+ return;
+ }
+
+ Set<String> locations = (Set<String>) image.get(authName);
+ if (locations == null) {
+ LOGGER.warn("Missing partition locations for {}", authName);
+ return;
+ }
+
+ // Rename partition
+ if (locations.remove(prevPath)) {
+ LOGGER.debug("Renaming {} to {}", prevPath, newPath);
+ locations.add(newPath);
+ }
+ }
+
+ /**
+ * Walk through the map and rename all instances of oldKey to newKey.
+ */
+ @VisibleForTesting
+ protected static void renamePrefixKeys(Map<String, Collection<String>> image,
+ String oldKey, String newKey) {
+ // The trick is that we can't just iterate through the map, remove old values and
+ // insert new values. While we can remove old values with iterators,
+ // we can't insert new ones while we walk. So we collect the keys to be added in
+ // a new map and merge them in the end.
+ Map<String, Set<String>> replacement = new HashMap<>();
+
+ for (Iterator<Map.Entry<String, Collection<String>>> it = image.entrySet().iterator();
+ it.hasNext(); ) {
+ Map.Entry<String, Collection<String>> entry = it.next();
+ String key = entry.getKey();
+ if (key.startsWith(oldKey)) {
+ String updatedKey = key.replaceAll("^" + oldKey + "(.*)", newKey + "$1");
+ if (!image.containsKey(updatedKey)) {
+ LOGGER.debug("Rename {} to {}", key, updatedKey);
+ replacement.put(updatedKey, (Set<String>) entry.getValue());
+ it.remove();
+ } else {
+ LOGGER.warn("skipping key {} - already present", updatedKey);
+ }
+ }
+ }
+
+ mergeMaps(image, replacement);
+ }
+
+ /**
+ * Merge replacement values into the original map but only if they are not
+ * already there.
+ *
+ * @param m1 source map
+ * @param m2 map with replacement values
+ */
+ private static void mergeMaps(Map<String, Collection<String>> m1, Map<String, Set<String>> m2) {
+ // Merge replacement values into the original map but only if they are not
+ // already there
+ for (Map.Entry<String, Set<String>> entry : m2.entrySet()) {
+ if (!m1.containsKey(entry.getKey())) {
+ m1.put(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/GSSCallback.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/GSSCallback.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/GSSCallback.java
new file mode 100644
index 0000000..d2d85d3
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/GSSCallback.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.service.thrift;
+
+import java.util.Arrays;
+import java.util.List;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.sentry.core.common.exception.ConnectionDeniedException;
+import org.apache.sentry.service.common.ServiceConstants.ServerConfig;
+
+public class GSSCallback extends SaslRpcServer.SaslGssCallbackHandler {
+
+ private final Configuration conf;
+ public GSSCallback(Configuration conf) {
+ super();
+ this.conf = conf;
+ }
+
+ boolean comparePrincipals(String principal1, String principal2) {
+ String[] principalParts1 = SaslRpcServer.splitKerberosName(principal1);
+ String[] principalParts2 = SaslRpcServer.splitKerberosName(principal2);
+ if (principalParts1.length == 0 || principalParts2.length == 0) {
+ return false;
+ }
+ if (principalParts1.length == principalParts2.length) {
+ for (int i=0; i < principalParts1.length; i++) {
+ if (!principalParts1[i].equals(principalParts2[i])) {
+ return false;
+ }
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ boolean allowConnect(String principal) {
+ String allowedPrincipals = conf.get(ServerConfig.ALLOW_CONNECT);
+ if (allowedPrincipals == null) {
+ return false;
+ }
+ String principalShortName = getShortName(principal);
+ List<String> items = Arrays.asList(allowedPrincipals.split("\\s*,\\s*"));
+ for (String item : items) {
+ if (comparePrincipals(item, principalShortName)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private String getShortName(String principal) {
+ String parts[] = SaslRpcServer.splitKerberosName(principal);
+ return parts[0];
+ }
+
+ @Override
+ public void handle(Callback[] callbacks)
+ throws UnsupportedCallbackException, ConnectionDeniedException {
+ AuthorizeCallback ac = null;
+ for (Callback callback : callbacks) {
+ if (callback instanceof AuthorizeCallback) {
+ ac = (AuthorizeCallback) callback;
+ } else {
+ throw new UnsupportedCallbackException(callback,
+ "Unrecognized SASL GSSAPI Callback");
+ }
+ }
+ if (ac != null) {
+ String authid = ac.getAuthenticationID();
+ String authzid = ac.getAuthorizationID();
+
+ if (allowConnect(authid)) {
+ if (authid.equals(authzid)) {
+ ac.setAuthorized(true);
+ } else {
+ ac.setAuthorized(false);
+ }
+ if (ac.isAuthorized()) {
+ ac.setAuthorizedID(authzid);
+ }
+ } else {
+ throw new ConnectionDeniedException(ac,
+ "Connection to sentry service denied due to lack of client credentials",
+ authid);
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSClient.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSClient.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSClient.java
new file mode 100644
index 0000000..7831430
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSClient.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.service.thrift;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+
+/**
+ * AutoCloseable wrapper around HiveMetaStoreClient.
+ * It is only used to provide try-with-resource semantics for
+ * {@link HiveMetaStoreClient}.
+ */
+public class HMSClient implements AutoCloseable {
+ private final HiveMetaStoreClient client;
+ private boolean valid;
+
+ public HMSClient(HiveMetaStoreClient client) {
+ this.client = Preconditions.checkNotNull(client);
+ valid = true;
+ }
+
+ public HiveMetaStoreClient getClient() {
+ return client;
+ }
+
+ public void invalidate() {
+ if (valid) {
+ client.close();
+ valid = false;
+ }
+ }
+
+ @Override
+ public void close() {
+ if (valid) {
+ client.close();
+ valid = false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollowerState.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollowerState.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollowerState.java
new file mode 100644
index 0000000..74268f7
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollowerState.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with the License. You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.sentry.service.thrift;
+
+/**
+ * States for the HMSFollower
+ */
+public enum HMSFollowerState implements SentryState {
+ /**
+ * If the HMSFollower has been started or not.
+ */
+ STARTED,
+
+ /**
+ * If the HMSFollower is connected to the HMS
+ */
+ CONNECTED;
+
+ /**
+ * The component name this state is for.
+ */
+ public static final String COMPONENT = "HMSFollower";
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public long getValue() {
+ return 1 << this.ordinal();
+ }
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveConnectionFactory.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveConnectionFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveConnectionFactory.java
new file mode 100644
index 0000000..62542c3
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveConnectionFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.service.thrift;
+
+import org.apache.hadoop.hive.metastore.api.MetaException;
+
+import java.io.IOException;
+
+public interface HiveConnectionFactory extends AutoCloseable {
+ /**
+ * Open a new connection to HMS.
+ *
+ * @return connection to HMS.
+ * @throws IOException if connection establishement failed.
+ * @throws InterruptedException if connection establishment was interrupted.
+ * @throws MetaException if connection establishement failed.
+ */
+ HMSClient connect() throws IOException, InterruptedException, MetaException;
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java
new file mode 100644
index 0000000..93cc34f
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java
@@ -0,0 +1,211 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+ <p>
+ http://www.apache.org/licenses/LICENSE-2.0
+ <p>
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.sentry.service.thrift;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.sentry.hdfs.UniquePathsUpdate;
+import org.apache.sentry.provider.db.service.persistent.SentryStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper class used to fetch Hive MetaStore notifications.
+ */
+public final class HiveNotificationFetcher implements AutoCloseable {
+ private static final Logger LOGGER = LoggerFactory.getLogger(HiveNotificationFetcher.class);
+
+ private final SentryStore sentryStore;
+ private final HiveConnectionFactory hmsConnectionFactory;
+ private HiveMetaStoreClient hmsClient;
+
+ /* The following cache and last filtered ID help us to avoid making less calls to the DB */
+ private long lastIdFiltered = 0;
+ private Set<String> cache = new HashSet<>();
+
+ public HiveNotificationFetcher(SentryStore sentryStore, HiveConnectionFactory hmsConnectionFactory) {
+ this.sentryStore = sentryStore;
+ this.hmsConnectionFactory = hmsConnectionFactory;
+ }
+
+ /**
+ * Fetch new HMS notifications appeared since a specified event ID. The returned list may
+ * include notifications with the same specified ID if they were not seen by Sentry.
+ *
+ * @param lastEventId The event ID to use to request notifications.
+ * @return A list of newer notifications unseen by Sentry.
+ * @throws Exception If an error occurs on the HMS communication.
+ */
+ public List<NotificationEvent> fetchNotifications(long lastEventId) throws Exception {
+ return fetchNotifications(lastEventId, Integer.MAX_VALUE);
+ }
+
+ /**
+ * Fetch new HMS notifications appeared since a specified event ID. The returned list may
+ * include notifications with the same specified ID if they were not seen by Sentry.
+ *
+ * @param lastEventId The event ID to use to request notifications.
+ * @param maxEvents The maximum number of events to fetch.
+ * @return A list of newer notifications unseen by Sentry.
+ * @throws Exception If an error occurs on the HMS communication.
+ */
+ List<NotificationEvent> fetchNotifications(long lastEventId, int maxEvents) throws Exception {
+ NotificationFilter filter = null;
+
+ /*
+ * HMS may bring duplicated events that were committed later than the previous request. To bring
+ * those newer duplicated events, we request new notifications from the last seen ID - 1.
+ *
+ * A current problem is that we could miss duplicates committed much more later, but because
+ * HMS does not guarantee the order of those, then it is safer to avoid processing them.
+ *
+ * TODO: We can avoid doing this once HIVE-16886 is fixed.
+ */
+ if (lastEventId > 0) {
+ filter = createNotificationFilterFor(lastEventId);
+ lastEventId--;
+ }
+
+ LOGGER.debug("Requesting HMS notifications since ID = {}", lastEventId);
+
+ NotificationEventResponse response;
+ try {
+ response = getHmsClient().getNextNotification(lastEventId, maxEvents, filter);
+ } catch (Exception e) {
+ close();
+ throw e;
+ }
+
+ if (response != null && response.isSetEvents()) {
+ LOGGER.debug("Fetched {} new HMS notification(s)", response.getEventsSize());
+ return response.getEvents();
+ }
+
+ return Collections.emptyList();
+ }
+
+ /**
+ * Returns a HMS notification filter for a specific notification ID. HMS notifications may
+ * have duplicated IDs, so the filter uses a SHA-1 hash to check for a unique notification.
+ *
+ * @param id the notification ID to filter
+ * @return the HMS notification filter
+ */
+ private NotificationFilter createNotificationFilterFor(final long id) {
+ /*
+ * A SHA-1 hex value that keeps unique notifications processed is persisted on the Sentry DB.
+ * To keep unnecessary calls to the DB, we use a cache that keeps seen hashes of the
+ * specified ID. If a new filter ID is used, then we clean up the cache.
+ */
+
+ if (lastIdFiltered != id) {
+ lastIdFiltered = id;
+ cache.clear();
+ }
+
+ return new NotificationFilter() {
+ @Override
+ public boolean accept(NotificationEvent notificationEvent) {
+ if (notificationEvent.getEventId() == id) {
+ String hash = UniquePathsUpdate.sha1(notificationEvent);
+
+ try {
+ if (cache.contains(hash) || sentryStore.isNotificationProcessed(hash)) {
+ cache.add(hash);
+
+ LOGGER.debug("Ignoring HMS notification already processed: ID = {}", id);
+ return false;
+ }
+ } catch (Exception e) {
+ LOGGER.error("An error occurred while checking if notification {} is already "
+ + "processed: {}", id, e.getMessage());
+
+ // We cannot throw an exception on this filter, so we return false assuming this
+ // notification is already processed
+ return false;
+ }
+ }
+
+ return true;
+ }
+ };
+ }
+
+ /**
+ * Gets the HMS client connection object.
+ * If will create a new connection if no connection object exists.
+ *
+ * @return The HMS client used to communication with the Hive MetaStore.
+ * @throws Exception If it cannot connect to the HMS service.
+ */
+ private HiveMetaStoreClient getHmsClient() throws Exception {
+ if (hmsClient == null) {
+ try {
+ hmsClient = hmsConnectionFactory.connect().getClient();
+ } catch (Exception e) {
+ LOGGER.error("Fail to connect to the HMS service: {}", e.getMessage());
+ throw e;
+ }
+ }
+
+ return hmsClient;
+ }
+
+ /**
+ * @return the latest notification Id logged by the HMS
+ * @throws Exception when an error occurs when talking to the HMS client
+ */
+ public long getCurrentNotificationId() throws Exception {
+ CurrentNotificationEventId eventId;
+ try {
+ eventId = getHmsClient().getCurrentNotificationEventId();
+ } catch (Exception e) {
+ close();
+ throw e;
+ }
+
+ if (eventId != null && eventId.isSetEventId()) {
+ return eventId.getEventId();
+ }
+
+ return SentryStore.EMPTY_NOTIFICATION_ID;
+ }
+
+ /* AutoCloseable implementations */
+
+ @Override
+ public void close() {
+ try {
+ if (hmsClient != null) {
+ hmsClient.close();
+ }
+
+ cache.clear();
+ } finally {
+ hmsClient = null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveSimpleConnectionFactory.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveSimpleConnectionFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveSimpleConnectionFactory.java
new file mode 100644
index 0000000..31e58fd
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveSimpleConnectionFactory.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.service.thrift;
+
+import com.google.common.base.Preconditions;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import javax.security.auth.login.LoginException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.sentry.service.common.ServiceConstants.ServerConfig;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Factory used to generate Hive connections.
+ * Supports insecure and Kerberos connections.
+ * For Kerberos connections starts the ticket renewal thread and sets
+ * up Kerberos credentials.
+ */
+public final class HiveSimpleConnectionFactory implements HiveConnectionFactory {
+ private static final Logger LOGGER = LoggerFactory.getLogger(HiveSimpleConnectionFactory.class);
+
+ /** Sentty configuration */
+ private final Configuration conf;
+ /** Hive configuration */
+ private final HiveConf hiveConf;
+ private final boolean insecure;
+ private SentryKerberosContext kerberosContext = null;
+
+ public HiveSimpleConnectionFactory(Configuration sentryConf, HiveConf hiveConf) {
+ this.conf = sentryConf;
+ this.hiveConf = hiveConf;
+ insecure = !ServerConfig.SECURITY_MODE_KERBEROS.equalsIgnoreCase(
+ sentryConf.get(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_NONE).trim());
+ }
+
+ /**
+ * Initialize the Factory.
+ * For insecure connections there is nothing to initialize.
+ * For Kerberos connections sets up ticket renewal thread.
+ * @throws IOException
+ * @throws LoginException
+ */
+ public void init() throws IOException, LoginException {
+ if (insecure) {
+ LOGGER.info("Using insecure connection to HMS");
+ return;
+ }
+
+ int port = conf.getInt(ServerConfig.RPC_PORT, ServerConfig.RPC_PORT_DEFAULT);
+ String rawPrincipal = Preconditions.checkNotNull(conf.get(ServerConfig.PRINCIPAL),
+ "%s is required", ServerConfig.PRINCIPAL);
+ String principal = SecurityUtil.getServerPrincipal(rawPrincipal, NetUtils.createSocketAddr(
+ conf.get(ServerConfig.RPC_ADDRESS, ServerConfig.RPC_ADDRESS_DEFAULT),
+ port).getAddress());
+ LOGGER.debug("Opening kerberos connection to HMS using kerberos principal {}", principal);
+ String[] principalParts = SaslRpcServer.splitKerberosName(principal);
+ Preconditions.checkArgument(principalParts.length == 3,
+ "Kerberos principal %s should have 3 parts", principal);
+ String keytab = Preconditions.checkNotNull(conf.get(ServerConfig.KEY_TAB),
+ "Configuration is missing required %s paraeter", ServerConfig.KEY_TAB);
+ File keytabFile = new File(keytab);
+ Preconditions.checkState(keytabFile.isFile() && keytabFile.canRead(),
+ "Keytab %s does not exist or is not readable", keytab);
+ // Instantiating SentryKerberosContext in non-server mode handles the ticket renewal.
+ kerberosContext = new SentryKerberosContext(principal, keytab, false);
+ UserGroupInformation.setConfiguration(conf);
+ LOGGER.info("Using secure connection to HMS");
+ }
+
+ /**
+ * Connect to HMS in unsecure mode or in Kerberos mode according to config.
+ *
+ * @return HMS connection
+ * @throws IOException if could not establish connection
+ * @throws InterruptedException if connection was interrupted
+ * @throws MetaException if other errors happened
+ */
+ public HMSClient connect() throws IOException, InterruptedException, MetaException {
+ if (insecure) {
+ return new HMSClient(new HiveMetaStoreClient(hiveConf));
+ }
+ UserGroupInformation clientUGI =
+ UserGroupInformation.getUGIFromSubject(kerberosContext.getSubject());
+ return new HMSClient(clientUGI.doAs(new PrivilegedExceptionAction<HiveMetaStoreClient>() {
+ @Override
+ public HiveMetaStoreClient run() throws MetaException {
+ return new HiveMetaStoreClient(hiveConf);
+ }
+ }));
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (kerberosContext != null) {
+ kerberosContext.shutDown();
+ kerberosContext = null;
+ }
+ }
+}