Posted to commits@sentry.apache.org by ak...@apache.org on 2017/07/11 20:20:36 UTC
[2/2] sentry git commit: SENTRY-1630: out of sequence error in HMSFollower (Alex Kolbasov, reviewed by Vamsee Yarlagadda and Na Li)
Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/747c2260
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/747c2260
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/747c2260
Branch: refs/heads/sentry-ha-redesign
Commit: 747c226013f31d02e623b82902f2bc62a87fc4e9
Parents: 5c1d559
Author: Alexander Kolbasov <ak...@cloudera.com>
Authored: Tue Jul 11 22:20:09 2017 +0200
Committer: Alexander Kolbasov <ak...@cloudera.com>
Committed: Tue Jul 11 22:20:09 2017 +0200
----------------------------------------------------------------------
.../sentry/hdfs/FullUpdateInitializer.java | 454 -----------------
.../sentry/hdfs/TestFullUpdateInitializer.java | 320 ------------
.../service/thrift/FullUpdateInitializer.java | 492 +++++++++++++++++++
.../apache/sentry/service/thrift/HMSClient.java | 56 +++
.../sentry/service/thrift/HMSFollower.java | 168 ++-----
.../service/thrift/HiveConnectionFactory.java | 35 ++
.../thrift/HiveSimpleConnectionFactory.java | 129 +++++
.../service/thrift/SentryKerberosContext.java | 6 +-
.../sentry/service/thrift/SentryService.java | 21 +-
.../thrift/TestFullUpdateInitializer.java | 346 +++++++++++++
10 files changed, 1115 insertions(+), 912 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sentry/blob/747c2260/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java
deleted file mode 100644
index cf9774c..0000000
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java
+++ /dev/null
@@ -1,454 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.hdfs;
-
-import com.google.common.collect.ImmutableMap;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.sentry.hdfs.ServiceConstants.ServerConfig;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Manages fetching the full snapshot from HMS.
- * The snapshot is represented as a map from a Hive object name to
- * the set of paths for that object.
- * The hive object name is either the Hive database name or
- * Hive database name joined with Hive table name as {@code dbName.tableName}.
- * All table partitions are stored under the table object.
- * <p>
- * Once a {@link FullUpdateInitializer} is constructed, the {@link FullUpdateInitializer#getFullHMSSnapshot()}
- * method should be called to get the initial update.
- * <p>
- * It is important to close the {@link FullUpdateInitializer} object to prevent resource
- * leaks.
- * <p>
- * The usual way of using {@link FullUpdateInitializer} is
- * <pre>
- * {@code
- * try (FullUpdateInitializer updateInitializer =
- * new FullUpdateInitializer(client, authzConf)) {
- * Map<String, Set<String>> pathsUpdate = updateInitializer.getFullHMSSnapshot();
- * return pathsUpdate;
- * }
- * }
- * </pre>
- */
-public final class FullUpdateInitializer implements AutoCloseable {
-
- /*
- * Implementation note.
- *
- * The snapshot is obtained using an executor. We follow the map/reduce model.
- * Each executor thread (mapper) obtains and returns a partial snapshot; these
- * partial snapshots are then reduced to a single combined snapshot by getFullHMSSnapshot().
- *
- * Synchronization between the getFullHMSSnapshot() and executors is done using the
- * 'results' queue. The queue holds the futures for each scheduled task.
- * It is initially populated by getFullHMSSnapshot and each task may add new future
- * results to it. Only getFullHMSSnapshot() removes entries from the results queue.
- * This guarantees that once the results queue is empty there are no pending jobs.
- *
- * Since there is no other data sharing, the implementation is safe without
- * any other synchronization. It is not thread-safe for concurrent calls
- * to getFullHMSSnapshot().
- *
- */
-
- private final ExecutorService threadPool;
- private final HiveMetaStoreClient client;
- private final int maxPartitionsPerCall;
- private final int maxTablesPerCall;
- private final Deque<Future<CallResult>> results = new ConcurrentLinkedDeque<>();
- private final int maxRetries;
- private final int waitDurationMillis;
-
- private static final Logger LOGGER = LoggerFactory.getLogger(FullUpdateInitializer.class);
-
- private static final ObjectMapping emptyObjectMapping =
- new ObjectMapping(Collections.<String, Set<String>>emptyMap());
-
- /**
- * Extract path (not starting with "/") from the full URI
- * @param uri - resource URI (usually with scheme)
- * @return path if uri is valid or null
- */
- private static String pathFromURI(String uri) {
- try {
- return PathsUpdate.parsePath(uri);
- } catch (SentryMalformedPathException e) {
- LOGGER.warn(String.format("Ignoring invalid uri %s: %s",
- uri, e.getReason()));
- return null;
- }
- }
-
- /**
- * Mapping of object to set of paths.
- * Used to represent partial results from executor threads. Multiple
- * ObjectMapping objects are combined in a single mapping
- * to get the final result.
- */
- private static final class ObjectMapping {
- private final Map<String, Set<String>> objects;
-
- ObjectMapping(Map<String, Set<String>> objects) {
- this.objects = objects;
- }
-
- ObjectMapping(String authObject, String path) {
- Set<String> values = Collections.singleton(safeIntern(path));
- objects = ImmutableMap.of(authObject, values);
- }
-
- ObjectMapping(String authObject, Collection<String> paths) {
- Set<String> values = new HashSet<>(paths);
- objects = ImmutableMap.of(authObject, values);
- }
-
- Map<String, Set<String>> getObjects() {
- return objects;
- }
- }
-
- private static final class CallResult {
- private final Exception failure;
- private final boolean successStatus;
- private final ObjectMapping objectMapping;
-
- CallResult(Exception ex) {
- failure = ex;
- successStatus = false;
- objectMapping = emptyObjectMapping;
- }
-
- CallResult(ObjectMapping objectMapping) {
- failure = null;
- successStatus = true;
- this.objectMapping = objectMapping;
- }
-
- boolean success() {
- return successStatus;
- }
-
- ObjectMapping getObjectMapping() {
- return objectMapping;
- }
-
- public Exception getFailure() {
- return failure;
- }
- }
-
- private abstract class BaseTask implements Callable<CallResult> {
-
- /**
- * Class represents retry strategy for BaseTask.
- */
- private final class RetryStrategy {
- private int retryStrategyMaxRetries = 0;
- private final int retryStrategyWaitDurationMillis;
-
- private RetryStrategy(int retryStrategyMaxRetries, int retryStrategyWaitDurationMillis) {
- this.retryStrategyMaxRetries = retryStrategyMaxRetries;
-
- // Assign the default wait duration if a non-positive value is provided.
- this.retryStrategyWaitDurationMillis = (retryStrategyWaitDurationMillis > 0) ?
- retryStrategyWaitDurationMillis : 1000;
- }
-
- @SuppressWarnings({"squid:S1141", "squid:S2142"})
- public CallResult exec() {
- // Retry logic is happening inside callable/task to avoid
- // synchronous waiting on getting the result.
- // Retry the failed task until the maximum number of retries is reached.
- // Wait a configurable duration before each retry.
- //
- // Only thrift exceptions are retried.
- // Other exceptions are propagated up the stack.
- Exception exception = null;
- try {
- // We catch all exceptions except Thrift exceptions which are retried
- for (int i = 0; i < retryStrategyMaxRetries; i++) {
- //noinspection NestedTryStatement
- try {
- return new CallResult(doTask());
- } catch (TException ex) {
- LOGGER.debug("Failed to execute task on " + (i + 1) + " attempts." +
- " Sleeping for " + retryStrategyWaitDurationMillis + " ms. Exception: " +
- ex.toString(), ex);
- exception = ex;
-
- try {
- Thread.sleep(retryStrategyWaitDurationMillis);
- } catch (InterruptedException ignored) {
- // Skip the remaining retries if we get an InterruptedException
- // and fail with the last Thrift exception.
- LOGGER.warn("Interrupted during update fetch on iteration " + (i + 1));
- break;
- }
- }
- }
- } catch (Exception ex) {
- exception = ex;
- }
- LOGGER.error("Failed to execute task", exception);
- // We will fail in the end, so we are shutting down the pool to prevent
- // new tasks from being scheduled.
- threadPool.shutdown();
- return new CallResult(exception);
- }
- }
-
- private final RetryStrategy retryStrategy;
-
- BaseTask() {
- retryStrategy = new RetryStrategy(maxRetries, waitDurationMillis);
- }
-
- @Override
- public CallResult call() throws Exception {
- return retryStrategy.exec();
- }
-
- abstract ObjectMapping doTask() throws TException;
- }
-
- private class PartitionTask extends BaseTask {
- private final String dbName;
- private final String tblName;
- private final String authName;
- private final List<String> partNames;
-
- PartitionTask(String dbName, String tblName, String authName,
- List<String> partNames) {
- this.dbName = safeIntern(dbName);
- this.tblName = safeIntern(tblName);
- this.authName = safeIntern(authName);
- this.partNames = partNames;
- }
-
- @Override
- ObjectMapping doTask() throws TException {
- List<Partition> tblParts = client.getPartitionsByNames(dbName, tblName, partNames);
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("#### Fetching partitions " +
- "[" + dbName + "." + tblName + "]" + "[" + partNames + "]");
- }
- Collection<String> partitionNames = new ArrayList<>(tblParts.size());
- for (Partition part : tblParts) {
- String partPath = pathFromURI(part.getSd().getLocation());
- if (partPath != null) {
- partitionNames.add(partPath.intern());
- }
- }
- return new ObjectMapping(authName, partitionNames);
- }
- }
-
- private class TableTask extends BaseTask {
- private final String dbName;
- private final List<String> tableNames;
-
- TableTask(Database db, List<String> tableNames) {
- dbName = safeIntern(db.getName());
- this.tableNames = tableNames;
- }
-
- @Override
- @SuppressWarnings({"squid:S2629", "squid:S135"})
- ObjectMapping doTask() throws TException {
- List<Table> tables = client.getTableObjectsByName(dbName, tableNames);
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("#### Fetching tables [" + dbName + "][" +
- tableNames + "]");
- }
- Map<String, Set<String>> objectMapping = new HashMap<>(tables.size());
- for (Table tbl : tables) {
- // Table names are case insensitive
- if (!tbl.getDbName().equalsIgnoreCase(dbName)) {
- // Inconsistency in HMS data
- LOGGER.warn(String.format("DB name %s for table %s does not match %s",
- tbl.getDbName(), tbl.getTableName(), dbName));
- continue;
- }
-
- String tableName = safeIntern(tbl.getTableName().toLowerCase());
- String authzObject = (dbName + "." + tableName).intern();
- List<String> tblPartNames = client.listPartitionNames(dbName, tableName, (short) -1);
- for (int i = 0; i < tblPartNames.size(); i += maxPartitionsPerCall) {
- List<String> partsToFetch = tblPartNames.subList(i,
- Math.min(i + maxPartitionsPerCall, tblPartNames.size()));
- Callable<CallResult> partTask = new PartitionTask(dbName,
- tableName, authzObject, partsToFetch);
- results.add(threadPool.submit(partTask));
- }
- String tblPath = safeIntern(pathFromURI(tbl.getSd().getLocation()));
- if (tblPath == null) {
- continue;
- }
- Set<String> paths = objectMapping.get(authzObject);
- if (paths == null) {
- paths = new HashSet<>(1);
- objectMapping.put(authzObject, paths);
- }
- paths.add(tblPath);
- }
- return new ObjectMapping(Collections.unmodifiableMap(objectMapping));
- }
- }
-
- private class DbTask extends BaseTask {
-
- private final String dbName;
-
- DbTask(String dbName) {
- //Database names are case insensitive
- this.dbName = safeIntern(dbName.toLowerCase());
- }
-
- @Override
- ObjectMapping doTask() throws TException {
- Database db = client.getDatabase(dbName);
- if (!dbName.equalsIgnoreCase(db.getName())) {
- LOGGER.warn("Database name {} does not match {}", db.getName(), dbName);
- return emptyObjectMapping;
- }
- List<String> allTblStr = client.getAllTables(dbName);
- for (int i = 0; i < allTblStr.size(); i += maxTablesPerCall) {
- List<String> tablesToFetch = allTblStr.subList(i,
- Math.min(i + maxTablesPerCall, allTblStr.size()));
- Callable<CallResult> tableTask = new TableTask(db, tablesToFetch);
- results.add(threadPool.submit(tableTask));
- }
- String dbPath = safeIntern(pathFromURI(db.getLocationUri()));
- return (dbPath != null) ? new ObjectMapping(dbName, dbPath) :
- emptyObjectMapping;
- }
- }
-
- public FullUpdateInitializer(HiveMetaStoreClient client, Configuration conf) {
- this.client = client;
- maxPartitionsPerCall = conf.getInt(
- ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC,
- ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC_DEFAULT);
- maxTablesPerCall = conf.getInt(
- ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC,
- ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC_DEFAULT);
- maxRetries = conf.getInt(
- ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM,
- ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM_DEFAULT);
- waitDurationMillis = conf.getInt(
- ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS,
- ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS_DEFAULT);
- threadPool = Executors.newFixedThreadPool(conf.getInt(
- ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS,
- ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS_DEFAULT));
- }
-
- /**
- * Get Full HMS snapshot.
- * @return Full snapshot of HMS objects.
- * @throws TException if a Thrift error occurred
- * @throws ExecutionException if there was a scheduling error
- * @throws InterruptedException if processing was interrupted
- */
- @SuppressWarnings("squid:S00112")
- public Map<String, Set<String>> getFullHMSSnapshot()
- throws Exception {
- // Get list of all HMS databases
- List<String> allDbStr = client.getAllDatabases();
- // Schedule async task for each database responsible for fetching per-database
- // objects.
- for (String dbName : allDbStr) {
- results.add(threadPool.submit(new DbTask(dbName)));
- }
-
- // Resulting full snapshot
- Map<String, Set<String>> fullSnapshot = new HashMap<>();
-
- // As async tasks complete, merge their results into full snapshot.
- while (!results.isEmpty()) {
- // This is the only thread that takes elements off the results list - all other threads
- // only add to it. Once the list is empty it can't become non-empty again.
- // This means that if we check that results is non-empty we can safely call pop() and
- // know that the result of pop() is not null.
- Future<CallResult> result = results.pop();
- // Wait for the task to complete
- CallResult callResult = result.get();
- // Fail if we got errors
- if (!callResult.success()) {
- throw callResult.getFailure();
- }
- // Merge values into fullSnapshot
- Map<String, Set<String>> objectMapping =
- callResult.getObjectMapping().getObjects();
- for (Map.Entry<String, Set<String>> entry: objectMapping.entrySet()) {
- String key = entry.getKey();
- Set<String> val = entry.getValue();
- Set<String> existingSet = fullSnapshot.get(key);
- if (existingSet == null) {
- fullSnapshot.put(key, val);
- continue;
- }
- existingSet.addAll(val);
- }
- }
- return fullSnapshot;
- }
-
- @Override
- public void close() {
- threadPool.shutdownNow();
- try {
- threadPool.awaitTermination(1, TimeUnit.SECONDS);
- } catch (InterruptedException ignored) {
- LOGGER.warn("Interrupted shutdown");
- Thread.currentThread().interrupt();
- }
- }
-
- /**
- * Intern a string but only if it is not null
- * @param arg String to be interned, may be null
- * @return interned string or null
- */
- static String safeIntern(String arg) {
- return (arg != null) ? arg.intern() : null;
- }
-}
http://git-wip-us.apache.org/repos/asf/sentry/blob/747c2260/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java b/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java
deleted file mode 100644
index 389e9b8..0000000
--- a/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java
+++ /dev/null
@@ -1,320 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.hdfs;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.thrift.TException;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class TestFullUpdateInitializer {
-
- private static Configuration conf = new Configuration();
-
- static {
- conf.setInt(ServiceConstants.ServerConfig
- .SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC, 1);
- conf.setInt(ServiceConstants.ServerConfig
- .SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC, 1);
- conf.setInt(ServiceConstants.ServerConfig
- .SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS, 8);
- }
-
- /**
- * Representation of a Hive table. A table has a name and a list of partitions.
- */
- private static class HiveTable {
- String name;
- List<String> partitions;
-
- HiveTable(String name) {
- this.name = name;
- this.partitions = new ArrayList<>();
- }
-
- HiveTable(String name, List<String> partitions) {
- this.name = name;
- this.partitions = partitions;
- if (this.partitions == null) {
- this.partitions = new ArrayList<>();
- }
- }
-
- HiveTable add(String partition) {
- partitions.add(partition);
- return this;
- }
- }
-
- /**
- * Representation of a Hive database. A database has a name and a list of tables.
- */
- private static class HiveDb {
- String name;
- Collection<HiveTable> tables;
-
- HiveDb(String name) {
- this.name = name;
- tables = new ArrayList<>();
- }
-
- HiveDb(String name, Collection<HiveTable> tables) {
- this.name = name;
- this.tables = tables;
- if (this.tables == null) {
- this.tables = new ArrayList<>();
- }
- }
-
- void add(HiveTable table) {
- this.tables.add(table);
- }
- }
-
- /**
- * Representation of a full Hive snapshot. A snapshot is a collection of databases.
- */
- private static class HiveSnapshot {
- List<HiveDb> databases = new ArrayList<>();
-
- HiveSnapshot() {
- }
-
- HiveSnapshot(Collection<HiveDb> dblist) {
- if (dblist != null) {
- databases.addAll(dblist);
- }
- }
-
- HiveSnapshot add(HiveDb db) {
- this.databases.add(db);
- return this;
- }
- }
-
- /**
- * Wraps a Hive snapshot in a mock HMS client that returns the proper values
- * for that snapshot.
- */
- private static class MockClient {
- HiveMetaStoreClient client;
-
- MockClient(HiveSnapshot snapshot) throws TException {
- client = Mockito.mock(HiveMetaStoreClient.class);
- List<String> dbNames = new ArrayList<>(snapshot.databases.size());
- // Walk over all databases and mock appropriate objects
- for (HiveDb mdb: snapshot.databases) {
- String dbName = mdb.name;
- dbNames.add(dbName);
- Database db = makeDb(dbName);
- Mockito.when(client.getDatabase(dbName)).thenReturn(db);
- List<String> tableNames = new ArrayList<>(mdb.tables.size());
- // Walk over all tables for the database and mock appropriate objects
- for (HiveTable table: mdb.tables) {
- String tableName = table.name;
- tableNames.add(tableName);
- Table mockTable = makeTable(dbName, tableName);
- Mockito.when(client.getTableObjectsByName(dbName,
- Lists.newArrayList(tableName)))
- .thenReturn(Lists.newArrayList(mockTable));
- Mockito.when(client.listPartitionNames(dbName, tableName, (short) -1))
- .thenReturn(table.partitions);
- // Walk across all partitions and mock appropriate objects
- for (String partName: table.partitions) {
- Partition p = makePartition(dbName, tableName, partName);
- Mockito.when(client.getPartitionsByNames(dbName, tableName,
- Lists.<String>newArrayList(partName)))
- .thenReturn(Lists.<Partition>newArrayList(p));
- }
- }
- Mockito.when(client.getAllTables(dbName)).thenReturn(tableNames);
- }
- // Return all database names
- Mockito.when(client.getAllDatabases()).thenReturn(dbNames);
- }
- }
-
- /**
- * Create mock database with the given name
- * @param name Database name
- * @return Mock database object
- */
- private static Database makeDb(String name) {
- Database db = Mockito.mock(Database.class);
- Mockito.when(db.getName()).thenReturn(name);
- Mockito.when(db.getLocationUri()).thenReturn("hdfs:///" + name);
- return db;
- }
-
- /**
- * Create mock table
- * @param dbName db for this table
- * @param tableName name of the table
- * @return mock table object
- */
- private static Table makeTable(String dbName, String tableName) {
- Table table = Mockito.mock(Table.class);
- Mockito.when(table.getDbName()).thenReturn(dbName);
- Mockito.when(table.getTableName()).thenReturn(tableName);
- StorageDescriptor sd = Mockito.mock(StorageDescriptor.class);
- Mockito.when(sd.getLocation()).thenReturn(
- String.format("hdfs:///%s/%s", dbName, tableName));
- Mockito.when(table.getSd()).thenReturn(sd);
- return table;
- }
-
- /**
- * Create mock partition
- * @param dbName database for this partition
- * @param tableName table for this partition
- * @param partName partition name
- * @return mock partition object
- */
- private static Partition makePartition(String dbName, String tableName, String partName) {
- Partition partition = Mockito.mock(Partition.class);
- StorageDescriptor sd = Mockito.mock(StorageDescriptor.class);
- Mockito.when(sd.getLocation()).thenReturn(
- String.format("hdfs:///%s/%s/%s", dbName, tableName, partName));
- Mockito.when(partition.getSd()).thenReturn(sd);
- return partition;
- }
-
- @Test
- // Test basic operation with small database
- public void testSimple() throws Exception {
- HiveTable tab21 = new HiveTable("tab21");
- HiveTable tab31 = new HiveTable("tab31").add("part311").add("part312");
- HiveDb db3 = new HiveDb("db3", Lists.newArrayList(tab31));
- HiveDb db2 = new HiveDb("db2", Lists.newArrayList(tab21));
- HiveDb db1 = new HiveDb("db1");
- HiveSnapshot snap = new HiveSnapshot().add(db1).add(db2).add(db3);
- MockClient c = new MockClient(snap);
-
- Map<String, Set<String>> update;
- try(FullUpdateInitializer cacheInitializer = new FullUpdateInitializer(c.client, conf)) {
- update = cacheInitializer.getFullHMSSnapshot();
- }
- Assert.assertEquals(5, update.size());
- Assert.assertEquals(Sets.newHashSet("db1"), update.get("db1"));
- Assert.assertEquals(Sets.newHashSet("db2"), update.get("db2"));
- Assert.assertEquals(Sets.newHashSet("db3"), update.get("db3"));
- Assert.assertEquals(Sets.newHashSet("db2/tab21"), update.get("db2.tab21"));
- Assert.assertEquals(Sets.newHashSet("db3/tab31",
- "db3/tab31/part311", "db3/tab31/part312"), update.get("db3.tab31"));
- }
-
- @Test
- // Test that invalid paths are handled correctly
- public void testInvalidPaths() throws Exception {
- // Set up mocks: db1.tab1, with tab1 returning a wrong dbName (db2)
- Database db1 = makeDb("db1");
-
- Table tab1 = Mockito.mock(Table.class);
- // Return a wrong db name, so that this table is skipped with a warning
- Mockito.when(tab1.getDbName()).thenReturn("db2");
- Mockito.when(tab1.getTableName()).thenReturn("tab1");
-
- HiveMetaStoreClient client = Mockito.mock(HiveMetaStoreClient.class);
- Mockito.when(client.getAllDatabases()).thenReturn(Lists.newArrayList("db1"));
- Mockito.when(client.getDatabase("db1")).thenReturn(db1);
-
- Table tab12 = Mockito.mock(Table.class);
- Mockito.when(tab12.getDbName()).thenReturn("db1");
- Mockito.when(tab12.getTableName()).thenReturn("tab21");
- StorageDescriptor sd21 = Mockito.mock(StorageDescriptor.class);
- Mockito.when(sd21.getLocation()).thenReturn("hdfs:///db1/tab21");
- Mockito.when(tab12.getSd()).thenReturn(sd21);
-
- Mockito.when(client.getTableObjectsByName("db1",
- Lists.newArrayList("tab1"))).thenReturn(Lists.newArrayList(tab1));
- Mockito.when(client.getTableObjectsByName("db1",
- Lists.newArrayList("tab12"))).thenReturn(Lists.newArrayList(tab12));
- Mockito.when(client.getAllTables("db1")).
- thenReturn(Lists.newArrayList("tab1", "tab12"));
-
-
- Map<String, Set<String>> update;
- try(FullUpdateInitializer cacheInitializer = new FullUpdateInitializer(client, conf)) {
- update = cacheInitializer.getFullHMSSnapshot();
- }
- Assert.assertEquals(2, update.size());
- Assert.assertEquals(Sets.newHashSet("db1"), update.get("db1"));
- Assert.assertEquals(Sets.newHashSet("db1/tab21"), update.get("db1.tab21"));
- }
-
- @Test
- // Test handling of a large number of tables and partitions
- public void testBig() throws Exception {
- int ndbs = 3;
- int ntables = 51;
- int nparts = 131;
-
- HiveSnapshot snap = new HiveSnapshot();
-
- for (int i = 0; i < ndbs; i++) {
- HiveDb db = new HiveDb("db" + i);
- for (int j = 0; j < ntables; j++) {
- HiveTable table = new HiveTable("table" + i + j);
- for (int k = 0; k < nparts; k++) {
- table.add("part" + i + j + k);
- }
- db.add(table);
- }
- snap.add(db);
- }
- MockClient c = new MockClient(snap);
- Map<String, Set<String>> update;
- try(FullUpdateInitializer cacheInitializer = new FullUpdateInitializer(c.client, conf)) {
- update = cacheInitializer.getFullHMSSnapshot();
- }
- Assert.assertEquals((ntables * ndbs) + ndbs, update.size());
- for (int i = 0; i < ndbs; i++) {
- String dbName = "db" + i;
- Assert.assertEquals(Sets.newHashSet(dbName), update.get(dbName));
-
- for (int j = 0; j < ntables; j++) {
- String tableName = "table" + i + j;
- Set<String> values = new HashSet<>();
- values.add(String.format("%s/%s", dbName, tableName));
- for (int k = 0; k < nparts; k++) {
- String partName = "part" + i + j + k;
- values.add(String.format("%s/%s/%s", dbName, tableName, partName));
- }
- String authz = dbName + "." + tableName;
- Assert.assertEquals(values, update.get(authz));
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/sentry/blob/747c2260/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java
new file mode 100644
index 0000000..1490581
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java
@@ -0,0 +1,492 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.service.thrift;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.sentry.hdfs.PathsUpdate;
+import org.apache.sentry.hdfs.SentryMalformedPathException;
+import org.apache.sentry.hdfs.ServiceConstants.ServerConfig;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Manages fetching the full snapshot from HMS.
+ * The snapshot is represented as a map from a Hive object name to
+ * the set of paths for that object.
+ * The hive object name is either the Hive database name or
+ * Hive database name joined with Hive table name as {@code dbName.tableName}.
+ * All table partitions are stored under the table object.
+ * <p>
+ * Once a {@link FullUpdateInitializer} is constructed, the {@link FullUpdateInitializer#getFullHMSSnapshot()}
+ * method should be called to get the initial update.
+ * <p>
+ * It is important to close the {@link FullUpdateInitializer} object to prevent resource
+ * leaks.
+ * <p>
+ * The usual way of using {@link FullUpdateInitializer} is
+ * <pre>
+ * {@code
+ * try (FullUpdateInitializer updateInitializer =
+ * new FullUpdateInitializer(clientFactory, authzConf)) {
+ * Map<String, Set<String>> pathsUpdate = updateInitializer.getFullHMSSnapshot();
+ * return pathsUpdate;
+ * }
+ * }
+ * </pre>
+ */
+public final class FullUpdateInitializer implements AutoCloseable {
+
+ /*
+ * Implementation note.
+ *
+ * The snapshot is obtained using an executor. We follow the map/reduce model.
+ * Each executor thread (mapper) obtains and returns a partial snapshot; these
+ * partial snapshots are then reduced to a single combined snapshot by getFullHMSSnapshot().
+ *
+ * Synchronization between the getFullHMSSnapshot() and executors is done using the
+ * 'results' queue. The queue holds the futures for each scheduled task.
+ * It is initially populated by getFullHMSSnapshot and each task may add new future
+ * results to it. Only getFullHMSSnapshot() removes entries from the results queue.
+ * This guarantees that once the results queue is empty there are no pending jobs.
+ *
+ * Since there is no other data sharing, the implementation is safe without
+ * any other synchronization. It is not thread-safe for concurrent calls
+ * to getFullHMSSnapshot().
+ *
+ */
+
+ private final ExecutorService threadPool;
+ private final int maxPartitionsPerCall;
+ private final int maxTablesPerCall;
+ private final Deque<Future<CallResult>> results = new ConcurrentLinkedDeque<>();
+ private final int maxRetries;
+ private final int waitDurationMillis;
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(FullUpdateInitializer.class);
+
+ private static final ObjectMapping emptyObjectMapping =
+ new ObjectMapping(Collections.<String, Set<String>>emptyMap());
+ private final HiveConnectionFactory clientFactory;
+
+ /**
+ * Extract path (not starting with "/") from the full URI
+ * @param uri - resource URI (usually with scheme)
+ * @return path if uri is valid or null
+ */
+ private static String pathFromURI(String uri) {
+ try {
+ return PathsUpdate.parsePath(uri);
+ } catch (SentryMalformedPathException e) {
+ LOGGER.warn(String.format("Ignoring invalid uri %s: %s",
+ uri, e.getReason()));
+ return null;
+ }
+ }
+
+ /**
+ * Mapping of object to set of paths.
+ * Used to represent partial results from executor threads. Multiple
+ * ObjectMapping objects are combined in a single mapping
+ * to get the final result.
+ */
+ private static final class ObjectMapping {
+ private final Map<String, Set<String>> objects;
+
+ ObjectMapping(Map<String, Set<String>> objects) {
+ this.objects = objects;
+ }
+
+ ObjectMapping(String authObject, String path) {
+ Set<String> values = Collections.singleton(safeIntern(path));
+ objects = ImmutableMap.of(authObject, values);
+ }
+
+ ObjectMapping(String authObject, Collection<String> paths) {
+ Set<String> values = new HashSet<>(paths);
+ objects = ImmutableMap.of(authObject, values);
+ }
+
+ Map<String, Set<String>> getObjects() {
+ return objects;
+ }
+ }
+
+ private static final class CallResult {
+ private final Exception failure;
+ private final boolean successStatus;
+ private final ObjectMapping objectMapping;
+
+ CallResult(Exception ex) {
+ failure = ex;
+ successStatus = false;
+ objectMapping = emptyObjectMapping;
+ }
+
+ CallResult(ObjectMapping objectMapping) {
+ failure = null;
+ successStatus = true;
+ this.objectMapping = objectMapping;
+ }
+
+ boolean success() {
+ return successStatus;
+ }
+
+ ObjectMapping getObjectMapping() {
+ return objectMapping;
+ }
+
+ public Exception getFailure() {
+ return failure;
+ }
+ }
+
+ private abstract class BaseTask implements Callable<CallResult> {
+
+ /**
+ * Class represents retry strategy for BaseTask.
+ */
+ private final class RetryStrategy {
+ private int retryStrategyMaxRetries = 0;
+ private final int retryStrategyWaitDurationMillis;
+
+ private RetryStrategy(int retryStrategyMaxRetries, int retryStrategyWaitDurationMillis) {
+ this.retryStrategyMaxRetries = retryStrategyMaxRetries;
+
+ // Assign the default wait duration if a non-positive value is provided.
+ this.retryStrategyWaitDurationMillis = (retryStrategyWaitDurationMillis > 0) ?
+ retryStrategyWaitDurationMillis : 1000;
+ }
+
+ @SuppressWarnings({"squid:S1141", "squid:S2142"})
+ public CallResult exec() {
+ // Retry logic is happening inside callable/task to avoid
+ // synchronous waiting on getting the result.
+ // Retry the failed task until the maximum number of retries is reached.
+ // Wait a configurable duration before each retry.
+ //
+ // Only thrift exceptions are retried.
+ // Other exceptions are propagated up the stack.
+ Exception exception = null;
+ try {
+ // We catch all exceptions except Thrift exceptions which are retried
+ for (int i = 0; i < retryStrategyMaxRetries; i++) {
+ //noinspection NestedTryStatement
+ try {
+ return new CallResult(doTask());
+ } catch (TException ex) {
+ LOGGER.debug("Failed to execute task on " + (i + 1) + " attempts." +
+ " Sleeping for " + retryStrategyWaitDurationMillis + " ms. Exception: " +
+ ex.toString(), ex);
+ exception = ex;
+
+ try {
+ Thread.sleep(retryStrategyWaitDurationMillis);
+ } catch (InterruptedException ignored) {
+ // Skip the remaining retries if we get an InterruptedException
+ // and fail with the last Thrift exception.
+ LOGGER.warn("Interrupted during update fetch on iteration " + (i + 1));
+ break;
+ }
+ }
+ }
+ } catch (Exception ex) {
+ exception = ex;
+ }
+ LOGGER.error("Failed to execute task", exception);
+ // We will fail in the end, so we are shutting down the pool to prevent
+ // new tasks from being scheduled.
+ threadPool.shutdown();
+ return new CallResult(exception);
+ }
+ }
+
+ private final RetryStrategy retryStrategy;
+
+ BaseTask() {
+ retryStrategy = new RetryStrategy(maxRetries, waitDurationMillis);
+ }
+
+ @Override
+ public CallResult call() throws Exception {
+ return retryStrategy.exec();
+ }
+
+ abstract ObjectMapping doTask() throws Exception;
+ }
+
+ private class PartitionTask extends BaseTask {
+ private final String dbName;
+ private final String tblName;
+ private final String authName;
+ private final List<String> partNames;
+
+ PartitionTask(String dbName, String tblName, String authName,
+ List<String> partNames) {
+ this.dbName = safeIntern(dbName);
+ this.tblName = safeIntern(tblName);
+ this.authName = safeIntern(authName);
+ this.partNames = partNames;
+ }
+
+ @Override
+ ObjectMapping doTask() throws Exception {
+ List<Partition> tblParts;
+ HMSClient c = null;
+ try (HMSClient client = clientFactory.connect()) {
+ c = client;
+ tblParts = client.getClient().getPartitionsByNames(dbName, tblName, partNames);
+ } catch (Exception e) {
+ if (c != null) {
+ c.invalidate();
+ }
+ throw e;
+ }
+
+ LOGGER.debug("Fetched partitions for db = {}, table = {}",
+ dbName, tblName);
+
+ Collection<String> partitionNames = new ArrayList<>(tblParts.size());
+ for (Partition part : tblParts) {
+ String partPath = pathFromURI(part.getSd().getLocation());
+ if (partPath != null) {
+ partitionNames.add(partPath.intern());
+ }
+ }
+ return new ObjectMapping(authName, partitionNames);
+ }
+ }
+
+ private class TableTask extends BaseTask {
+ private final String dbName;
+ private final List<String> tableNames;
+
+ TableTask(Database db, List<String> tableNames) {
+ dbName = safeIntern(db.getName());
+ this.tableNames = tableNames;
+ }
+
+ @Override
+ @SuppressWarnings({"squid:S2629", "squid:S135"})
+ ObjectMapping doTask() throws Exception {
+ HMSClient c = null;
+ try (HMSClient client = clientFactory.connect()) {
+ c = client;
+ List<Table> tables = client.getClient().getTableObjectsByName(dbName, tableNames);
+
+ LOGGER.debug("Fetching tables for db = {}, tables = {}", dbName, tableNames);
+
+ Map<String, Set<String>> objectMapping = new HashMap<>(tables.size());
+ for (Table tbl : tables) {
+ // Table names are case insensitive
+ if (!tbl.getDbName().equalsIgnoreCase(dbName)) {
+ // Inconsistency in HMS data
+ LOGGER.warn(String.format("DB name %s for table %s does not match %s",
+ tbl.getDbName(), tbl.getTableName(), dbName));
+ continue;
+ }
+
+ String tableName = safeIntern(tbl.getTableName().toLowerCase());
+ String authzObject = (dbName + "." + tableName).intern();
+ List<String> tblPartNames = client.getClient().listPartitionNames(dbName, tableName, (short) -1);
+ for (int i = 0; i < tblPartNames.size(); i += maxPartitionsPerCall) {
+ List<String> partsToFetch = tblPartNames.subList(i,
+ Math.min(i + maxPartitionsPerCall, tblPartNames.size()));
+ Callable<CallResult> partTask = new PartitionTask(dbName,
+ tableName, authzObject, partsToFetch);
+ results.add(threadPool.submit(partTask));
+ }
+ String tblPath = safeIntern(pathFromURI(tbl.getSd().getLocation()));
+ if (tblPath == null) {
+ continue;
+ }
+ Set<String> paths = objectMapping.get(authzObject);
+ if (paths == null) {
+ paths = new HashSet<>(1);
+ objectMapping.put(authzObject, paths);
+ }
+ paths.add(tblPath);
+ }
+ return new ObjectMapping(Collections.unmodifiableMap(objectMapping));
+ } catch (Exception e) {
+ if (c != null) {
+ c.invalidate();
+ }
+ throw e;
+ }
+ }
+ }
+
+ private class DbTask extends BaseTask {
+
+ private final String dbName;
+
+ DbTask(String dbName) {
+ //Database names are case insensitive
+ this.dbName = safeIntern(dbName.toLowerCase());
+ }
+
+ @Override
+ ObjectMapping doTask() throws Exception {
+ HMSClient c = null;
+ try (HMSClient client = clientFactory.connect()) {
+ c = client;
+ Database db = client.getClient().getDatabase(dbName);
+ if (!dbName.equalsIgnoreCase(db.getName())) {
+ LOGGER.warn("Database name {} does not match {}", db.getName(), dbName);
+ return emptyObjectMapping;
+ }
+ List<String> allTblStr = client.getClient().getAllTables(dbName);
+ for (int i = 0; i < allTblStr.size(); i += maxTablesPerCall) {
+ List<String> tablesToFetch = allTblStr.subList(i,
+ Math.min(i + maxTablesPerCall, allTblStr.size()));
+ Callable<CallResult> tableTask = new TableTask(db, tablesToFetch);
+ results.add(threadPool.submit(tableTask));
+ }
+ String dbPath = safeIntern(pathFromURI(db.getLocationUri()));
+ return (dbPath != null) ? new ObjectMapping(dbName, dbPath) :
+ emptyObjectMapping;
+ } catch (Exception e) {
+ if (c != null) {
+ c.invalidate();
+ }
+ throw e;
+ }
+ }
+ }
+
+ FullUpdateInitializer(HiveConnectionFactory clientFactory, Configuration conf) {
+ this.clientFactory = clientFactory;
+ maxPartitionsPerCall = conf.getInt(
+ ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC,
+ ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC_DEFAULT);
+ maxTablesPerCall = conf.getInt(
+ ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC,
+ ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC_DEFAULT);
+ maxRetries = conf.getInt(
+ ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM,
+ ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM_DEFAULT);
+ waitDurationMillis = conf.getInt(
+ ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS,
+ ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS_DEFAULT);
+ threadPool = Executors.newFixedThreadPool(conf.getInt(
+ ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS,
+ ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS_DEFAULT));
+ }
+
+ /**
+ * Get Full HMS snapshot.
+ * @return Full snapshot of HMS objects.
+ * @throws TException if a Thrift error occurred
+ * @throws ExecutionException if there was a scheduling error
+ * @throws InterruptedException if processing was interrupted
+ */
+ @SuppressWarnings("squid:S00112")
+ Map<String, Set<String>> getFullHMSSnapshot() throws Exception {
+ // Get list of all HMS databases
+ List<String> allDbStr;
+ HMSClient c = null;
+ try (HMSClient client = clientFactory.connect()) {
+ c = client;
+ allDbStr = client.getClient().getAllDatabases();
+ } catch (Exception e) {
+ if (c != null) {
+ c.invalidate();
+ }
+ throw e;
+ }
+
+ // Schedule async task for each database responsible for fetching per-database
+ // objects.
+ for (String dbName : allDbStr) {
+ results.add(threadPool.submit(new DbTask(dbName)));
+ }
+
+ // Resulting full snapshot
+ Map<String, Set<String>> fullSnapshot = new HashMap<>();
+
+ // As async tasks complete, merge their results into full snapshot.
+ while (!results.isEmpty()) {
+ // This is the only thread that takes elements off the results list - all other threads
+ // only add to it. Once the list is empty it can't become non-empty again.
+ // This means that if we check that results is non-empty we can safely call pop() and
+ // know that the result of pop() is not null.
+ Future<CallResult> result = results.pop();
+ // Wait for the task to complete
+ CallResult callResult = result.get();
+ // Fail if we got errors
+ if (!callResult.success()) {
+ throw callResult.getFailure();
+ }
+ // Merge values into fullSnapshot
+ Map<String, Set<String>> objectMapping =
+ callResult.getObjectMapping().getObjects();
+ for (Map.Entry<String, Set<String>> entry: objectMapping.entrySet()) {
+ String key = entry.getKey();
+ Set<String> val = entry.getValue();
+ Set<String> existingSet = fullSnapshot.get(key);
+ if (existingSet == null) {
+ fullSnapshot.put(key, val);
+ continue;
+ }
+ existingSet.addAll(val);
+ }
+ }
+ return fullSnapshot;
+ }
+
+ @Override
+ public void close() {
+ threadPool.shutdownNow();
+ try {
+ threadPool.awaitTermination(1, TimeUnit.SECONDS);
+ } catch (InterruptedException ignored) {
+ LOGGER.warn("Interrupted shutdown");
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Intern a string but only if it is not null
+ * @param arg String to be interned, may be null
+ * @return interned string or null
+ */
+ static String safeIntern(String arg) {
+ return (arg != null) ? arg.intern() : null;
+ }
+}
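
The implementation note above relies on one invariant that is easy to miss: a
task registers any subtask futures on the shared 'results' deque before its own
future completes, and only a single thread ever removes futures. Together these
guarantee that an empty deque really means there is no pending work. Below is a
minimal, self-contained sketch of the same fan-out pattern (hypothetical class
and task names, not part of this commit):

import java.util.Deque;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FanOutExample {
  private static final ExecutorService pool = Executors.newFixedThreadPool(4);
  private static final Deque<Future<Integer>> results = new ConcurrentLinkedDeque<>();

  // A task that may schedule further tasks, mirroring how DbTask submits
  // TableTasks and TableTask submits PartitionTasks.
  static Callable<Integer> task(int depth) {
    return () -> {
      if (depth < 2) {
        // Subtasks are enqueued before this future completes, so the consumer
        // can never observe an empty deque while work is still pending.
        results.add(pool.submit(task(depth + 1)));
        results.add(pool.submit(task(depth + 1)));
      }
      return 1;
    };
  }

  public static void main(String[] args) throws Exception {
    results.add(pool.submit(task(0)));
    int total = 0;
    // Single consumer: only this thread removes futures - the same argument
    // getFullHMSSnapshot() makes when it drains the results deque.
    while (!results.isEmpty()) {
      total += results.pop().get(); // blocks until that task finishes
    }
    System.out.println("Completed tasks: " + total); // 1 + 2 + 4 = 7
    pool.shutdown();
  }
}
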
http://git-wip-us.apache.org/repos/asf/sentry/blob/747c2260/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSClient.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSClient.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSClient.java
new file mode 100644
index 0000000..86ff47e
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSClient.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.service.thrift;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+
+/**
+ * AutoCloseable wrapper around HiveMetaStoreClient.
+ * It is only used to provide try-with-resource semantics for
+ * {@link HiveMetaStoreClient}.
+ */
+class HMSClient implements AutoCloseable {
+ private final HiveMetaStoreClient client;
+ private boolean valid;
+
+ HMSClient(HiveMetaStoreClient client) {
+ this.client = Preconditions.checkNotNull(client);
+ valid = true;
+ }
+
+ public HiveMetaStoreClient getClient() {
+ return client;
+ }
+
+ public void invalidate() {
+ if (valid) {
+ client.close();
+ valid = false;
+ }
+ }
+
+ @Override
+ public void close() {
+ if (valid) {
+ client.close();
+ valid = false;
+ }
+ }
+}
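
HMSClient exists purely to give HiveMetaStoreClient try-with-resources
semantics, and every HMS call in the new FullUpdateInitializer pairs it with
explicit invalidation on failure. A minimal sketch of that calling idiom
follows (HiveConnectionFactory.connect() returning an HMSClient is inferred
from the call sites in this diff; listDatabases() is a hypothetical helper):

import java.util.List;

class HMSClientIdiom {
  // One HMS call under the connect/use/invalidate-on-failure idiom.
  static List<String> listDatabases(HiveConnectionFactory clientFactory) throws Exception {
    HMSClient c = null;
    try (HMSClient client = clientFactory.connect()) {
      c = client;
      return client.getClient().getAllDatabases();
    } catch (Exception e) {
      if (c != null) {
        c.invalidate(); // defensive: do not hand a possibly broken connection back
      }
      throw e;
    }
  }
}

Note that try-with-resources closes its resource before the catch block runs,
so with the HMSClient shown above the invalidate() call in the catch is a
defensive no-op; the close()/invalidate() split presumably leaves room for a
factory that pools connections, where the two operations would diverge.
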
http://git-wip-us.apache.org/repos/asf/sentry/blob/747c2260/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
index 1f7eb18..2d581f7 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
@@ -18,7 +18,6 @@
package org.apache.sentry.service.thrift;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -27,17 +26,12 @@ import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SaslRpcServer;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.hcatalog.messaging.HCatEventMessage;
import org.apache.sentry.binding.hive.conf.HiveAuthzConf;
import org.apache.sentry.core.common.exception.SentryInvalidHMSEventException;
import org.apache.sentry.core.common.exception.SentryInvalidInputException;
import org.apache.sentry.core.common.exception.SentryNoSuchObjectException;
import org.apache.sentry.hdfs.PermissionsUpdate;
-import org.apache.sentry.hdfs.FullUpdateInitializer;
import org.apache.sentry.hdfs.service.thrift.TPrivilegeChanges;
import org.apache.sentry.provider.db.SentryPolicyStorePlugin;
import org.apache.sentry.provider.db.service.persistent.SentryStore;
@@ -50,10 +44,8 @@ import org.apache.sentry.binding.metastore.messaging.json.*;
import javax.jdo.JDODataStoreException;
import javax.security.auth.login.LoginException;
-import java.io.File;
import java.io.IOException;
import java.net.SocketException;
-import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -73,32 +65,33 @@ import static org.apache.sentry.hdfs.Updateable.Update;
@SuppressWarnings("PMD")
public class HMSFollower implements Runnable, AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(HMSFollower.class);
+ private HiveSimpleConnectionFactory hiveConnectionFactory;
// Track the latest eventId of the event that has been logged. So we don't log the same message
private long lastLoggedEventId = SentryStore.EMPTY_CHANGE_ID;
private static boolean connectedToHMS = false;
- private HiveMetaStoreClient client;
- private SentryKerberosContext kerberosContext;
+ private HMSClient client;
private final Configuration authzConf;
- private boolean kerberos;
private final SentryStore sentryStore;
private String hiveInstance;
private boolean needLogHMSSupportReady = true;
private final LeaderStatusMonitor leaderMonitor;
- HMSFollower(Configuration conf, SentryStore store, LeaderStatusMonitor leaderMonitor) {
- LOGGER.info("HMSFollower is being initialized");
+ HMSFollower(Configuration conf, SentryStore store, LeaderStatusMonitor leaderMonitor,
+ HiveSimpleConnectionFactory hiveConnectionFactory) {
authzConf = conf;
this.leaderMonitor = leaderMonitor;
sentryStore = store;
+ this.hiveConnectionFactory = hiveConnectionFactory;
}
@VisibleForTesting
- HMSFollower(Configuration conf, SentryStore sentryStore, String hiveInstance) {
- this.authzConf = conf;
- this.sentryStore = sentryStore;
+ HMSFollower(Configuration conf, SentryStore sentryStore, String hiveInstance)
+ throws IOException, LoginException {
+ this(conf, sentryStore, null, null);
this.hiveInstance = hiveInstance;
- this.leaderMonitor = null;
+ hiveConnectionFactory = new HiveSimpleConnectionFactory(conf, new HiveConf());
+ hiveConnectionFactory.init();
}
@VisibleForTesting
@@ -110,6 +103,11 @@ public class HMSFollower implements Runnable, AutoCloseable {
public void close() {
// Close any outstanding connections to HMS
closeHMSConnection();
+ try {
+ hiveConnectionFactory.close();
+ } catch (Exception e) {
+ LOGGER.error("failed to close Hive Connection Factory", e);
+ }
}
/**
@@ -117,77 +115,13 @@ public class HMSFollower implements Runnable, AutoCloseable {
* Throws @LoginException if Kerberos context creation failed using Sentry's kerberos credentials
* Throws @MetaException if there was a problem creating an HMSClient
*/
- private HiveMetaStoreClient getMetaStoreClient(Configuration conf)
- throws IOException, InterruptedException, LoginException, MetaException {
- if (client != null) {
- return client;
- }
-
- final HiveConf hiveConf = new HiveConf();
- hiveInstance = hiveConf.get(HiveAuthzConf.AuthzConfVars.AUTHZ_SERVER_NAME.getVar());
-
- String principal, keytab;
-
- //TODO: Is this the right(standard) way to create a HMS client? HiveMetastoreClientFactoryImpl?
- //TODO: Check if HMS is using kerberos instead of relying on Sentry conf
- kerberos = ServiceConstants.ServerConfig.SECURITY_MODE_KERBEROS.equalsIgnoreCase(
- conf.get(ServiceConstants.ServerConfig.SECURITY_MODE, ServiceConstants.ServerConfig.SECURITY_MODE_KERBEROS).trim());
- if (kerberos) {
- LOGGER.info("Making a kerberos connection to HMS");
- try {
- int port = conf.getInt(ServiceConstants.ServerConfig.RPC_PORT, ServiceConstants.ServerConfig.RPC_PORT_DEFAULT);
- String rawPrincipal = Preconditions.checkNotNull(conf.get(ServiceConstants.ServerConfig.PRINCIPAL),
- ServiceConstants.ServerConfig.PRINCIPAL + " is required");
- principal = SecurityUtil.getServerPrincipal(rawPrincipal, NetUtils.createSocketAddr(
- conf.get(ServiceConstants.ServerConfig.RPC_ADDRESS, ServiceConstants.ServerConfig.RPC_ADDRESS_DEFAULT), port).getAddress());
- } catch (IOException io) {
- throw new RuntimeException("Can't translate kerberos principal'", io);
- }
-
- LOGGER.info("Using kerberos principal: " + principal);
- final String[] principalParts = SaslRpcServer.splitKerberosName(principal);
- Preconditions.checkArgument(principalParts.length == 3,
- "Kerberos principal should have 3 parts: " + principal);
-
- keytab = Preconditions.checkNotNull(conf.get(ServiceConstants.ServerConfig.KEY_TAB),
- ServiceConstants.ServerConfig.KEY_TAB + " is required");
- File keytabFile = new File(keytab);
- Preconditions.checkState(keytabFile.isFile() && keytabFile.canRead(),
- "Keytab " + keytab + " does not exist or is not readable.");
-
- try {
- // Instantiating SentryKerberosContext in non-server mode handles the ticket renewal.
- kerberosContext = new SentryKerberosContext(principal, keytab, false);
-
- UserGroupInformation.setConfiguration(hiveConf);
- UserGroupInformation clientUGI = UserGroupInformation.getUGIFromSubject(kerberosContext.getSubject());
-
- // HiveMetaStoreClient handles the connection retry logic to HMS and can be configured using properties:
- // hive.metastore.connect.retries, hive.metastore.client.connect.retry.delay
- client = clientUGI.doAs(new PrivilegedExceptionAction<HiveMetaStoreClient>() {
- @Override
- public HiveMetaStoreClient run() throws Exception {
- return new HiveMetaStoreClient(hiveConf);
- }
- });
- LOGGER.info("Secure connection established with HMS");
- } catch (LoginException e) {
- // Kerberos login failed
- LOGGER.error("Failed to setup kerberos context.");
- throw e;
- } finally {
- // Shutdown kerberos context if HMS connection failed to setup to avoid thread leaks.
- if ((kerberosContext != null) && (client == null)) {
- kerberosContext.shutDown();
- kerberosContext = null;
- }
- }
- } else {
- //This is only for testing purposes. Sentry strongly recommends strong authentication
- client = new HiveMetaStoreClient(hiveConf);
- LOGGER.info("Established non-secure connection with HMS");
+ private HiveMetaStoreClient getMetaStoreClient()
+ throws IOException, InterruptedException, MetaException {
+ if (client == null) {
+ client = hiveConnectionFactory.connect();
+ connectedToHMS = true;
}
- return client;
+ return client.getClient();
}
@Override
@@ -209,7 +143,7 @@ public class HMSFollower implements Runnable, AutoCloseable {
closeHMSConnection();
return;
}
- processHiveMetastoreUpdates(lastProcessedNotificationID);
+ processHiveMetastoreUpdates();
}
/**
@@ -236,26 +170,11 @@ public class HMSFollower implements Runnable, AutoCloseable {
*
* Client connections waiting for an event notification will be woken up afterwards.
*/
- private void processHiveMetastoreUpdates(Long lastProcessedNotificationID) {
- if (client == null) {
- try {
- client = getMetaStoreClient(authzConf);
- if (client == null) {
- //TODO: Do we want to throw an exception after a certain timeout?
- return;
- } else {
- connectedToHMS = true;
- LOGGER.info("HMSFollower of Sentry successfully connected to HMS");
- }
- } catch (Throwable e) {
- LOGGER.error("HMSFollower cannot connect to HMS!!", e);
- return;
- }
- }
-
+ private void processHiveMetastoreUpdates() {
try {
// Decision of taking full snapshot is based on AuthzPathsMapping information persisted
// in the sentry persistent store. If AuthzPathsMapping is empty, snapshot is needed.
+ Long lastProcessedNotificationID;
if (sentryStore.isAuthzPathsMappingEmpty()) {
// TODO: expose time used for full update in the metrics
@@ -270,27 +189,26 @@ public class HMSFollower implements Runnable, AutoCloseable {
// will be dropped. A new attempt will be made after 500 milliseconds when
// HMSFollower runs again.
- Map<String, Set<String>> pathsFullSnapshot;
- CurrentNotificationEventId eventIDBefore = client.getCurrentNotificationEventId();
- LOGGER.info(String.format("Before fetching hive full snapshot, Current NotificationID = %s.", eventIDBefore));
+ CurrentNotificationEventId eventIDBefore = getMetaStoreClient().getCurrentNotificationEventId();
+ LOGGER.info("Before fetching hive full snapshot, Current NotificationID = {}", eventIDBefore);
- pathsFullSnapshot = fetchFullUpdate();
+ Map<String, Set<String>> pathsFullSnapshot = fetchFullUpdate();
if(pathsFullSnapshot.isEmpty()) {
LOGGER.info("Hive full snapshot is Empty. Perhaps, HMS does not have any data");
return;
}
- CurrentNotificationEventId eventIDAfter = client.getCurrentNotificationEventId();
- LOGGER.info(String.format("After fetching hive full snapshot, Current NotificationID = %s.", eventIDAfter));
+ CurrentNotificationEventId eventIDAfter = getMetaStoreClient().getCurrentNotificationEventId();
+ LOGGER.info("After fetching hive full snapshot, Current NotificationID = {}", eventIDAfter);
if (!eventIDBefore.equals(eventIDAfter)) {
- LOGGER.error("#### Fail to get a point-in-time hive full snapshot !! Current NotificationID = " +
- eventIDAfter.toString());
+ LOGGER.error("Fail to get a point-in-time hive full snapshot. Current ID = {}",
+ eventIDAfter);
return;
}
- LOGGER.info(String.format("Successfully fetched hive full snapshot, Current NotificationID = %s.",
- eventIDAfter));
+ LOGGER.info("Successfully fetched hive full snapshot, Current NotificationID = {}",
+ eventIDAfter);
// As eventIDAfter is the last event that was processed, eventIDAfter is used to update
// lastProcessedNotificationID instead of getting it from persistent store.
lastProcessedNotificationID = eventIDAfter.getEventId();
@@ -314,18 +232,18 @@ public class HMSFollower implements Runnable, AutoCloseable {
// HIVE-15761: Currently getNextNotification API may return an empty
// NotificationEventResponse causing TProtocolException.
// Workaround: Only processes the notification events newer than the last updated one.
- CurrentNotificationEventId eventId = client.getCurrentNotificationEventId();
+ CurrentNotificationEventId eventId = getMetaStoreClient().getCurrentNotificationEventId();
LOGGER.debug("Last Notification in HMS {} lastProcessedNotificationID is {}",
eventId.getEventId(), lastProcessedNotificationID);
if (eventId.getEventId() > lastProcessedNotificationID) {
NotificationEventResponse response =
- client.getNextNotification(lastProcessedNotificationID, Integer.MAX_VALUE, null);
+ getMetaStoreClient().getNextNotification(lastProcessedNotificationID, Integer.MAX_VALUE, null);
if (response.isSetEvents()) {
if (!response.getEvents().isEmpty()) {
if (lastProcessedNotificationID != lastLoggedEventId) {
// Only log when there are updates and the notification ID has changed.
- LOGGER.debug(String.format("lastProcessedNotificationID = %s. Processing %s events",
- lastProcessedNotificationID, response.getEvents().size()));
+ LOGGER.debug("lastProcessedNotificationID = {}. Processing {} events",
+ lastProcessedNotificationID, response.getEvents().size());
lastLoggedEventId = lastProcessedNotificationID;
}
@@ -337,6 +255,7 @@ public class HMSFollower implements Runnable, AutoCloseable {
// If the underlying exception is a socket exception, it is better to retry the connection to HMS
if (e.getCause() instanceof SocketException) {
LOGGER.error("Encountered SocketException while fetching Notification entries, will reconnect to HMS", e);
+ client.invalidate();
closeHMSConnection();
} else {
LOGGER.error("ThriftException occured fetching Notification entries, will try", e);
@@ -360,16 +279,10 @@ public class HMSFollower implements Runnable, AutoCloseable {
if (client != null) {
LOGGER.info("Closing the HMS client connection");
client.close();
+ connectedToHMS = false;
}
- if (kerberosContext != null) {
- LOGGER.info("Shutting down kerberos context associated with the HMS client connection");
- kerberosContext.shutDown();
- }
- } catch (LoginException le) {
- LOGGER.warn("Failed to stop kerberos context (potential to cause thread leak)", le);
} finally {
client = null;
- kerberosContext = null;
}
}
@@ -385,7 +298,8 @@ public class HMSFollower implements Runnable, AutoCloseable {
private Map<String, Set<String>> fetchFullUpdate()
throws TException, ExecutionException {
LOGGER.info("Request full HMS snapshot");
- try (FullUpdateInitializer updateInitializer = new FullUpdateInitializer(client, authzConf)) {
+ try (FullUpdateInitializer updateInitializer =
+ new FullUpdateInitializer(hiveConnectionFactory, authzConf)) {
Map<String, Set<String>> pathsUpdate = updateInitializer.getFullHMSSnapshot();
LOGGER.info("Obtained full HMS snapshot");
return pathsUpdate;
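
The refactored HMSFollower above no longer manages Kerberos state itself: it lazily obtains a client from the injected HiveConnectionFactory and, on a SocketException, invalidates the cached connection so the next scheduled run reconnects. A minimal sketch of that pattern, assuming HMSClient wraps HiveMetaStoreClient and exposes getClient(), invalidate() and close() as the calls above suggest:

    import java.io.IOException;
    import java.net.SocketException;
    import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
    import org.apache.hadoop.hive.metastore.api.MetaException;
    import org.apache.thrift.TException;

    class LazyHmsConnectionSketch {
      private final HiveConnectionFactory hiveConnectionFactory;
      private HMSClient client;

      LazyHmsConnectionSketch(HiveConnectionFactory factory) {
        this.hiveConnectionFactory = factory;
      }

      /** Connect on first use; later calls reuse the cached client. */
      HiveMetaStoreClient hms() throws IOException, InterruptedException, MetaException {
        if (client == null) {
          client = hiveConnectionFactory.connect();
        }
        return client.getClient();
      }

      /** On socket-level failures, drop the connection so hms() reconnects. */
      void onThriftFailure(TException e) {
        if ((e.getCause() instanceof SocketException) && (client != null)) {
          client.invalidate();
          client.close();
          client = null;
        }
      }
    }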
http://git-wip-us.apache.org/repos/asf/sentry/blob/747c2260/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveConnectionFactory.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveConnectionFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveConnectionFactory.java
new file mode 100644
index 0000000..62542c3
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveConnectionFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.service.thrift;
+
+import org.apache.hadoop.hive.metastore.api.MetaException;
+
+import java.io.IOException;
+
+public interface HiveConnectionFactory extends AutoCloseable {
+ /**
+ * Open a new connection to HMS.
+ *
+ * @return connection to HMS.
+ * @throws IOException if connection establishment failed.
+ * @throws InterruptedException if connection establishment was interrupted.
+ * @throws MetaException if connection establishment failed.
+ */
+ HMSClient connect() throws IOException, InterruptedException, MetaException;
+}
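
A brief usage sketch of the interface (HMSClient's getClient() accessor is inferred from the HMSFollower changes above; the init() step belongs to HiveSimpleConnectionFactory below; the enclosing method is assumed to declare throws Exception):

    HiveSimpleConnectionFactory factory =
        new HiveSimpleConnectionFactory(sentryConf, new HiveConf());
    factory.init();   // no-op for insecure mode; Kerberos setup otherwise
    try {
      HMSClient c = factory.connect();
      // Use the underlying HiveMetaStoreClient, e.g. for notification polling.
      System.out.println(c.getClient().getCurrentNotificationEventId());
      c.close();
    } finally {
      factory.close();
    }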
http://git-wip-us.apache.org/repos/asf/sentry/blob/747c2260/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveSimpleConnectionFactory.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveSimpleConnectionFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveSimpleConnectionFactory.java
new file mode 100644
index 0000000..3d67401
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveSimpleConnectionFactory.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.service.thrift;
+
+import com.google.common.base.Preconditions;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import javax.security.auth.login.LoginException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Factory used to generate Hive connections.
+ * Supports insecure and Kerberos connections.
+ * For Kerberos connections, it starts the ticket renewal thread and
+ * sets up Kerberos credentials.
+ */
+public final class HiveSimpleConnectionFactory implements HiveConnectionFactory {
+ private static final Logger LOGGER = LoggerFactory.getLogger(HiveSimpleConnectionFactory.class);
+
+ /** Sentry configuration */
+ private final Configuration conf;
+ /** Hive configuration */
+ private final HiveConf hiveConf;
+ private final boolean insecure;
+ private SentryKerberosContext kerberosContext = null;
+
+ HiveSimpleConnectionFactory(Configuration sentryConf, HiveConf hiveConf) {
+ this.conf = sentryConf;
+ this.hiveConf = hiveConf;
+ insecure = !ServerConfig.SECURITY_MODE_KERBEROS.equalsIgnoreCase(
+ sentryConf.get(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_NONE).trim());
+ }
+
+ /**
+ * Initialize the factory.
+ * For insecure connections there is nothing to initialize.
+ * For Kerberos connections, resolves the server principal and starts the ticket renewal thread.
+ * @throws IOException if the server principal cannot be resolved.
+ * @throws LoginException if the Kerberos login fails.
+ */
+ void init() throws IOException, LoginException {
+ if (insecure) {
+ LOGGER.info("Using insecure connection to HMS");
+ return;
+ }
+
+ int port = conf.getInt(ServerConfig.RPC_PORT, ServerConfig.RPC_PORT_DEFAULT);
+ String rawPrincipal = Preconditions.checkNotNull(conf.get(ServerConfig.PRINCIPAL),
+ "%s is required", ServerConfig.PRINCIPAL);
+ String principal = SecurityUtil.getServerPrincipal(rawPrincipal, NetUtils.createSocketAddr(
+ conf.get(ServerConfig.RPC_ADDRESS, ServerConfig.RPC_ADDRESS_DEFAULT),
+ port).getAddress());
+ LOGGER.debug("Opening kerberos connection to HMS using kerberos principal {}", principal);
+ String[] principalParts = SaslRpcServer.splitKerberosName(principal);
+ Preconditions.checkArgument(principalParts.length == 3,
+ "Kerberos principal %s should have 3 parts", principal);
+ String keytab = Preconditions.checkNotNull(conf.get(ServerConfig.KEY_TAB),
+ "Configuration is missing required %s paraeter", ServerConfig.KEY_TAB);
+ File keytabFile = new File(keytab);
+ Preconditions.checkState(keytabFile.isFile() && keytabFile.canRead(),
+ "Keytab %s does not exist or is not readable", keytab);
+ // Instantiating SentryKerberosContext in non-server mode handles the ticket renewal.
+ kerberosContext = new SentryKerberosContext(principal, keytab, false);
+ UserGroupInformation.setConfiguration(conf);
+ LOGGER.info("Using secure connection to HMS");
+ }
+
+ /**
+ * Connect to HMS in insecure or Kerberos mode, according to the configuration.
+ *
+ * @return HMS connection
+ * @throws IOException if the connection could not be established
+ * @throws InterruptedException if the connection attempt was interrupted
+ * @throws MetaException if HMS reports an error
+ */
+ public HMSClient connect() throws IOException, InterruptedException, MetaException {
+ if (insecure) {
+ return new HMSClient(new HiveMetaStoreClient(hiveConf));
+ }
+ UserGroupInformation clientUGI =
+ UserGroupInformation.getUGIFromSubject(kerberosContext.getSubject());
+ return new HMSClient(clientUGI.doAs(new PrivilegedExceptionAction<HiveMetaStoreClient>() {
+ @Override
+ public HiveMetaStoreClient run() throws MetaException {
+ return new HiveMetaStoreClient(hiveConf);
+ }
+ }));
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (kerberosContext != null) {
+ kerberosContext.shutDown();
+ kerberosContext = null;
+ }
+ }
+}
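
One detail worth noting: init() expands the _HOST token in the configured principal via SecurityUtil.getServerPrincipal, substituting the canonical name of the resolved RPC address. A small illustration with made-up host and realm values:

    import java.net.InetAddress;
    import org.apache.hadoop.security.SecurityUtil;

    // "sentry/_HOST@EXAMPLE.COM" resolved against an address whose canonical
    // name is host1.example.com yields "sentry/host1.example.com@EXAMPLE.COM".
    String principal = SecurityUtil.getServerPrincipal(
        "sentry/_HOST@EXAMPLE.COM",
        InetAddress.getByName("host1.example.com"));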
http://git-wip-us.apache.org/repos/asf/sentry/blob/747c2260/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryKerberosContext.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryKerberosContext.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryKerberosContext.java
index 8d78d1d..edb8006 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryKerberosContext.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryKerberosContext.java
@@ -40,9 +40,9 @@ public class SentryKerberosContext implements Runnable {
private LoginContext loginContext;
private Subject subject;
private final javax.security.auth.login.Configuration kerberosConfig;
- @Deprecated
+
private Thread renewerThread;
- @Deprecated
+
private boolean shutDownRenewer = false;
public SentryKerberosContext(String principal, String keyTab, boolean server)
@@ -113,7 +113,6 @@ public class SentryKerberosContext implements Runnable {
* Ticket renewer thread
* wait until 80% of the ticket lifetime has elapsed, then renew it
*/
- @Deprecated
@Override
public void run() {
try {
@@ -145,7 +144,6 @@ public class SentryKerberosContext implements Runnable {
}
}
- @Deprecated
public void startRenewerThread() {
renewerThread = new Thread(this);
renewerThread.start();
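
The renewer thread that this change un-deprecates sleeps until 80% of the ticket lifetime has elapsed before refreshing. A sketch of that arithmetic, assuming the TGT is available as a javax.security.auth.kerberos.KerberosTicket named tgt:

    long start = tgt.getStartTime().getTime();
    long end = tgt.getEndTime().getTime();
    long refreshAt = start + (long) ((end - start) * 0.8f);
    // Sleep until the 80% mark, then re-login to renew the ticket.
    Thread.sleep(Math.max(0L, refreshAt - System.currentTimeMillis()));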
http://git-wip-us.apache.org/repos/asf/sentry/blob/747c2260/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
index ec938da..322197b 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
@@ -40,6 +40,7 @@ import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
@@ -74,6 +75,7 @@ import static org.apache.sentry.core.common.utils.SigUtils.registerSigListener;
public class SentryService implements Callable, SigUtils.SigListener {
private static final Logger LOGGER = LoggerFactory.getLogger(SentryService.class);
+ private HiveSimpleConnectionFactory hiveConnectionFactory;
private enum Status {
NOT_STARTED,
@@ -276,7 +278,7 @@ public class SentryService implements Callable, SigUtils.SigListener {
thriftServer.serve();
}
- private void startHMSFollower(Configuration conf) throws Exception{
+ private void startHMSFollower(Configuration conf) throws Exception {
if (!hdfsSyncEnabled) {
LOGGER.info("HMS follower is not started because HDFS sync is disabled.");
return;
@@ -296,13 +298,11 @@ public class SentryService implements Callable, SigUtils.SigListener {
Preconditions.checkState(hmsFollower == null);
Preconditions.checkState(hmsFollowerExecutor == null);
+ Preconditions.checkState(hiveConnectionFactory == null);
- try {
- hmsFollower = new HMSFollower(conf, sentryStore, leaderMonitor);
- } catch (Exception ex) {
- LOGGER.error("Could not create HMSFollower", ex);
- throw ex;
- }
+ hiveConnectionFactory = new HiveSimpleConnectionFactory(conf, new HiveConf());
+ hiveConnectionFactory.init();
+ hmsFollower = new HMSFollower(conf, sentryStore, leaderMonitor, hiveConnectionFactory);
long initDelay = conf.getLong(ServerConfig.SENTRY_HMSFOLLOWER_INIT_DELAY_MILLS,
ServerConfig.SENTRY_HMSFOLLOWER_INIT_DELAY_MILLS_DEFAULT);
@@ -334,6 +334,7 @@ public class SentryService implements Callable, SigUtils.SigListener {
Preconditions.checkNotNull(hmsFollowerExecutor);
Preconditions.checkNotNull(hmsFollower);
+ Preconditions.checkNotNull(hiveConnectionFactory);
// use follower scheduling interval as timeout for shutting down its executor as
// such scheduling interval should be an upper bound of how long the task normally takes to finish
@@ -343,7 +344,13 @@ public class SentryService implements Callable, SigUtils.SigListener {
SentryServiceUtil.shutdownAndAwaitTermination(hmsFollowerExecutor, "hmsFollowerExecutor",
timeoutValue, TimeUnit.MILLISECONDS, LOGGER);
} finally {
+ try {
+ hiveConnectionFactory.close();
+ } catch (Exception e) {
+ LOGGER.error("Can't close HiveConnectionFactory", e);
+ }
hmsFollowerExecutor = null;
+ hiveConnectionFactory = null;
try {
// close connections
hmsFollower.close();
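
Taken together, the SentryService changes pin down a strict lifecycle: the connection factory is initialized before HMSFollower is constructed, and torn down only after the follower's executor has been shut down. A condensed sketch of that ordering (configuration reads, imports and error handling elided):

    HiveSimpleConnectionFactory factory =
        new HiveSimpleConnectionFactory(conf, new HiveConf());
    factory.init();                                  // Kerberos setup if configured
    HMSFollower follower =
        new HMSFollower(conf, sentryStore, leaderMonitor, factory);
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    executor.scheduleAtFixedRate(follower, initDelay, period, TimeUnit.MILLISECONDS);

    // Shutdown: stop scheduling first, then the factory, then the follower.
    executor.shutdownNow();
    factory.close();    // stops the ticket renewal thread
    follower.close();   // releases the cached HMS client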