You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by ak...@apache.org on 2017/04/05 17:56:01 UTC
sentry git commit: SENTRY-1676:
FullUpdateInitializer#createInitialUpdate should not throw RuntimeException
(Alex Kolbasov, reviewed by Sergio Pena)
Repository: sentry
Updated Branches:
refs/heads/sentry-ha-redesign f0f77668b -> ed2752258
SENTRY-1676: FullUpdateInitializer#createInitialUpdate should not throw RuntimeException (Alex Kolbasov, reviewed by Sergio Pena)
Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/ed275225
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/ed275225
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/ed275225
Branch: refs/heads/sentry-ha-redesign
Commit: ed27522584773075a484a4828fe9b7aeb0922961
Parents: f0f7766
Author: Alexander Kolbasov <ak...@cloudera.com>
Authored: Wed Apr 5 10:55:36 2017 -0700
Committer: Alexander Kolbasov <ak...@cloudera.com>
Committed: Wed Apr 5 10:55:36 2017 -0700
----------------------------------------------------------------------
.../sentry/hdfs/FullUpdateInitializer.java | 71 ++++++++++----------
.../sentry/service/thrift/HMSFollower.java | 6 +-
2 files changed, 37 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sentry/blob/ed275225/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java
index 146cea2..7be63ea 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java
@@ -24,32 +24,30 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.sentry.hdfs.ServiceConstants.ServerConfig;
import org.apache.sentry.hdfs.service.thrift.TPathChanges;
-import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Closeable;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.Map;
import java.util.HashMap;
-import java.util.concurrent.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
/**
- * FullUpdateInitializer is for fetching hive full update,
- * the <hiveObj, paths> mapping. Multiple tasks will be
- * running concurrently, and throw exception or inform
- * sync incomplete paths update based on user configurable
- * after retry configurable times.
+ * Fetch full snapshot of {@code <hiveObj, paths>} mappings from Hive.
+ * Mappings for different tables are fetched concurrently by multiple threads from a pool.
*/
-public class FullUpdateInitializer implements Closeable {
+public final class FullUpdateInitializer implements AutoCloseable {
private final ExecutorService threadPool;
- private HiveMetaStoreClient client;
+ private final HiveMetaStoreClient client;
private final int maxPartitionsPerCall;
private final int maxTablesPerCall;
private final List<Future<CallResult>> results = new ArrayList<Future<CallResult>>();
@@ -60,16 +58,16 @@ public class FullUpdateInitializer implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(FullUpdateInitializer.class);
- final static class CallResult {
- final private Exception failure;
- final private boolean successStatus;
+ static final class CallResult {
+ private final Exception failure;
+ private final boolean successStatus;
CallResult(Exception ex, boolean successStatus) {
failure = ex;
this.successStatus = successStatus;
}
- public boolean getSuccessStatus() {
+ boolean success() {
return successStatus;
}
@@ -85,7 +83,7 @@ public class FullUpdateInitializer implements Closeable {
*/
private final class RetryStrategy {
private int retryStrategyMaxRetries = 0;
- private int retryStrategyWaitDurationMillis;
+ private final int retryStrategyWaitDurationMillis;
private int retries;
private Exception exception;
@@ -134,13 +132,13 @@ public class FullUpdateInitializer implements Closeable {
}
// Task fails, return the failure flag.
- LOGGER.error("Task did not complete successfully after " + retries
- + " tries. Exception got: " + exception.toString());
+ LOGGER.error("Task did not complete successfully after " + (retries + 1)
+ + " tries", exception);
return new CallResult(exception, false);
}
}
- private RetryStrategy retryStrategy;
+ private final RetryStrategy retryStrategy;
BaseTask() {
taskCounter.incrementAndGet();
@@ -258,8 +256,8 @@ public class FullUpdateInitializer implements Closeable {
Database db = client.getDatabase(dbName);
List<String> dbPath = PathsUpdate.parsePath(db.getLocationUri());
if (dbPath != null) {
+ Preconditions.checkArgument(dbName.equalsIgnoreCase(db.getName()));
synchronized (update) {
- Preconditions.checkArgument(dbName.equalsIgnoreCase(db.getName()));
update.newPathChange(dbName).addToAddPaths(dbPath);
}
}
@@ -280,27 +278,26 @@ public class FullUpdateInitializer implements Closeable {
public FullUpdateInitializer(HiveMetaStoreClient client, Configuration conf) {
this.client = client;
this.maxPartitionsPerCall = conf.getInt(
- ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC,
- ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC_DEFAULT);
+ ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC,
+ ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC_DEFAULT);
this.maxTablesPerCall = conf.getInt(
- ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC,
- ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC_DEFAULT);
+ ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC,
+ ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC_DEFAULT);
threadPool = Executors.newFixedThreadPool(conf.getInt(
- ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS,
- ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS_DEFAULT));
+ ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS,
+ ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS_DEFAULT));
maxRetries = conf.getInt(
- ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM,
- ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM_DEFAULT);
+ ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM,
+ ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM_DEFAULT);
waitDurationMillis = conf.getInt(
- ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS,
- ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS_DEFAULT);
+ ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS,
+ ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS_DEFAULT);
failOnRetry = conf.getBoolean(
- ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_FAIL_ON_PARTIAL_UPDATE,
- ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_FAIL_ON_PARTIAL_UPDATE_DEFAULT);
+ ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_FAIL_ON_PARTIAL_UPDATE,
+ ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_FAIL_ON_PARTIAL_UPDATE_DEFAULT);
}
- public Map<String, Set<String>> createInitialUpdate() throws ExecutionException,
- InterruptedException, TException {
+ public Map<String, Set<String>> createInitialUpdate() throws Exception {
PathsUpdate tempUpdate = new PathsUpdate(-1, false);
List<String> allDbStr = client.getAllDatabases();
for (String dbName : allDbStr) {
@@ -318,8 +315,8 @@ public class FullUpdateInitializer implements Closeable {
// Fail the HMS startup if tasks are not all successful and
// fail on partial updates flag is set in the config.
- if (!callResult.getSuccessStatus() && failOnRetry) {
- throw new RuntimeException(callResult.getFailure());
+ if (!callResult.success() && failOnRetry) {
+ throw callResult.getFailure();
}
}
@@ -356,7 +353,7 @@ public class FullUpdateInitializer implements Closeable {
}
@Override
- public void close() throws IOException {
+ public void close() {
if (threadPool != null) {
threadPool.shutdownNow();
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/ed275225/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
index 6c14f5e..16676fb 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
@@ -303,20 +303,20 @@ public class HMSFollower implements Runnable {
* @throws ExecutionException, InterruptedException, TException
*/
private Map<String, Set<String>> fetchFullUpdate()
- throws ExecutionException, InterruptedException, TException {
+ throws Exception {
FullUpdateInitializer updateInitializer = null;
try {
updateInitializer = new FullUpdateInitializer(client, authzConf);
Map<String, Set<String>> pathsUpdate = updateInitializer.createInitialUpdate();
- LOGGER.info("#### Hive full update initialization complete !!");
+ LOGGER.info("Obtained full snapshot from HMS");
return pathsUpdate;
} finally {
if (updateInitializer != null) {
try {
updateInitializer.close();
} catch (Exception e) {
- LOGGER.info("#### Exception while closing updateInitializer !!", e);
+ LOGGER.error("Exception while closing updateInitializer", e);
}
}
}