You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by va...@apache.org on 2017/06/14 00:57:26 UTC
[48/52] [abbrv] sentry git commit: SENTRY-1780: FullUpdateInitializer
does not kill the threads whenever getFullHMSSnapshot throws an exception
(Alex Kolbasov, reviewed by Na Li and Vamsee Yarlagadda)
SENTRY-1780: FullUpdateInitializer does not kill the threads whenever getFullHMSSnapshot throws an exception (Alex Kolbasov, reviewed by Na Li and Vamsee Yarlagadda)
Change-Id: I2711f0f8cbb1df2558242ca0c9eb3b70601760e6
Reviewed-on: http://gerrit.sjc.cloudera.com:8080/23480
Tested-by: Jenkins User
Reviewed-by: Alexander Kolbasov <ak...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/b085c37c
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/b085c37c
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/b085c37c
Branch: refs/for/cdh5-1.5.1_ha
Commit: b085c37c24855098a188ed4b2cfb06b1c4e7175d
Parents: 302d3d2
Author: Alexander Kolbasov <ak...@cloudera.com>
Authored: Mon Jun 5 23:31:41 2017 -0700
Committer: Alexander Kolbasov <ak...@cloudera.com>
Committed: Tue Jun 13 00:08:57 2017 -0700
----------------------------------------------------------------------
.../sentry/hdfs/FullUpdateInitializer.java | 80 ++++++++++----------
.../sentry/service/thrift/HMSFollower.java | 17 ++---
2 files changed, 49 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sentry/blob/b085c37c/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java
index efd3fa3..cf9774c 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java
@@ -18,7 +18,6 @@
package org.apache.sentry.hdfs;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Database;
@@ -147,11 +146,11 @@ public final class FullUpdateInitializer implements AutoCloseable {
}
private static final class CallResult {
- private final TException failure;
+ private final Exception failure;
private final boolean successStatus;
private final ObjectMapping objectMapping;
- CallResult(TException ex) {
+ CallResult(Exception ex) {
failure = ex;
successStatus = false;
objectMapping = emptyObjectMapping;
@@ -171,7 +170,7 @@ public final class FullUpdateInitializer implements AutoCloseable {
return objectMapping;
}
- public TException getFailure() {
+ public Exception getFailure() {
return failure;
}
}
@@ -184,53 +183,54 @@ public final class FullUpdateInitializer implements AutoCloseable {
private final class RetryStrategy {
private int retryStrategyMaxRetries = 0;
private final int retryStrategyWaitDurationMillis;
- private int retries;
private RetryStrategy(int retryStrategyMaxRetries, int retryStrategyWaitDurationMillis) {
this.retryStrategyMaxRetries = retryStrategyMaxRetries;
- retries = 0;
// Assign default wait duration if negative value is provided.
- if (retryStrategyWaitDurationMillis > 0) {
- this.retryStrategyWaitDurationMillis = retryStrategyWaitDurationMillis;
- } else {
- this.retryStrategyWaitDurationMillis = 1000;
- }
+ this.retryStrategyWaitDurationMillis = (retryStrategyWaitDurationMillis > 0) ?
+ retryStrategyWaitDurationMillis : 1000;
}
+ @SuppressWarnings({"squid:S1141", "squid:S2142"})
public CallResult exec() {
-
// Retry logic is happening inside callable/task to avoid
// synchronous waiting on getting the result.
// Retry the failure task until reach the max retry number.
// Wait configurable duration for next retry.
- TException exception = null;
- for (int i = 0; i < retryStrategyMaxRetries; i++) {
- try {
- return new CallResult(doTask());
- } catch (TException ex) {
- LOGGER.debug("Failed to execute task on " + (i + 1) + " attempts." +
- " Sleeping for " + retryStrategyWaitDurationMillis + " ms. Exception: " +
- ex.toString(), ex);
- exception = ex;
-
+ //
+ // Only thrift exceptions are retried.
+ // Other exceptions are propagated up the stack.
+ Exception exception = null;
+ try {
+ // We catch all exceptions except Thrift exceptions which are retried
+ for (int i = 0; i < retryStrategyMaxRetries; i++) {
+ //noinspection NestedTryStatement
try {
- Thread.sleep(retryStrategyWaitDurationMillis);
- } catch (InterruptedException ignored) {
- // Skip the rest retries if get InterruptedException.
- // And set the corresponding retries number.
- LOGGER.warn("Interrupted during update fetch during iteration " + (i + 1));
- retries = i;
- i = retryStrategyMaxRetries;
+ return new CallResult(doTask());
+ } catch (TException ex) {
+ LOGGER.debug("Failed to execute task on " + (i + 1) + " attempts." +
+ " Sleeping for " + retryStrategyWaitDurationMillis + " ms. Exception: " +
+ ex.toString(), ex);
+ exception = ex;
+
+ try {
+ Thread.sleep(retryStrategyWaitDurationMillis);
+ } catch (InterruptedException ignored) {
+ // Stop retrying if interrupted while waiting between attempts;
+ // the last Thrift failure is reported as the task result.
+ LOGGER.warn("Interrupted during update fetch during iteration " + (i + 1));
+ break;
+ }
}
}
-
- retries = i;
+ } catch (Exception ex) {
+ exception = ex;
}
-
- // Task fails, return the failure flag.
- LOGGER.error("Task did not complete successfully after " + (retries + 1)
- + " tries", exception);
+ LOGGER.error("Failed to execute task", exception);
+ // We will fail in the end, so we are shutting down the pool to prevent
+ // new tasks from being scheduled.
+ threadPool.shutdown();
return new CallResult(exception);
}
}
@@ -291,6 +291,7 @@ public final class FullUpdateInitializer implements AutoCloseable {
}
@Override
+ @SuppressWarnings({"squid:S2629", "squid:S135"})
ObjectMapping doTask() throws TException {
List<Table> tables = client.getTableObjectsByName(dbName, tableNames);
if (LOGGER.isDebugEnabled()) {
@@ -345,8 +346,7 @@ public final class FullUpdateInitializer implements AutoCloseable {
ObjectMapping doTask() throws TException {
Database db = client.getDatabase(dbName);
if (!dbName.equalsIgnoreCase(db.getName())) {
- LOGGER.warn(String.format("Database name %s does not match %s",
- db.getName(), dbName));
+ LOGGER.warn("Database name {} does not match {}", db.getName(), dbName);
return emptyObjectMapping;
}
List<String> allTblStr = client.getAllTables(dbName);
@@ -388,8 +388,9 @@ public final class FullUpdateInitializer implements AutoCloseable {
* @throws ExecutionException if there was a scheduling error
* @throws InterruptedException if processing was interrupted
*/
+ @SuppressWarnings("squid:S00112")
public Map<String, Set<String>> getFullHMSSnapshot()
- throws TException, ExecutionException, InterruptedException {
+ throws Exception {
// Get list of all HMS databases
List<String> allDbStr = client.getAllDatabases();
// Schedule async task for each database responsible for fetching per-database
@@ -410,7 +411,7 @@ public final class FullUpdateInitializer implements AutoCloseable {
Future<CallResult> result = results.pop();
// Wait for the task to complete
CallResult callResult = result.get();
- // Fail if we got Thrift errors
+ // Fail if we got errors
if (!callResult.success()) {
throw callResult.getFailure();
}
@@ -438,6 +439,7 @@ public final class FullUpdateInitializer implements AutoCloseable {
threadPool.awaitTermination(1, TimeUnit.SECONDS);
} catch (InterruptedException ignored) {
LOGGER.warn("Interrupted shutdown");
+ Thread.currentThread().interrupt();
}
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/b085c37c/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
index 2932055..80d2473 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
@@ -54,6 +54,7 @@ import java.io.File;
import java.io.IOException;
import java.net.SocketException;
import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -273,14 +274,9 @@ public class HMSFollower implements Runnable, AutoCloseable {
CurrentNotificationEventId eventIDBefore = client.getCurrentNotificationEventId();
LOGGER.info(String.format("Before fetching hive full snapshot, Current NotificationID = %s.", eventIDBefore));
- try {
- pathsFullSnapshot = fetchFullUpdate();
- if(pathsFullSnapshot.isEmpty()) {
- LOGGER.info("Hive full snapshot is Empty. Perhaps, HMS does not have any data");
- return;
- }
- } catch (ExecutionException | InterruptedException ex) {
- LOGGER.error("#### Encountered failure during fetching hive full snapshot !!", ex);
+ pathsFullSnapshot = fetchFullUpdate();
+ if(pathsFullSnapshot.isEmpty()) {
+ LOGGER.info("Hive full snapshot is Empty. Perhaps, HMS does not have any data");
return;
}
@@ -385,12 +381,15 @@ public class HMSFollower implements Runnable, AutoCloseable {
* @throws ExecutionException
*/
private Map<String, Set<String>> fetchFullUpdate()
- throws InterruptedException, TException, ExecutionException {
+ throws TException, ExecutionException {
LOGGER.info("Request full HMS snapshot");
try (FullUpdateInitializer updateInitializer = new FullUpdateInitializer(client, authzConf)) {
Map<String, Set<String>> pathsUpdate = updateInitializer.getFullHMSSnapshot();
LOGGER.info("Obtained full HMS snapshot");
return pathsUpdate;
+ } catch (Exception ignored) {
+ // Caller will retry later
+ return Collections.emptyMap();
}
}