You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ai...@apache.org on 2018/06/06 21:14:57 UTC
hive git commit: HIVE-18766: Race condition during shutdown of RemoteDriver, error messages aren't always sent (Aihua Xu, reviewed by Sahil Takiar)
Repository: hive
Updated Branches:
refs/heads/master 77c145043 -> aae62d871
HIVE-18766: Race condition during shutdown of RemoteDriver, error messages aren't always sent (Aihua Xu, reviewed by Sahil Takiar)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/aae62d87
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/aae62d87
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/aae62d87
Branch: refs/heads/master
Commit: aae62d871bd3bf61281b03e2ef183b214e610cd5
Parents: 77c1450
Author: Aihua Xu <ax...@cloudera.com>
Authored: Tue Jun 5 13:16:54 2018 -0700
Committer: Aihua Xu <ai...@apache.org>
Committed: Wed Jun 6 14:14:39 2018 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 2 +-
.../apache/hive/spark/client/RemoteDriver.java | 44 +++++++++++++-------
.../hive/spark/client/rpc/RpcConfiguration.java | 8 ++++
3 files changed, 37 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/aae62d87/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index cd425aa..dd42fd1 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4206,7 +4206,7 @@ public class HiveConf extends Configuration {
"in shuffle. This should result in less shuffled data."),
SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout",
"60s", new TimeValidator(TimeUnit.SECONDS),
- "Timeout for requests from Hive client to remote Spark driver."),
+ "Timeout for requests between Hive client and remote Spark driver."),
SPARK_JOB_MONITOR_TIMEOUT("hive.spark.job.monitor.timeout",
"60s", new TimeValidator(TimeUnit.SECONDS),
"Timeout for job monitor to get Spark job state."),
http://git-wip-us.apache.org/repos/asf/hive/blob/aae62d87/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
index caa850c..8130860 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
@@ -35,6 +35,8 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.FileUtils;
@@ -92,6 +94,8 @@ public class RemoteDriver {
public static final String REMOTE_DRIVER_PORT_CONF = "--remote-port";
public static final String REMOTE_DRIVER_CONF = "--remote-driver-conf";
+ private final long futureTimeout; // Rpc call timeout in milliseconds
+
private RemoteDriver(String[] args) throws Exception {
this.activeJobs = Maps.newConcurrentMap();
this.jcLock = new Object();
@@ -135,7 +139,9 @@ public class RemoteDriver {
String secret = mapConf.get(SparkClientFactory.CONF_KEY_SECRET);
Preconditions.checkArgument(secret != null, "No secret provided.");
- int threadCount = new RpcConfiguration(mapConf).getRpcThreadCount();
+ RpcConfiguration rpcConf = new RpcConfiguration(mapConf);
+ futureTimeout = rpcConf.getFutureTimeoutMs();
+ int threadCount = rpcConf.getRpcThreadCount();
this.egroup = new NioEventLoopGroup(
threadCount,
new ThreadFactoryBuilder()
@@ -232,13 +238,19 @@ public class RemoteDriver {
for (JobWrapper<?> job : activeJobs.values()) {
cancelJob(job);
}
+
if (error != null) {
- protocol.sendError(error);
+ try {
+ protocol.sendError(error).get(futureTimeout, TimeUnit.MILLISECONDS);
+ } catch(InterruptedException|ExecutionException|TimeoutException e) {
+ LOG.warn("Failed to send out the error during RemoteDriver shutdown", e);
+ }
}
if (jc != null) {
jc.stop();
}
clientRpc.close();
+
egroup.shutdownGracefully();
synchronized (shutdownLock) {
shutdownLock.notifyAll();
@@ -265,34 +277,35 @@ public class RemoteDriver {
private class DriverProtocol extends BaseProtocol {
- void sendError(Throwable error) {
+ Future<Void> sendError(Throwable error) {
LOG.debug("Send error to Client: {}", Throwables.getStackTraceAsString(error));
- clientRpc.call(new Error(Throwables.getStackTraceAsString(error)));
+ return clientRpc.call(new Error(Throwables.getStackTraceAsString(error)));
}
- void sendErrorMessage(String cause) {
+ Future<Void> sendErrorMessage(String cause) {
LOG.debug("Send error to Client: {}", cause);
- clientRpc.call(new Error(cause));
+ return clientRpc.call(new Error(cause));
}
- <T extends Serializable> void jobFinished(String jobId, T result,
+ <T extends Serializable>
+ Future<Void> jobFinished(String jobId, T result,
Throwable error, SparkCounters counters) {
LOG.debug("Send job({}) result to Client.", jobId);
- clientRpc.call(new JobResult(jobId, result, error, counters));
+ return clientRpc.call(new JobResult<T>(jobId, result, error, counters));
}
- void jobStarted(String jobId) {
- clientRpc.call(new JobStarted(jobId));
+ Future<Void> jobStarted(String jobId) {
+ return clientRpc.call(new JobStarted(jobId));
}
- void jobSubmitted(String jobId, int sparkJobId) {
+ Future<Void> jobSubmitted(String jobId, int sparkJobId) {
LOG.debug("Send job({}/{}) submitted to Client.", jobId, sparkJobId);
- clientRpc.call(new JobSubmitted(jobId, sparkJobId));
+ return clientRpc.call(new JobSubmitted(jobId, sparkJobId));
}
- void sendMetrics(String jobId, int sparkJobId, int stageId, long taskId, Metrics metrics) {
+ Future<Void> sendMetrics(String jobId, int sparkJobId, int stageId, long taskId, Metrics metrics) {
LOG.debug("Send task({}/{}/{}/{}) metric to Client.", jobId, sparkJobId, stageId, taskId);
- clientRpc.call(new JobMetrics(jobId, sparkJobId, stageId, taskId, metrics));
+ return clientRpc.call(new JobMetrics(jobId, sparkJobId, stageId, taskId, metrics));
}
private void handle(ChannelHandlerContext ctx, CancelJob msg) {
@@ -550,8 +563,7 @@ public class RemoteDriver {
// If the main thread throws an exception for some reason, propagate the exception to the
// client and initiate a safe shutdown
if (rd.running) {
- rd.protocol.sendError(e);
- rd.shutdown(null);
+ rd.shutdown(e);
}
throw e;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/aae62d87/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
index 090c628..bd3a7a7 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
@@ -46,6 +46,7 @@ public final class RpcConfiguration {
private static final Logger LOG = LoggerFactory.getLogger(RpcConfiguration.class);
public static final ImmutableSet<String> HIVE_SPARK_RSC_CONFIGS = ImmutableSet.of(
+ HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT.varname,
HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname,
HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname,
HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname,
@@ -55,6 +56,7 @@ public final class RpcConfiguration {
HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS.varname
);
public static final ImmutableSet<String> HIVE_SPARK_TIME_CONFIGS = ImmutableSet.of(
+ HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT.varname,
HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname,
HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname
);
@@ -71,6 +73,12 @@ public final class RpcConfiguration {
this.config = Collections.unmodifiableMap(config);
}
+ public long getFutureTimeoutMs() {
+ String value = config.get(HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT.varname);
+ return value != null ? Long.parseLong(value) : DEFAULT_CONF.getTimeVar(
+ HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS);
+ }
+
long getConnectTimeoutMs() {
String value = config.get(HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname);
return value != null ? Long.parseLong(value) : DEFAULT_CONF.getTimeVar(