You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by sp...@apache.org on 2018/05/29 18:06:54 UTC
[35/43] sentry git commit: SENTRY-2208: Refactor out Sentry service
into own module from sentry-provider-db (Anthony Young-Garner,
reviewed by Sergio Pena, Steve Moist, Na Li)
http://git-wip-us.apache.org/repos/asf/sentry/blob/b97f5c7a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStoreSchemaInfo.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStoreSchemaInfo.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStoreSchemaInfo.java
deleted file mode 100644
index ce76a46..0000000
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStoreSchemaInfo.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sentry.provider.db.service.persistent;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.sentry.core.common.exception.SentryUserException;
-
-public class SentryStoreSchemaInfo {
- private static final String SQL_FILE_EXTENSION = ".sql";
- private static final String UPGRADE_FILE_PREFIX = "upgrade-";
- private static final String INIT_FILE_PREFIX = "sentry-";
- private static final String VERSION_UPGRADE_LIST = "upgrade.order";
- private final String dbType;
- private final String sentrySchemaVersions[];
- private final String sentryScriptDir;
-
- private static final String SENTRY_VERSION = "2.1.0";
-
- public SentryStoreSchemaInfo(String sentryScriptDir, String dbType)
- throws SentryUserException {
- this.sentryScriptDir = sentryScriptDir;
- this.dbType = dbType;
- // load upgrade order for the given dbType
- List<String> upgradeOrderList = new ArrayList<String>();
- String upgradeListFile = getSentryStoreScriptDir() + File.separator
- + VERSION_UPGRADE_LIST + "." + dbType;
- try (BufferedReader bfReader = new BufferedReader(new FileReader(upgradeListFile))) {
- String currSchemaVersion;
- while ((currSchemaVersion = bfReader.readLine()) != null) {
- upgradeOrderList.add(currSchemaVersion.trim());
- }
- } catch (FileNotFoundException e) {
- throw new SentryUserException("File " + upgradeListFile + " not found ", e);
- } catch (IOException e) {
- throw new SentryUserException("Error reading " + upgradeListFile, e);
- }
- sentrySchemaVersions = upgradeOrderList.toArray(new String[0]);
- }
-
- public String getSentrySchemaVersion() {
- return SENTRY_VERSION;
- }
-
- public List<String> getUpgradeScripts(String fromSchemaVer)
- throws SentryUserException {
- List<String> upgradeScriptList = new ArrayList<String>();
-
- // check if we are already at current schema level
- if (getSentryVersion().equals(fromSchemaVer)) {
- return upgradeScriptList;
- }
-
- // Find the list of scripts to execute for this upgrade
- int firstScript = sentrySchemaVersions.length;
- for (int i = 0; i < sentrySchemaVersions.length; i++) {
- String fromVersion = sentrySchemaVersions[i].split("-to-")[0];
- if (fromVersion.equals(fromSchemaVer)) {
- firstScript = i;
- break;
- }
- }
- if (firstScript == sentrySchemaVersions.length) {
- throw new SentryUserException("Unknown version specified for upgrade "
- + fromSchemaVer + " Metastore schema may be too old or newer");
- }
-
- for (int i = firstScript; i < sentrySchemaVersions.length; i++) {
- String scriptFile = generateUpgradeFileName(sentrySchemaVersions[i]);
- upgradeScriptList.add(scriptFile);
- }
- return upgradeScriptList;
- }
-
- /***
- * Get the name of the script to initialize the schema for given version
- *
- * @param toVersion
- * Target version. If it's null, then the current server version is
- * used
- * @return
- * @throws SentryUserException
- */
- public String generateInitFileName(String toVersion)
- throws SentryUserException {
- String version = toVersion;
- if (version == null) {
- version = getSentryVersion();
- }
- String initScriptName = INIT_FILE_PREFIX + dbType + "-" + version
- + SQL_FILE_EXTENSION;
- // check if the file exists
- if (!(new File(getSentryStoreScriptDir() + File.separatorChar
- + initScriptName).exists())) {
- throw new SentryUserException(
- "Unknown version specified for initialization: " + version);
- }
- return initScriptName;
- }
-
- /**
- * Find the directory of sentry store scripts
- *
- * @return
- */
- public String getSentryStoreScriptDir() {
- return sentryScriptDir;
- }
-
- // format the upgrade script name, e.g. sentry-upgrade-<dbType>-<x>-to-<y>.sql
- private String generateUpgradeFileName(String fileVersion) {
- return INIT_FILE_PREFIX + UPGRADE_FILE_PREFIX + dbType + "-"
- + fileVersion + SQL_FILE_EXTENSION;
- }
-
- // Current Sentry version, in majorVersion.minorVersion.changeVersion format
- // TODO: store the version using the build script
- public static String getSentryVersion() {
- return SENTRY_VERSION;
- }
-}
http://git-wip-us.apache.org/repos/asf/sentry/blob/b97f5c7a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionBlock.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionBlock.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionBlock.java
deleted file mode 100644
index 6ad52a3..0000000
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionBlock.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sentry.provider.db.service.persistent;
-
-import javax.jdo.PersistenceManager;
-
-/**
- * TransactionBlock wraps the code that is executed inside a single
- * transaction. The {@link #execute(PersistenceManager)} method returns the
- * result of the transaction.
- */
-@FunctionalInterface
-public interface TransactionBlock<T> {
- /**
- * Execute some code as a single transaction, the code should not start new
- * transaction or manipulate transactions with the PersistenceManager.
- *
- * @param pm PersistenceManager for the current transaction
- * @return Object with the result of execute()
- * @throws Exception
- */
- T execute(PersistenceManager pm) throws Exception;
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sentry/blob/b97f5c7a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionManager.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionManager.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionManager.java
deleted file mode 100644
index ba6e845..0000000
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionManager.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sentry.provider.db.service.persistent;
-
-import com.codahale.metrics.Counter;
-import static com.codahale.metrics.MetricRegistry.name;
-import com.codahale.metrics.Timer;
-
-import com.codahale.metrics.Timer.Context;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.sentry.core.common.exception.SentryUserException;
-import org.apache.sentry.service.common.ServiceConstants.ServerConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jdo.PersistenceManager;
-import javax.jdo.PersistenceManagerFactory;
-import javax.jdo.Transaction;
-
-import org.apache.sentry.api.service.thrift.SentryMetrics;
-
-import java.util.Random;
-import java.util.concurrent.Callable;
-
-/**
- * TransactionManager executes database transactions. It can retry a
- * transaction that fails with an unexpected exception, but not one that fails
- * with a <em>SentryUserException</em>, e.g. <em>SentryNoSuchObjectException</em>,
- * <em>SentryAlreadyExistsException</em> etc. A <em>SentryUserException</em>
- * is simply rethrown without retry.<p>
- *
- * The purpose of the class is to separate all transaction housekeeping (opening
- * transaction, rolling back failed transactions) from the actual transaction
- * business logic.<p>
- *
- * TransactionManager creates an instance of PersistenceManager for each
- * transaction.<p>
- *
- * TransactionManager exposes several metrics:
- * <ul>
- * <li>Timer metric for all transactions</li>
- * <li>Counter for failed transactions</li>
- * <li>Counter for each exception thrown by transaction</li>
- * </ul>
- */
-@SuppressWarnings("NestedTryStatement")
-public final class TransactionManager {
-
- private static final Logger LOGGER =
- LoggerFactory.getLogger(TransactionManager.class);
-
- /** Random number generator for exponential backoff */
- private static final Random random = new Random();
-
- private final PersistenceManagerFactory pmf;
-
- // Maximum number of retries per call
- private final int transactionRetryMax;
-
- // Delay (in milliseconds) between retries
- private final int retryWaitTimeMills;
-
- /** Name for metrics */
- private static final String TRANSACTIONS = "transactions";
-
- // Transaction timer measures time distribution for all transactions
- private final Timer transactionTimer =
- SentryMetrics.getInstance().
- getTimer(name(TransactionManager.class,
- TRANSACTIONS));
-
- // Counter for failed transactions
- private final Counter failedTransactionsCount =
- SentryMetrics.getInstance().
- getCounter(name(TransactionManager.class,
- TRANSACTIONS, "failed"));
-
- private final Counter retryCount =
- SentryMetrics.getInstance().getCounter(name(TransactionManager.class,
- TRANSACTIONS, "retry"));
-
- TransactionManager(PersistenceManagerFactory pmf, Configuration conf) {
- this.pmf = pmf;
- transactionRetryMax = conf.getInt(
- ServerConfig.SENTRY_STORE_TRANSACTION_RETRY,
- ServerConfig.SENTRY_STORE_TRANSACTION_RETRY_DEFAULT);
- retryWaitTimeMills = conf.getInt(
- ServerConfig.SENTRY_STORE_TRANSACTION_RETRY_WAIT_TIME_MILLIS,
- ServerConfig.SENTRY_STORE_TRANSACTION_RETRY_WAIT_TIME_MILLIS_DEFAULT);
- }
-
-
- /**
- * Execute some code as a single transaction, the code in tb.execute()
- * should not start new transaction or manipulate transactions with the
- * PersistenceManager.
- *
- * @param tb transaction block with code to be executed
- * @return Object with the result of tb.execute()
- */
- public <T> T executeTransaction(TransactionBlock<T> tb) throws Exception {
- try (Context context = transactionTimer.time();
- PersistenceManager pm = pmf.getPersistenceManager()) {
- Transaction transaction = pm.currentTransaction();
- transaction.begin();
- try {
- T result = tb.execute(pm);
- transaction.commit();
- return result;
- } catch (Exception e) {
- // Count total failed transactions
- failedTransactionsCount.inc();
- // Count specific exceptions
- SentryMetrics.getInstance().getCounter(name(TransactionManager.class,
- "exception", e.getClass().getSimpleName())).inc();
- // Re-throw the exception
- throw e;
- } finally {
- if (transaction.isActive()) {
- transaction.rollback();
- }
- }
- }
- }
-
- /**
- * Execute a list of TransactionBlock code as a single transaction.
- * The code in tb.execute() should not start new transaction or
- * manipulate transactions with the PersistenceManager. It returns
- * the result of the last transaction block execution.
- *
- * @param tbs transaction blocks with code to be executed
- * @return the result of the last tb.execute() call
- */
- private <T> T executeTransaction(Iterable<TransactionBlock<T>> tbs) throws Exception {
- try (Context context = transactionTimer.time();
- PersistenceManager pm = pmf.getPersistenceManager()) {
- Transaction transaction = pm.currentTransaction();
- transaction.begin();
- try {
- T result = null;
- for (TransactionBlock<T> tb : tbs) {
- result = tb.execute(pm);
- }
- transaction.commit();
- return result;
- } catch (Exception e) {
- // Count total failed transactions
- failedTransactionsCount.inc();
- // Count specific exceptions
- SentryMetrics.getInstance().getCounter(name(TransactionManager.class,
- "exception", e.getClass().getSimpleName())).inc();
- // Re-throw the exception
- throw e;
- } finally {
- if (transaction.isActive()) {
- transaction.rollback();
- }
- }
- }
- }
-
- /**
- * Execute some code as a single transaction with retry mechanism.
- *
- * @param tb transaction block with code to execute
- * @return Object with the result of tb.execute()
- */
- @SuppressWarnings("squid:S00112")
- public <T> T executeTransactionWithRetry(final TransactionBlock<T> tb)
- throws Exception {
- return new ExponentialBackoff().execute(
- new Callable<T>() {
- @Override
- public T call() throws Exception {
- return executeTransaction(tb);
- }
- }
- );
- }
-
- /**
- * Execute a list of TransactionBlocks as a single transaction.
- * If any of the TransactionBlocks fails, all the TransactionBlocks are
- * retried. The result of the last transaction block execution is
- * not returned to the caller (the method is void).
- *
- * @param tbs a list of transaction blocks with code to be executed.
- */
- @SuppressWarnings("squid:S00112")
- <T> void executeTransactionBlocksWithRetry(final Iterable<TransactionBlock<T>> tbs)
- throws Exception {
- new ExponentialBackoff().execute(
- new Callable<T>() {
- @Override
- public T call() throws Exception {
- return executeTransaction(tbs);
- }
- }
- );
- }
-
- /**
- * Implementation of exponential backoff with random fuzziness.
- * On each iteration the backoff time is 1.5 times the previous amount plus the
- * random fuzziness factor which is up to half of the previous amount.
- */
- private class ExponentialBackoff {
-
- @SuppressWarnings("squid:S00112")
- <T> T execute(Callable<T> arg) throws Exception {
- Exception ex = null;
- long sleepTime = retryWaitTimeMills;
-
- for (int retryNum = 1; retryNum <= transactionRetryMax; retryNum++) {
- try {
- return arg.call();
- } catch (SentryUserException e) {
- // throw the sentry exception without retry
- LOGGER.warn("Transaction manager encountered non-retriable exception", e);
- throw e;
- } catch (Exception e) {
- ex = e;
- retryCount.inc();
- LOGGER.warn("Transaction execution encountered exception", e);
- LOGGER.warn("Retrying transaction {}/{} times",
- retryNum, transactionRetryMax);
- // Introduce some randomness in the backoff time.
- LOGGER.warn("Sleeping for {} milliseconds before retrying", sleepTime);
- Thread.sleep(sleepTime);
- int fuzz = random.nextInt((int)sleepTime / 2);
- sleepTime *= 3;
- sleepTime /= 2;
- sleepTime += fuzz;
- }
- }
- assert(ex != null);
- String message = "The transaction has reached max retry number, "
- + ex.getMessage();
- LOGGER.error(message, ex);
- throw new Exception(message, ex);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/sentry/blob/b97f5c7a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java
deleted file mode 100644
index 992d8ab..0000000
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java
+++ /dev/null
@@ -1,524 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.service.thrift;
-
-import com.codahale.metrics.Counter;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.sentry.hdfs.PathsUpdate;
-import org.apache.sentry.hdfs.SentryMalformedPathException;
-import org.apache.sentry.hdfs.ServiceConstants.ServerConfig;
-import org.apache.sentry.api.service.thrift.SentryMetrics;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-import static com.codahale.metrics.MetricRegistry.name;
-
-/**
- * Manage fetching full snapshot from HMS.
- * Snapshot is represented as a map from the hive object name to
- * the set of paths for this object.
- * The hive object name is either the Hive database name or
- * Hive database name joined with Hive table name as {@code dbName.tableName}.
- * All table partitions are stored under the table object.
- * <p>
- * Once {@link FullUpdateInitializer} is created, the {@link FullUpdateInitializer#getFullHMSSnapshot()}
- * method should be called to get the initial update.
- * <p>
- * It is important to close the {@link FullUpdateInitializer} object to prevent resource
- * leaks.
- * <p>
- * The usual way of using {@link FullUpdateInitializer} is
- * <pre>
- * {@code
- * try (FullUpdateInitializer updateInitializer =
- * new FullUpdateInitializer(clientFactory, authzConf)) {
- * Map<String, Set<String>> pathsUpdate = updateInitializer.getFullHMSSnapshot();
- * return pathsUpdate;
- * }
- * }
- * </pre>
- */
-public final class FullUpdateInitializer implements AutoCloseable {
-
- /*
- * Implementation note.
- *
- * The snapshot is obtained using an executor. We follow the map/reduce model.
- * Each executor thread (mapper) obtains and returns a partial snapshot; these are then
- * reduced to a single combined snapshot by getFullHMSSnapshot().
- *
- * Synchronization between the getFullHMSSnapshot() and executors is done using the
- * 'results' queue. The queue holds the futures for each scheduled task.
- * It is initially populated by getFullHMSSnapshot and each task may add new future
- * results to it. Only getFullHMSSnapshot() removes entries from the results queue.
- * This guarantees that once the results queue is empty there are no pending jobs.
- *
- * Since there is no other data sharing, the implementation is safe without
- * any other synchronization. It is not thread-safe for concurrent calls
- * to getFullHMSSnapshot().
- *
- */
-
-
- private static final String FULL_UPDATE_INITIALIZER_THREAD_NAME = "hms-fetch-%d";
- private final ExecutorService threadPool;
- private final int maxPartitionsPerCall;
- private final int maxTablesPerCall;
- private final Deque<Future<CallResult>> results = new ConcurrentLinkedDeque<>();
- private final int maxRetries;
- private final int waitDurationMillis;
-
- private static final Logger LOGGER = LoggerFactory.getLogger(FullUpdateInitializer.class);
-
- private static final ObjectMapping emptyObjectMapping =
- new ObjectMapping(Collections.<String, Set<String>>emptyMap());
- private final HiveConnectionFactory clientFactory;
-
- /** Total number of database objects */
- private final Counter databaseCount = SentryMetrics.getInstance()
- .getCounter(name(FullUpdateInitializer.class, "total", "db"));
-
- /** Total number of table objects */
- private final Counter tableCount = SentryMetrics.getInstance()
- .getCounter(name(FullUpdateInitializer.class, "total", "tables"));
-
- /** Total number of partition objects */
- private final Counter partitionCount = SentryMetrics.getInstance()
- .getCounter(name(FullUpdateInitializer.class, "total", "partitions"));
-
- /**
- * Extract path (not starting with "/") from the full URI
- * @param uri - resource URI (usually with scheme)
- * @return path if uri is valid or null
- */
- static String pathFromURI(String uri) {
- try {
- return PathsUpdate.parsePath(uri);
- } catch (SentryMalformedPathException e) {
- LOGGER.warn(String.format("Ignoring invalid uri %s: %s",
- uri, e.getReason()));
- return null;
- }
- }
-
- /**
- * Mapping of object to set of paths.
- * Used to represent partial results from executor threads. Multiple
- * ObjectMapping objects are combined in a single mapping
- * to get the final result.
- */
- private static final class ObjectMapping {
- private final Map<String, Set<String>> objects;
-
- ObjectMapping(Map<String, Set<String>> objects) {
- this.objects = objects;
- }
-
- ObjectMapping(String authObject, String path) {
- Set<String> values = Collections.singleton(safeIntern(path));
- objects = ImmutableMap.of(authObject, values);
- }
-
- ObjectMapping(String authObject, Collection<String> paths) {
- Set<String> values = new HashSet<>(paths);
- objects = ImmutableMap.of(authObject, values);
- }
-
- Map<String, Set<String>> getObjects() {
- return objects;
- }
- }
-
- private static final class CallResult {
- private final Exception failure;
- private final boolean successStatus;
- private final ObjectMapping objectMapping;
-
- CallResult(Exception ex) {
- failure = ex;
- successStatus = false;
- objectMapping = emptyObjectMapping;
- }
-
- CallResult(ObjectMapping objectMapping) {
- failure = null;
- successStatus = true;
- this.objectMapping = objectMapping;
- }
-
- boolean success() {
- return successStatus;
- }
-
- ObjectMapping getObjectMapping() {
- return objectMapping;
- }
-
- public Exception getFailure() {
- return failure;
- }
- }
-
- private abstract class BaseTask implements Callable<CallResult> {
-
- /**
- * Class represents retry strategy for BaseTask.
- */
- private final class RetryStrategy {
- private int retryStrategyMaxRetries = 0;
- private final int retryStrategyWaitDurationMillis;
-
- private RetryStrategy(int retryStrategyMaxRetries, int retryStrategyWaitDurationMillis) {
- this.retryStrategyMaxRetries = retryStrategyMaxRetries;
-
- // Assign default wait duration if a non-positive value is provided.
- this.retryStrategyWaitDurationMillis = (retryStrategyWaitDurationMillis > 0) ?
- retryStrategyWaitDurationMillis : 1000;
- }
-
- @SuppressWarnings({"squid:S1141", "squid:S2142"})
- public CallResult exec() {
- // Retry logic is happening inside callable/task to avoid
- // synchronous waiting on getting the result.
- // Retry the failed task until the max retry number is reached.
- // Wait a configurable duration before the next retry.
- //
- // Only thrift exceptions are retried.
- // Other exceptions are propagated up the stack.
- Exception exception = null;
- try {
- // We catch all exceptions except Thrift exceptions which are retried
- for (int i = 0; i < retryStrategyMaxRetries; i++) {
- //noinspection NestedTryStatement
- try {
- return new CallResult(doTask());
- } catch (TException ex) {
- LOGGER.debug("Failed to execute task on " + (i + 1) + " attempts." +
- " Sleeping for " + retryStrategyWaitDurationMillis + " ms. Exception: " +
- ex.toString(), ex);
- exception = ex;
-
- try {
- Thread.sleep(retryStrategyWaitDurationMillis);
- } catch (InterruptedException ignored) {
- // Skip the remaining retries if we get an InterruptedException;
- // the break below exits the retry loop.
- LOGGER.warn("Interrupted during update fetch during iteration " + (i + 1));
- break;
- }
- }
- }
- } catch (Exception ex) {
- exception = ex;
- }
- LOGGER.error("Failed to execute task", exception);
- // We will fail in the end, so we are shutting down the pool to prevent
- // new tasks from being scheduled.
- threadPool.shutdown();
- return new CallResult(exception);
- }
- }
-
- private final RetryStrategy retryStrategy;
-
- BaseTask() {
- retryStrategy = new RetryStrategy(maxRetries, waitDurationMillis);
- }
-
- @Override
- public CallResult call() throws Exception {
- return retryStrategy.exec();
- }
-
- abstract ObjectMapping doTask() throws Exception;
- }
-
- private class PartitionTask extends BaseTask {
- private final String dbName;
- private final String tblName;
- private final String authName;
- private final List<String> partNames;
-
- PartitionTask(String dbName, String tblName, String authName,
- List<String> partNames) {
- this.dbName = safeIntern(dbName);
- this.tblName = safeIntern(tblName);
- this.authName = safeIntern(authName);
- this.partNames = partNames;
- }
-
- @Override
- ObjectMapping doTask() throws Exception {
- List<Partition> tblParts;
- HMSClient c = null;
- try (HMSClient client = clientFactory.connect()) {
- c = client;
- tblParts = client.getClient().getPartitionsByNames(dbName, tblName, partNames);
- } catch (Exception e) {
- if (c != null) {
- c.invalidate();
- }
- throw e;
- }
-
- LOGGER.debug("Fetched partitions for db = {}, table = {}",
- dbName, tblName);
-
- Collection<String> partitionNames = new ArrayList<>(tblParts.size());
- for (Partition part : tblParts) {
- String partPath = pathFromURI(part.getSd().getLocation());
- if (partPath != null) {
- partitionNames.add(partPath.intern());
- }
- }
- return new ObjectMapping(authName, partitionNames);
- }
- }
-
- private class TableTask extends BaseTask {
- private final String dbName;
- private final List<String> tableNames;
-
- TableTask(Database db, List<String> tableNames) {
- dbName = safeIntern(db.getName());
- this.tableNames = tableNames;
- }
-
- @Override
- @SuppressWarnings({"squid:S2629", "squid:S135"})
- ObjectMapping doTask() throws Exception {
- HMSClient c = null;
- try (HMSClient client = clientFactory.connect()) {
- c = client;
- List<Table> tables = client.getClient().getTableObjectsByName(dbName, tableNames);
-
- LOGGER.debug("Fetching tables for db = {}, tables = {}", dbName, tableNames);
-
- Map<String, Set<String>> objectMapping = new HashMap<>(tables.size());
- for (Table tbl : tables) {
- // Table names are case insensitive
- if (!tbl.getDbName().equalsIgnoreCase(dbName)) {
- // Inconsistency in HMS data
- LOGGER.warn(String.format("DB name %s for table %s does not match %s",
- tbl.getDbName(), tbl.getTableName(), dbName));
- continue;
- }
-
- String tableName = safeIntern(tbl.getTableName().toLowerCase());
- String authzObject = (dbName + "." + tableName).intern();
- List<String> tblPartNames =
- client.getClient().listPartitionNames(dbName, tableName, (short) -1);
- // Count total number of partitions
- partitionCount.inc(tblPartNames.size());
- for (int i = 0; i < tblPartNames.size(); i += maxPartitionsPerCall) {
- List<String> partsToFetch = tblPartNames.subList(i,
- Math.min(i + maxPartitionsPerCall, tblPartNames.size()));
- Callable<CallResult> partTask = new PartitionTask(dbName,
- tableName, authzObject, partsToFetch);
- results.add(threadPool.submit(partTask));
- }
- String tblPath = safeIntern(pathFromURI(tbl.getSd().getLocation()));
- if (tblPath == null) {
- continue;
- }
- Set<String> paths = objectMapping.get(authzObject);
- if (paths == null) {
- paths = new HashSet<>(1);
- objectMapping.put(authzObject, paths);
- }
- paths.add(tblPath);
- }
- return new ObjectMapping(Collections.unmodifiableMap(objectMapping));
- } catch (Exception e) {
- if (c != null) {
- c.invalidate();
- }
- throw e;
- }
- }
- }
-
- private class DbTask extends BaseTask {
-
- private final String dbName;
-
- DbTask(String dbName) {
- //Database names are case insensitive
- this.dbName = safeIntern(dbName.toLowerCase());
- databaseCount.inc();
- }
-
- @Override
- ObjectMapping doTask() throws Exception {
- HMSClient c = null;
- try (HMSClient client = clientFactory.connect()) {
- c = client;
- Database db = client.getClient().getDatabase(dbName);
- if (!dbName.equalsIgnoreCase(db.getName())) {
- LOGGER.warn("Database name {} does not match {}", db.getName(), dbName);
- return emptyObjectMapping;
- }
- List<String> allTblStr = client.getClient().getAllTables(dbName);
- // Count total number of tables
- tableCount.inc(allTblStr.size());
- for (int i = 0; i < allTblStr.size(); i += maxTablesPerCall) {
- List<String> tablesToFetch = allTblStr.subList(i,
- Math.min(i + maxTablesPerCall, allTblStr.size()));
- Callable<CallResult> tableTask = new TableTask(db, tablesToFetch);
- results.add(threadPool.submit(tableTask));
- }
- String dbPath = safeIntern(pathFromURI(db.getLocationUri()));
- return (dbPath != null) ? new ObjectMapping(dbName, dbPath) :
- emptyObjectMapping;
- } catch (Exception e) {
- if (c != null) {
- c.invalidate();
- }
- throw e;
- }
- }
- }
-
-  /**
-   * Read the batching and retry settings from the configuration and create the
-   * fixed-size thread pool that runs the snapshot tasks.
-   *
-   * @param clientFactory factory used by the tasks to connect to HMS
-   * @param conf configuration holding the HDFS sync metastore cache settings
-   */
-  FullUpdateInitializer(HiveConnectionFactory clientFactory, Configuration conf) {
-    this.clientFactory = clientFactory;
-
-    maxPartitionsPerCall = conf.getInt(
-        ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC,
-        ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC_DEFAULT);
-    maxTablesPerCall = conf.getInt(
-        ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC,
-        ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC_DEFAULT);
-    maxRetries = conf.getInt(
-        ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM,
-        ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM_DEFAULT);
-    waitDurationMillis = conf.getInt(
-        ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS,
-        ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS_DEFAULT);
-
-    // Worker threads are named and explicitly non-daemon
-    ThreadFactoryBuilder factoryBuilder = new ThreadFactoryBuilder();
-    factoryBuilder.setNameFormat(FULL_UPDATE_INITIALIZER_THREAD_NAME);
-    factoryBuilder.setDaemon(false);
-    ThreadFactory workerFactory = factoryBuilder.build();
-
-    int poolSize = conf.getInt(
-        ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS,
-        ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS_DEFAULT);
-    threadPool = Executors.newFixedThreadPool(poolSize, workerFactory);
-  }
-
-  /**
-   * Get Full HMS snapshot.
-   *
-   * <p>Lists all databases, schedules one {@code DbTask} per database and then
-   * drains the shared list of pending results, merging every finished task's
-   * object mapping into a single map.
-   *
-   * @return Full snapshot of HMS objects, keyed by object name.
-   * @throws Exception if listing the databases fails, if waiting for a task
-   *         fails or is interrupted, or the failure reported by the first
-   *         unsuccessful task
-   */
-  @SuppressWarnings("squid:S00112")
-  Map<String, Collection<String>> getFullHMSSnapshot() throws Exception {
-    // Get list of all HMS databases
-    List<String> databaseNames;
-    HMSClient opened = null;
-    try (HMSClient hms = clientFactory.connect()) {
-      opened = hms;
-      databaseNames = hms.getClient().getAllDatabases();
-    } catch (Exception e) {
-      if (opened != null) {
-        opened.invalidate();
-      }
-      throw e;
-    }
-
-    // Schedule async task for each database responsible for fetching per-database
-    // objects.
-    for (String databaseName : databaseNames) {
-      DbTask task = new DbTask(databaseName);
-      results.add(threadPool.submit(task));
-    }
-
-    // Resulting full snapshot
-    Map<String, Collection<String>> snapshot = new HashMap<>();
-
-    // As async tasks complete, merge their results into full snapshot.
-    // This is the only thread that takes elements off the results list - all other
-    // threads only add to it, so after a successful non-empty check pop() always
-    // finds an element.
-    while (!results.isEmpty()) {
-      Future<CallResult> pending = results.pop();
-      // Wait for the task to complete
-      CallResult outcome = pending.get();
-      // Fail if we got errors
-      if (!outcome.success()) {
-        throw outcome.getFailure();
-      }
-      // Merge values into the snapshot
-      Map<String, Set<String>> taskObjects = outcome.getObjectMapping().getObjects();
-      for (Map.Entry<String, Set<String>> taskEntry : taskObjects.entrySet()) {
-        String objectName = taskEntry.getKey();
-        Set<String> paths = taskEntry.getValue();
-        Set<String> known = (Set<String>) snapshot.get(objectName);
-        if (known != null) {
-          known.addAll(paths);
-        } else {
-          snapshot.put(objectName, paths);
-        }
-      }
-    }
-    return snapshot;
-  }
-
-  /**
-   * Shut the task thread pool down immediately and wait up to one second for
-   * it to terminate. An interrupt received while waiting is logged and the
-   * interrupt flag of the calling thread is restored.
-   */
-  @Override
-  public void close() {
-    threadPool.shutdownNow();
-    try {
-      threadPool.awaitTermination(1, TimeUnit.SECONDS);
-    } catch (InterruptedException interrupted) {
-      LOGGER.warn("Interrupted shutdown");
-      // Preserve the interrupt for whoever called close()
-      Thread.currentThread().interrupt();
-    }
-  }
-
-  /**
-   * Null-tolerant variant of {@link String#intern()}.
-   *
-   * @param arg String to be interned, may be null
-   * @return the interned string, or null when {@code arg} is null
-   */
-  static String safeIntern(String arg) {
-    if (arg == null) {
-      return null;
-    }
-    return arg.intern();
-  }
-}
http://git-wip-us.apache.org/repos/asf/sentry/blob/b97f5c7a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateModifier.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateModifier.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateModifier.java
deleted file mode 100644
index c30d982..0000000
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateModifier.java
+++ /dev/null
@@ -1,606 +0,0 @@
-/*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
-
-package org.apache.sentry.service.thrift;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
-import org.apache.hadoop.hive.metastore.messaging.EventMessage;
-import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAddPartitionMessage;
-import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAlterPartitionMessage;
-import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAlterTableMessage;
-import org.apache.sentry.binding.metastore.messaging.json.SentryJSONCreateDatabaseMessage;
-import org.apache.sentry.binding.metastore.messaging.json.SentryJSONCreateTableMessage;
-import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropDatabaseMessage;
-import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropPartitionMessage;
-import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropTableMessage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Apply newer events to the full update.
- *
- * <p>The process of obtaining a full snapshot from HMS is not atomic.
- * While we read information from HMS it may change - some new objects can be created,
- * or some can be removed or modified. This class is used to reconcile such changes with
- * the full snapshot.
- */
-final class FullUpdateModifier {
- private static final Logger LOGGER = LoggerFactory.getLogger(FullUpdateModifier.class);
-
-  /** Private constructor: the class only offers static methods and is never instantiated. */
-  private FullUpdateModifier() {
-  }
-
-  /**
-   * Take a full snapshot and apply an HMS event to it.
-   *
-   * <p>Events whose type is not handled here - including type names that are
-   * not known to {@link EventMessage.EventType} at all - are logged and ignored.
-   *
-   * @param image Full snapshot
-   * @param event HMS notification event
-   * @param deserializer Message deserializer -
-   *                     should produce Sentry JSON serializer type messages.
-   */
-  // NOTE: we pass deserializer here instead of using built-in one to simplify testing.
-  // Tests use mock deserializers and thus we do not have to construct proper events.
-  static void applyEvent(Map<String, Collection<String>> image, NotificationEvent event,
-                         MessageDeserializer deserializer) {
-    EventMessage.EventType eventType;
-    try {
-      eventType = EventMessage.EventType.valueOf(event.getEventType());
-    } catch (IllegalArgumentException e) {
-      // valueOf() throws for names that are not enum constants; treat them like
-      // any other unsupported event type instead of aborting the caller.
-      LOGGER.error("Notification with ID:{} has invalid event type: {}", event.getEventId(),
-          event.getEventType());
-      return;
-    }
-
-    switch (eventType) {
-      case CREATE_DATABASE:
-        createDatabase(image, event, deserializer);
-        break;
-      case DROP_DATABASE:
-        dropDatabase(image, event, deserializer);
-        break;
-      case CREATE_TABLE:
-        createTable(image, event, deserializer);
-        break;
-      case DROP_TABLE:
-        dropTable(image, event, deserializer);
-        break;
-      case ALTER_TABLE:
-        alterTable(image, event, deserializer);
-        break;
-      case ADD_PARTITION:
-        addPartition(image, event, deserializer);
-        break;
-      case DROP_PARTITION:
-        dropPartition(image, event, deserializer);
-        break;
-      case ALTER_PARTITION:
-        alterPartition(image, event, deserializer);
-        break;
-      default:
-        LOGGER.error("Notification with ID:{} has invalid event type: {}", event.getEventId(),
-            event.getEventType());
-        break;
-    }
-  }
-
-  /**
-   * Add mapping from the new database name to location {dbname: {location}}.
-   *
-   * <p>The event is ignored (with an error log) when the database name or
-   * location is missing, and silently when the location can not be converted
-   * to a path. An already known database is left untouched; a warning is
-   * printed when its recorded paths do not include the path from the event.
-   *
-   * @param image Full snapshot, modified in place
-   * @param event HMS notification event carrying the create-database message
-   * @param deserializer Message deserializer
-   */
-  private static void createDatabase(Map<String, Collection<String>> image, NotificationEvent event,
-                                     MessageDeserializer deserializer) {
-    SentryJSONCreateDatabaseMessage message =
-        (SentryJSONCreateDatabaseMessage) deserializer
-            .getCreateDatabaseMessage(event.getMessage());
-
-    String dbName = message.getDB();
-    if ((dbName == null) || dbName.isEmpty()) {
-      LOGGER.error("Create database event is missing database name");
-      return;
-    }
-    dbName = dbName.toLowerCase();
-
-    String location = message.getLocation();
-    if ((location == null) || location.isEmpty()) {
-      LOGGER.error("Create database event is missing database location");
-      return;
-    }
-
-    String path = FullUpdateInitializer.pathFromURI(location);
-    if (path == null) {
-      return;
-    }
-
-    // Add new database if it doesn't exist yet
-    if (!image.containsKey(dbName)) {
-      LOGGER.debug("create database {} with location {}", dbName, location);
-      image.put(dbName.intern(), Collections.singleton(path));
-      return;
-    }
-
-    // Sanity check the information and print warnings if database exists but
-    // with a different location
-    Set<String> oldLocations = (Set<String>) image.get(dbName);
-    LOGGER.debug("database {} already exists, ignored", dbName);
-    // The image holds converted paths, so the comparison has to use the path and
-    // not the raw location URI of the event.
-    if (oldLocations == null || !oldLocations.contains(path)) {
-      LOGGER.warn("database {} exists but location is different from {}", dbName, location);
-    }
-  }
-
-  /**
-   * Forget a dropped database: remove the entry keyed by the database name and
-   * every entry whose key starts with {@code dbName + "."}.
-   *
-   * <p>Nothing is changed when the event lacks a name or location, when the
-   * location can not be converted to a path, when the database is not in the
-   * image, or when its recorded paths do not contain the path from the event.
-   */
-  private static void dropDatabase(Map<String, Collection<String>> image, NotificationEvent event,
-                                   MessageDeserializer deserializer) {
-    SentryJSONDropDatabaseMessage dropMessage =
-        (SentryJSONDropDatabaseMessage) deserializer.getDropDatabaseMessage(event.getMessage());
-
-    String database = dropMessage.getDB();
-    if (database == null || database.isEmpty()) {
-      LOGGER.error("Drop database event is missing database name");
-      return;
-    }
-    database = database.toLowerCase();
-
-    String uri = dropMessage.getLocation();
-    if (uri == null || uri.isEmpty()) {
-      LOGGER.error("Drop database event is missing database location");
-      return;
-    }
-
-    String dbPath = FullUpdateInitializer.pathFromURI(uri);
-    if (dbPath == null) {
-      return;
-    }
-
-    // If the database is already deleted, we have nothing to do
-    Set<String> known = (Set<String>) image.get(database);
-    if (known == null) {
-      LOGGER.debug("database {} is already deleted", database);
-      return;
-    }
-    if (!known.contains(dbPath)) {
-      LOGGER.warn("Database {} location does not match {}", database, dbPath);
-      return;
-    }
-
-    LOGGER.debug("drop database {} with location {}", database, uri);
-
-    // Drop information about the database itself
-    image.remove(database);
-
-    // ... and about all objects that belong to it
-    String objectPrefix = database + ".";
-    Iterator<String> names = image.keySet().iterator();
-    while (names.hasNext()) {
-      String name = names.next();
-      if (name.startsWith(objectPrefix)) {
-        LOGGER.debug("Removing {}", name);
-        names.remove();
-      }
-    }
-  }
-
-  /**
-   * Add mapping for dbName.tableName.
-   *
-   * <p>The event is ignored (with an error log) when the database name, table
-   * name or location is missing, and silently when the location can not be
-   * converted to a path. An already known table is left untouched; a warning
-   * is printed when its recorded paths do not include the path from the event.
-   *
-   * @param image Full snapshot, modified in place
-   * @param event HMS notification event carrying the create-table message
-   * @param deserializer Message deserializer
-   */
-  private static void createTable(Map<String, Collection<String>> image, NotificationEvent event,
-                                  MessageDeserializer deserializer) {
-    SentryJSONCreateTableMessage message = (SentryJSONCreateTableMessage) deserializer
-        .getCreateTableMessage(event.getMessage());
-
-    String dbName = message.getDB();
-    if ((dbName == null) || dbName.isEmpty()) {
-      LOGGER.error("Create table event is missing database name");
-      return;
-    }
-    String tableName = message.getTable();
-    if ((tableName == null) || tableName.isEmpty()) {
-      LOGGER.error("Create table event is missing table name");
-      return;
-    }
-
-    String location = message.getLocation();
-    if ((location == null) || location.isEmpty()) {
-      LOGGER.error("Create table event is missing table location");
-      return;
-    }
-
-    String path = FullUpdateInitializer.pathFromURI(location);
-    if (path == null) {
-      return;
-    }
-
-    String authName = dbName.toLowerCase() + "." + tableName.toLowerCase();
-    // Add new table if it doesn't exist yet
-    if (!image.containsKey(authName)) {
-      LOGGER.debug("create table {} with location {}", authName, location);
-      Set<String> locations = new HashSet<>(1);
-      locations.add(path);
-      image.put(authName.intern(), locations);
-      return;
-    }
-
-    // Sanity check the information and print warnings if table exists but
-    // with a different location
-    Set<String> oldLocations = (Set<String>) image.get(authName);
-    LOGGER.debug("Table {} already exists, ignored", authName);
-    // The image holds converted paths, so the comparison has to use the path and
-    // not the raw location URI of the event.
-    if (oldLocations == null || !oldLocations.contains(path)) {
-      LOGGER.warn("Table {} exists but location is different from {}", authName, location);
-    }
-  }
-
-  /**
-   * Remove the entry for dbName.tableName, provided its recorded paths contain
-   * the path of the dropped table; otherwise only a warning is logged.
-   */
-  private static void dropTable(Map<String, Collection<String>> image, NotificationEvent event,
-                                MessageDeserializer deserializer) {
-    SentryJSONDropTableMessage dropMessage = (SentryJSONDropTableMessage) deserializer
-        .getDropTableMessage(event.getMessage());
-
-    String database = dropMessage.getDB();
-    if (database == null || database.isEmpty()) {
-      LOGGER.error("Drop table event is missing database name");
-      return;
-    }
-
-    String table = dropMessage.getTable();
-    if (table == null || table.isEmpty()) {
-      LOGGER.error("Drop table event is missing table name");
-      return;
-    }
-
-    String uri = dropMessage.getLocation();
-    if (uri == null || uri.isEmpty()) {
-      LOGGER.error("Drop table event is missing table location");
-      return;
-    }
-
-    String tablePath = FullUpdateInitializer.pathFromURI(uri);
-    if (tablePath == null) {
-      return;
-    }
-
-    String objectName = database.toLowerCase() + "." + table.toLowerCase();
-    Set<String> known = (Set<String>) image.get(objectName);
-    boolean matches = (known != null) && known.contains(tablePath);
-    if (!matches) {
-      LOGGER.warn("can't find matching table {} with location {}", objectName, uri);
-      return;
-    }
-    LOGGER.debug("Removing {}", objectName);
-    image.remove(objectName);
-  }
-
-  /**
-   * ALTER TABLE is a complicated function that can alter multiple things.
-   *
-   * <p>We take care of the following cases, in this order:
-   * <ul>
-   *   <li>Change database name. This is the most complicated one.
-   *   We need to change the actual database name and change all mappings
-   *   that look like "dbName.tableName" to the new dbName</li>
-   *   <li>Change table name</li>
-   *   <li>Change location</li>
-   * </ul>
-   *
-   * <p>The previous names and both locations come from the deserialized
-   * message, the new names from the notification event itself. The event is
-   * ignored when any of these values is missing or a location can not be
-   * converted to a path.
-   */
-  private static void alterTable(Map<String, Collection<String>> image, NotificationEvent event,
-                                 MessageDeserializer deserializer) {
-    SentryJSONAlterTableMessage alterMessage =
-        (SentryJSONAlterTableMessage) deserializer.getAlterTableMessage(event.getMessage());
-
-    String oldDb = alterMessage.getDB();
-    if (oldDb == null || oldDb.isEmpty()) {
-      LOGGER.error("Alter table event is missing old database name");
-      return;
-    }
-    oldDb = oldDb.toLowerCase();
-
-    String oldTable = alterMessage.getTable();
-    if (oldTable == null || oldTable.isEmpty()) {
-      LOGGER.error("Alter table event is missing old table name");
-      return;
-    }
-    oldTable = oldTable.toLowerCase();
-
-    String newDb = event.getDbName();
-    if (newDb == null || newDb.isEmpty()) {
-      LOGGER.error("Alter table event is missing new database name");
-      return;
-    }
-    newDb = newDb.toLowerCase();
-
-    String newTable = event.getTableName();
-    if (newTable == null || newTable.isEmpty()) {
-      LOGGER.error("Alter table event is missing new table name");
-      return;
-    }
-    newTable = newTable.toLowerCase();
-
-    String oldUri = alterMessage.getOldLocation();
-    if (oldUri == null || oldUri.isEmpty()) {
-      LOGGER.error("Alter table event is missing old location");
-      return;
-    }
-    String oldPath = FullUpdateInitializer.pathFromURI(oldUri);
-    if (oldPath == null) {
-      return;
-    }
-
-    String newUri = alterMessage.getNewLocation();
-    if (newUri == null || newUri.isEmpty()) {
-      LOGGER.error("Alter table event is missing new location");
-      return;
-    }
-    String newPath = FullUpdateInitializer.pathFromURI(newUri);
-    if (newPath == null) {
-      return;
-    }
-
-    String oldObject = oldDb + "." + oldTable;
-    String newObject = newDb + "." + newTable;
-
-    // Step 1: database name change
-    if (!oldDb.equals(newDb)) {
-      LOGGER.debug("Changing database name: {} -> {}", oldDb, newDb);
-      Set<String> dbLocations = (Set<String>) image.get(oldDb);
-      if (dbLocations == null) {
-        LOGGER.debug("database {} not found", oldDb);
-      } else if (image.containsKey(newDb)) {
-        LOGGER.warn("database {} rename: found existing database {}", oldDb, newDb);
-      } else {
-        // Rename database since it is not renamed yet
-        image.put(newDb, dbLocations);
-        image.remove(oldDb);
-        // Walk through all tables and rename DB part of the AUTH name
-        // AUTH name is "dbName.TableName" so we need to replace dbName with the new name
-        renamePrefixKeys(image, oldDb + ".", newDb + ".");
-      }
-    }
-
-    // Step 2: either the database name or table name changed, rename objects
-    if (!oldObject.equals(newObject)) {
-      Set<String> objectLocations = (Set<String>) image.get(oldObject);
-      if (objectLocations == null) {
-        LOGGER.debug("auth {} not found", oldObject);
-      } else if (image.containsKey(newObject)) {
-        LOGGER.warn("auth {} rename: found existing object {}", oldObject, newObject);
-      } else {
-        // Rename since it is not renamed yet
-        LOGGER.debug("rename {} -> {}", oldObject, newObject);
-        image.put(newObject, objectLocations);
-        image.remove(oldObject);
-      }
-    }
-
-    // Step 3: location change
-    if (!oldPath.equals(newPath)) {
-      LOGGER.debug("Location change: {} -> {}", oldPath, newPath);
-      Set<String> current = (Set<String>) image.get(newObject);
-      boolean replaceable = (current != null)
-          && current.contains(oldPath)
-          && !current.contains(newPath);
-      if (replaceable) {
-        current.remove(oldPath);
-        current.add(newPath);
-      } else {
-        LOGGER.warn("can not process location change for {}", newObject);
-        LOGGER.warn("old locatio = {}, new location = {}", oldPath, newPath);
-      }
-    }
-  }
-
-  /**
-   * Record the paths of newly added partitions in the path set of their table.
-   * The table must already be present in the image; otherwise a warning is
-   * logged and nothing is added.
-   */
-  private static void addPartition(Map<String, Collection<String>> image, NotificationEvent event,
-                                   MessageDeserializer deserializer) {
-    SentryJSONAddPartitionMessage addMessage =
-        (SentryJSONAddPartitionMessage) deserializer.getAddPartitionMessage(event.getMessage());
-
-    String database = addMessage.getDB();
-    if (database == null || database.isEmpty()) {
-      LOGGER.error("Add partition event is missing database name");
-      return;
-    }
-
-    String table = addMessage.getTable();
-    if (table == null || table.isEmpty()) {
-      LOGGER.error("Add partition event for {} is missing table name", database);
-      return;
-    }
-
-    String objectName = database.toLowerCase() + "." + table.toLowerCase();
-
-    List<String> partitionUris = addMessage.getLocations();
-    if (partitionUris == null || partitionUris.isEmpty()) {
-      LOGGER.error("Add partition event for {} is missing partition locations", objectName);
-      return;
-    }
-
-    Set<String> tablePaths = (Set<String>) image.get(objectName);
-    if (tablePaths == null) {
-      LOGGER.warn("Add partition for {}: missing table locations",objectName);
-      return;
-    }
-
-    // Add each partition, skipping locations that do not convert to a path
-    for (String partitionUri : partitionUris) {
-      String partitionPath = FullUpdateInitializer.pathFromURI(partitionUri);
-      if (partitionPath == null) {
-        continue;
-      }
-      LOGGER.debug("Adding partition {}:{}", objectName, partitionPath);
-      tablePaths.add(partitionPath);
-    }
-  }
-
-  /**
-   * Drop partition removes location from the existing table.
-   *
-   * <p>The event is ignored (with a log entry) when the database name, table
-   * name or partition locations are missing, or when the table is not present
-   * in the image. Locations that can not be converted to a path are skipped.
-   *
-   * @param image Full snapshot, modified in place
-   * @param event HMS notification event carrying the drop-partition message
-   * @param deserializer Message deserializer
-   */
-  private static void dropPartition(Map<String, Collection<String>> image, NotificationEvent event,
-                                    MessageDeserializer deserializer) {
-    SentryJSONDropPartitionMessage message =
-        (SentryJSONDropPartitionMessage) deserializer
-            .getDropPartitionMessage(event.getMessage());
-    String dbName = message.getDB();
-    if ((dbName == null) || dbName.isEmpty()) {
-      LOGGER.error("Drop partition event is missing database name");
-      return;
-    }
-    String tableName = message.getTable();
-    if ((tableName == null) || tableName.isEmpty()) {
-      LOGGER.error("Drop partition event for {} is missing table name", dbName);
-      return;
-    }
-
-    String authName = dbName.toLowerCase() + "." + tableName.toLowerCase();
-
-    List<String> locations = message.getLocations();
-    if (locations == null || locations.isEmpty()) {
-      LOGGER.error("Drop partition event for {} is missing partition locations", authName);
-      return;
-    }
-
-    Set<String> oldLocations = (Set<String>) image.get(authName);
-    if (oldLocations == null) {
-      // Message used to say "Add partition" (copy-paste slip), which pointed
-      // readers of the log at the wrong event type.
-      LOGGER.warn("Drop partition for {}: missing table locations", authName);
-      return;
-    }
-
-    // Drop each partition
-    for (String location : locations) {
-      String path = FullUpdateInitializer.pathFromURI(location);
-      if (path != null) {
-        oldLocations.remove(path);
-      }
-    }
-  }
-
-  /**
-   * Alter partition replaces the old partition path of a table with the new one.
-   *
-   * <p>The event is ignored (with a log entry) when the database name, table
-   * name, old location or new location is missing, when both locations map to
-   * the same path, or when the table is not present in the image. The new path
-   * is only added if the old path was actually recorded for the table.
-   *
-   * @param image Full snapshot, modified in place
-   * @param event HMS notification event carrying the alter-partition message
-   * @param deserializer Message deserializer
-   */
-  private static void alterPartition(Map<String, Collection<String>> image, NotificationEvent event,
-                                     MessageDeserializer deserializer) {
-    SentryJSONAlterPartitionMessage message =
-        (SentryJSONAlterPartitionMessage) deserializer
-            .getAlterPartitionMessage(event.getMessage());
-
-    String dbName = message.getDB();
-    if ((dbName == null) || dbName.isEmpty()) {
-      LOGGER.error("Alter partition event is missing database name");
-      return;
-    }
-    String tableName = message.getTable();
-    if ((tableName == null) || tableName.isEmpty()) {
-      LOGGER.error("Alter partition event for {} is missing table name", dbName);
-      return;
-    }
-
-    String authName = dbName.toLowerCase() + "." + tableName.toLowerCase();
-
-    String prevLocation = message.getOldLocation();
-    if (prevLocation == null || prevLocation.isEmpty()) {
-      LOGGER.error("Alter partition event for {} is missing old location", authName);
-      // Stop here, like every other handler does: a missing location must not
-      // be handed on to pathFromURI().
-      return;
-    }
-
-    String prevPath = FullUpdateInitializer.pathFromURI(prevLocation);
-    if (prevPath == null) {
-      return;
-    }
-
-    String newLocation = message.getNewLocation();
-    if (newLocation == null || newLocation.isEmpty()) {
-      LOGGER.error("Alter partition event for {} is missing new location", authName);
-      return;
-    }
-
-    String newPath = FullUpdateInitializer.pathFromURI(newLocation);
-    if (newPath == null) {
-      return;
-    }
-
-    if (prevPath.equals(newPath)) {
-      LOGGER.warn("Alter partition event for {} has the same old and new path {}",
-          authName, prevPath);
-      return;
-    }
-
-    Set<String> locations = (Set<String>) image.get(authName);
-    if (locations == null) {
-      LOGGER.warn("Missing partition locations for {}", authName);
-      return;
-    }
-
-    // Rename partition
-    if (locations.remove(prevPath)) {
-      LOGGER.debug("Renaming {} to {}", prevPath, newPath);
-      locations.add(newPath);
-    }
-  }
-
-  /**
-   * Walk through the map and re-key every entry whose key starts with
-   * {@code oldKey} so that this prefix is replaced by {@code newKey}.
-   *
-   * <p>An entry is left untouched (and a warning is logged) when the image
-   * already contains the key it would be renamed to.
-   *
-   * @param image map to modify in place
-   * @param oldKey key prefix to look for, taken literally
-   * @param newKey prefix that replaces {@code oldKey}, taken literally
-   */
-  @VisibleForTesting
-  protected static void renamePrefixKeys(Map<String, Collection<String>> image,
-                                         String oldKey, String newKey) {
-    // The trick is that we can't just iterate through the map, remove old values and
-    // insert new values. While we can remove old values with iterators,
-    // we can't insert new ones while we walk. So we collect the keys to be added in
-    // a new map and merge them in the end.
-    Map<String, Set<String>> replacement = new HashMap<>();
-
-    for (Iterator<Map.Entry<String, Collection<String>>> it = image.entrySet().iterator();
-         it.hasNext(); ) {
-      Map.Entry<String, Collection<String>> entry = it.next();
-      String key = entry.getKey();
-      if (key.startsWith(oldKey)) {
-        // Plain string surgery instead of a regex replace: the prefixes contain
-        // '.' and may contain other characters that are special in a pattern or
-        // in a replacement string.
-        String updatedKey = newKey + key.substring(oldKey.length());
-        if (!image.containsKey(updatedKey)) {
-          LOGGER.debug("Rename {} to {}", key, updatedKey);
-          replacement.put(updatedKey, (Set<String>) entry.getValue());
-          it.remove();
-        } else {
-          LOGGER.warn("skipping key {} - already present", updatedKey);
-        }
-      }
-    }
-
-    mergeMaps(image, replacement);
-  }
-
-  /**
-   * Copy entries of the second map into the first one, skipping every key the
-   * first map already contains.
-   *
-   * @param m1 map that receives the entries
-   * @param m2 map with replacement values
-   */
-  private static void mergeMaps(Map<String, Collection<String>> m1, Map<String, Set<String>> m2) {
-    for (Map.Entry<String, Set<String>> candidate : m2.entrySet()) {
-      String name = candidate.getKey();
-      if (m1.containsKey(name)) {
-        // Existing values win over replacement values
-        continue;
-      }
-      m1.put(name, candidate.getValue());
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/sentry/blob/b97f5c7a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/GSSCallback.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/GSSCallback.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/GSSCallback.java
deleted file mode 100644
index d2d85d3..0000000
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/GSSCallback.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.service.thrift;
-
-import java.util.Arrays;
-import java.util.List;
-
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.sasl.AuthorizeCallback;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.SaslRpcServer;
-import org.apache.sentry.core.common.exception.ConnectionDeniedException;
-import org.apache.sentry.service.common.ServiceConstants.ServerConfig;
-
-/**
- * SASL GSSAPI callback handler that only authorizes connections whose
- * authentication ID is on the list configured under
- * {@link ServerConfig#ALLOW_CONNECT}.
- */
-public class GSSCallback extends SaslRpcServer.SaslGssCallbackHandler {
-
-  private final Configuration conf;
-
-  public GSSCallback(Configuration conf) {
-    super();
-    this.conf = conf;
-  }
-
-  /**
-   * Compare two principals part by part, as split by
-   * {@link SaslRpcServer#splitKerberosName(String)}.
-   *
-   * @return true if both principals split into the same, non-empty sequence of parts
-   */
-  boolean comparePrincipals(String principal1, String principal2) {
-    String[] principalParts1 = SaslRpcServer.splitKerberosName(principal1);
-    String[] principalParts2 = SaslRpcServer.splitKerberosName(principal2);
-    if (principalParts1.length == 0 || principalParts2.length == 0) {
-      return false;
-    }
-    // Same length and pairwise equal parts
-    return Arrays.equals(principalParts1, principalParts2);
-  }
-
-  /**
-   * Check the principal against the configured, comma separated allow list.
-   *
-   * @param principal full principal name of the connecting client
-   * @return true if the first part of the principal matches one of the
-   *         configured entries; false if nothing is configured or the principal
-   *         has no usable first part
-   */
-  boolean allowConnect(String principal) {
-    String allowedPrincipals = conf.get(ServerConfig.ALLOW_CONNECT);
-    if (allowedPrincipals == null) {
-      return false;
-    }
-    String principalShortName = getShortName(principal);
-    if (principalShortName == null) {
-      return false;
-    }
-    // The split pattern only eats whitespace around commas, so whitespace at
-    // either end of the configured value has to be trimmed separately.
-    List<String> items = Arrays.asList(allowedPrincipals.trim().split("\\s*,\\s*"));
-    for (String item : items) {
-      if (comparePrincipals(item, principalShortName)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  /**
-   * @return the first part of the principal, or null if the principal is null
-   *         or splits into no parts at all
-   */
-  private String getShortName(String principal) {
-    if (principal == null) {
-      return null;
-    }
-    String[] parts = SaslRpcServer.splitKerberosName(principal);
-    return (parts.length > 0) ? parts[0] : null;
-  }
-
-  /**
-   * Handle the authorize callback: deny the connection if the authentication
-   * ID is not allowed to connect, otherwise authorize it only when the
-   * authentication and authorization IDs are equal.
-   *
-   * @throws UnsupportedCallbackException for any callback that is not an
-   *         {@link AuthorizeCallback}
-   * @throws ConnectionDeniedException if the authentication ID is not allowed
-   *         to connect
-   */
-  @Override
-  public void handle(Callback[] callbacks)
-      throws UnsupportedCallbackException, ConnectionDeniedException {
-    AuthorizeCallback ac = null;
-    for (Callback callback : callbacks) {
-      if (callback instanceof AuthorizeCallback) {
-        ac = (AuthorizeCallback) callback;
-      } else {
-        throw new UnsupportedCallbackException(callback,
-            "Unrecognized SASL GSSAPI Callback");
-      }
-    }
-    if (ac == null) {
-      return;
-    }
-
-    String authid = ac.getAuthenticationID();
-    String authzid = ac.getAuthorizationID();
-
-    if (!allowConnect(authid)) {
-      throw new ConnectionDeniedException(ac,
-          "Connection to sentry service denied due to lack of client credentials",
-          authid);
-    }
-
-    boolean authorized = authid.equals(authzid);
-    ac.setAuthorized(authorized);
-    if (authorized) {
-      ac.setAuthorizedID(authzid);
-    }
-  }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sentry/blob/b97f5c7a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSClient.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSClient.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSClient.java
deleted file mode 100644
index 7831430..0000000
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSClient.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sentry.service.thrift;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-
-/**
- * Thin {@link AutoCloseable} holder for a {@link HiveMetaStoreClient}, so that
- * the client can be used in a try-with-resources statement.
- * The wrapped client is closed at most once, by whichever of
- * {@link #invalidate()} and {@link #close()} is called first.
- */
-public class HMSClient implements AutoCloseable {
-  private final HiveMetaStoreClient client;
-  // True until the wrapped client has been closed through this wrapper
-  private boolean valid;
-
-  /**
-   * @param client client to wrap, must not be null
-   */
-  public HMSClient(HiveMetaStoreClient client) {
-    this.client = Preconditions.checkNotNull(client);
-    valid = true;
-  }
-
-  public HiveMetaStoreClient getClient() {
-    return client;
-  }
-
-  /** Close the wrapped client unless that already happened. */
-  public void invalidate() {
-    closeOnce();
-  }
-
-  /** Close the wrapped client unless that already happened. */
-  @Override
-  public void close() {
-    closeOnce();
-  }
-
-  // Shared implementation of invalidate() and close()
-  private void closeOnce() {
-    if (!valid) {
-      return;
-    }
-    client.close();
-    valid = false;
-  }
-}
http://git-wip-us.apache.org/repos/asf/sentry/blob/b97f5c7a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollowerState.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollowerState.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollowerState.java
deleted file mode 100644
index 74268f7..0000000
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollowerState.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with the License. You may obtain
- * a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.sentry.service.thrift;
-
-/**
- * States for the HMSFollower
- */
-public enum HMSFollowerState implements SentryState {
- /**
- * If the HMSFollower has been started or not.
- */
- STARTED,
-
- /**
- * If the HMSFollower is connected to the HMS
- */
- CONNECTED;
-
- /**
- * The component name this state is for.
- */
- public static final String COMPONENT = "HMSFollower";
-
- /**
- * {@inheritDoc}
- */
- @Override
- public long getValue() {
- return 1 << this.ordinal();
- }
-}
http://git-wip-us.apache.org/repos/asf/sentry/blob/b97f5c7a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveConnectionFactory.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveConnectionFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveConnectionFactory.java
deleted file mode 100644
index 62542c3..0000000
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveConnectionFactory.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sentry.service.thrift;
-
-import org.apache.hadoop.hive.metastore.api.MetaException;
-
-import java.io.IOException;
-
-public interface HiveConnectionFactory extends AutoCloseable {
- /**
- * Open a new connection to HMS.
- *
- * @return connection to HMS.
- * @throws IOException if connection establishement failed.
- * @throws InterruptedException if connection establishment was interrupted.
- * @throws MetaException if connection establishement failed.
- */
- HMSClient connect() throws IOException, InterruptedException, MetaException;
-}
http://git-wip-us.apache.org/repos/asf/sentry/blob/b97f5c7a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java
deleted file mode 100644
index 93cc34f..0000000
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
- <p>
- http://www.apache.org/licenses/LICENSE-2.0
- <p>
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
-
-package org.apache.sentry.service.thrift;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter;
-import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
-import org.apache.sentry.hdfs.UniquePathsUpdate;
-import org.apache.sentry.provider.db.service.persistent.SentryStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Helper class used to fetch Hive MetaStore notifications.
- */
-public final class HiveNotificationFetcher implements AutoCloseable {
- private static final Logger LOGGER = LoggerFactory.getLogger(HiveNotificationFetcher.class);
-
- private final SentryStore sentryStore;
- private final HiveConnectionFactory hmsConnectionFactory;
- private HiveMetaStoreClient hmsClient;
-
- /* The following cache and last filtered ID help us to avoid making less calls to the DB */
- private long lastIdFiltered = 0;
- private Set<String> cache = new HashSet<>();
-
- public HiveNotificationFetcher(SentryStore sentryStore, HiveConnectionFactory hmsConnectionFactory) {
- this.sentryStore = sentryStore;
- this.hmsConnectionFactory = hmsConnectionFactory;
- }
-
- /**
- * Fetch new HMS notifications appeared since a specified event ID. The returned list may
- * include notifications with the same specified ID if they were not seen by Sentry.
- *
- * @param lastEventId The event ID to use to request notifications.
- * @return A list of newer notifications unseen by Sentry.
- * @throws Exception If an error occurs on the HMS communication.
- */
- public List<NotificationEvent> fetchNotifications(long lastEventId) throws Exception {
- return fetchNotifications(lastEventId, Integer.MAX_VALUE);
- }
-
- /**
- * Fetch new HMS notifications appeared since a specified event ID. The returned list may
- * include notifications with the same specified ID if they were not seen by Sentry.
- *
- * @param lastEventId The event ID to use to request notifications.
- * @param maxEvents The maximum number of events to fetch.
- * @return A list of newer notifications unseen by Sentry.
- * @throws Exception If an error occurs on the HMS communication.
- */
- List<NotificationEvent> fetchNotifications(long lastEventId, int maxEvents) throws Exception {
- NotificationFilter filter = null;
-
- /*
- * HMS may bring duplicated events that were committed later than the previous request. To bring
- * those newer duplicated events, we request new notifications from the last seen ID - 1.
- *
- * A current problem is that we could miss duplicates committed much more later, but because
- * HMS does not guarantee the order of those, then it is safer to avoid processing them.
- *
- * TODO: We can avoid doing this once HIVE-16886 is fixed.
- */
- if (lastEventId > 0) {
- filter = createNotificationFilterFor(lastEventId);
- lastEventId--;
- }
-
- LOGGER.debug("Requesting HMS notifications since ID = {}", lastEventId);
-
- NotificationEventResponse response;
- try {
- response = getHmsClient().getNextNotification(lastEventId, maxEvents, filter);
- } catch (Exception e) {
- close();
- throw e;
- }
-
- if (response != null && response.isSetEvents()) {
- LOGGER.debug("Fetched {} new HMS notification(s)", response.getEventsSize());
- return response.getEvents();
- }
-
- return Collections.emptyList();
- }
-
- /**
- * Returns a HMS notification filter for a specific notification ID. HMS notifications may
- * have duplicated IDs, so the filter uses a SHA-1 hash to check for a unique notification.
- *
- * @param id the notification ID to filter
- * @return the HMS notification filter
- */
- private NotificationFilter createNotificationFilterFor(final long id) {
- /*
- * A SHA-1 hex value that keeps unique notifications processed is persisted on the Sentry DB.
- * To keep unnecessary calls to the DB, we use a cache that keeps seen hashes of the
- * specified ID. If a new filter ID is used, then we clean up the cache.
- */
-
- if (lastIdFiltered != id) {
- lastIdFiltered = id;
- cache.clear();
- }
-
- return new NotificationFilter() {
- @Override
- public boolean accept(NotificationEvent notificationEvent) {
- if (notificationEvent.getEventId() == id) {
- String hash = UniquePathsUpdate.sha1(notificationEvent);
-
- try {
- if (cache.contains(hash) || sentryStore.isNotificationProcessed(hash)) {
- cache.add(hash);
-
- LOGGER.debug("Ignoring HMS notification already processed: ID = {}", id);
- return false;
- }
- } catch (Exception e) {
- LOGGER.error("An error occurred while checking if notification {} is already "
- + "processed: {}", id, e.getMessage());
-
- // We cannot throw an exception on this filter, so we return false assuming this
- // notification is already processed
- return false;
- }
- }
-
- return true;
- }
- };
- }
-
- /**
- * Gets the HMS client connection object.
- * If will create a new connection if no connection object exists.
- *
- * @return The HMS client used to communication with the Hive MetaStore.
- * @throws Exception If it cannot connect to the HMS service.
- */
- private HiveMetaStoreClient getHmsClient() throws Exception {
- if (hmsClient == null) {
- try {
- hmsClient = hmsConnectionFactory.connect().getClient();
- } catch (Exception e) {
- LOGGER.error("Fail to connect to the HMS service: {}", e.getMessage());
- throw e;
- }
- }
-
- return hmsClient;
- }
-
- /**
- * @return the latest notification Id logged by the HMS
- * @throws Exception when an error occurs when talking to the HMS client
- */
- public long getCurrentNotificationId() throws Exception {
- CurrentNotificationEventId eventId;
- try {
- eventId = getHmsClient().getCurrentNotificationEventId();
- } catch (Exception e) {
- close();
- throw e;
- }
-
- if (eventId != null && eventId.isSetEventId()) {
- return eventId.getEventId();
- }
-
- return SentryStore.EMPTY_NOTIFICATION_ID;
- }
-
- /* AutoCloseable implementations */
-
- @Override
- public void close() {
- try {
- if (hmsClient != null) {
- hmsClient.close();
- }
-
- cache.clear();
- } finally {
- hmsClient = null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/sentry/blob/b97f5c7a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveSimpleConnectionFactory.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveSimpleConnectionFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveSimpleConnectionFactory.java
deleted file mode 100644
index 31e58fd..0000000
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveSimpleConnectionFactory.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sentry.service.thrift;
-
-import com.google.common.base.Preconditions;
-
-import java.io.File;
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import javax.security.auth.login.LoginException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SaslRpcServer;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-
-import org.apache.sentry.service.common.ServiceConstants.ServerConfig;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Factory used to generate Hive connections.
- * Supports insecure and Kerberos connections.
- * For Kerberos connections starts the ticket renewal thread and sets
- * up Kerberos credentials.
- */
-public final class HiveSimpleConnectionFactory implements HiveConnectionFactory {
- private static final Logger LOGGER = LoggerFactory.getLogger(HiveSimpleConnectionFactory.class);
-
- /** Sentty configuration */
- private final Configuration conf;
- /** Hive configuration */
- private final HiveConf hiveConf;
- private final boolean insecure;
- private SentryKerberosContext kerberosContext = null;
-
- public HiveSimpleConnectionFactory(Configuration sentryConf, HiveConf hiveConf) {
- this.conf = sentryConf;
- this.hiveConf = hiveConf;
- insecure = !ServerConfig.SECURITY_MODE_KERBEROS.equalsIgnoreCase(
- sentryConf.get(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_NONE).trim());
- }
-
- /**
- * Initialize the Factory.
- * For insecure connections there is nothing to initialize.
- * For Kerberos connections sets up ticket renewal thread.
- * @throws IOException
- * @throws LoginException
- */
- public void init() throws IOException, LoginException {
- if (insecure) {
- LOGGER.info("Using insecure connection to HMS");
- return;
- }
-
- int port = conf.getInt(ServerConfig.RPC_PORT, ServerConfig.RPC_PORT_DEFAULT);
- String rawPrincipal = Preconditions.checkNotNull(conf.get(ServerConfig.PRINCIPAL),
- "%s is required", ServerConfig.PRINCIPAL);
- String principal = SecurityUtil.getServerPrincipal(rawPrincipal, NetUtils.createSocketAddr(
- conf.get(ServerConfig.RPC_ADDRESS, ServerConfig.RPC_ADDRESS_DEFAULT),
- port).getAddress());
- LOGGER.debug("Opening kerberos connection to HMS using kerberos principal {}", principal);
- String[] principalParts = SaslRpcServer.splitKerberosName(principal);
- Preconditions.checkArgument(principalParts.length == 3,
- "Kerberos principal %s should have 3 parts", principal);
- String keytab = Preconditions.checkNotNull(conf.get(ServerConfig.KEY_TAB),
- "Configuration is missing required %s paraeter", ServerConfig.KEY_TAB);
- File keytabFile = new File(keytab);
- Preconditions.checkState(keytabFile.isFile() && keytabFile.canRead(),
- "Keytab %s does not exist or is not readable", keytab);
- // Instantiating SentryKerberosContext in non-server mode handles the ticket renewal.
- kerberosContext = new SentryKerberosContext(principal, keytab, false);
- UserGroupInformation.setConfiguration(conf);
- LOGGER.info("Using secure connection to HMS");
- }
-
- /**
- * Connect to HMS in unsecure mode or in Kerberos mode according to config.
- *
- * @return HMS connection
- * @throws IOException if could not establish connection
- * @throws InterruptedException if connection was interrupted
- * @throws MetaException if other errors happened
- */
- public HMSClient connect() throws IOException, InterruptedException, MetaException {
- if (insecure) {
- return new HMSClient(new HiveMetaStoreClient(hiveConf));
- }
- UserGroupInformation clientUGI =
- UserGroupInformation.getUGIFromSubject(kerberosContext.getSubject());
- return new HMSClient(clientUGI.doAs(new PrivilegedExceptionAction<HiveMetaStoreClient>() {
- @Override
- public HiveMetaStoreClient run() throws MetaException {
- return new HiveMetaStoreClient(hiveConf);
- }
- }));
- }
-
- @Override
- public void close() throws Exception {
- if (kerberosContext != null) {
- kerberosContext.shutDown();
- kerberosContext = null;
- }
- }
-}