You are viewing a plain-text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain-text copy.
Posted to commits@hive.apache.org by ai...@apache.org on 2018/06/06 21:14:57 UTC

hive git commit: HIVE-18766: Race condition during shutdown of RemoteDriver, error messages aren't always sent (Aihua Xu, reviewed by Sahil Takiar)

Repository: hive
Updated Branches:
  refs/heads/master 77c145043 -> aae62d871


HIVE-18766: Race condition during shutdown of RemoteDriver, error messages aren't always sent (Aihua Xu, reviewed by Sahil Takiar)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/aae62d87
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/aae62d87
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/aae62d87

Branch: refs/heads/master
Commit: aae62d871bd3bf61281b03e2ef183b214e610cd5
Parents: 77c1450
Author: Aihua Xu <ax...@cloudera.com>
Authored: Tue Jun 5 13:16:54 2018 -0700
Committer: Aihua Xu <ai...@apache.org>
Committed: Wed Jun 6 14:14:39 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  2 +-
 .../apache/hive/spark/client/RemoteDriver.java  | 44 +++++++++++++-------
 .../hive/spark/client/rpc/RpcConfiguration.java |  8 ++++
 3 files changed, 37 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/aae62d87/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index cd425aa..dd42fd1 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4206,7 +4206,7 @@ public class HiveConf extends Configuration {
         "in shuffle. This should result in less shuffled data."),
     SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout",
       "60s", new TimeValidator(TimeUnit.SECONDS),
-      "Timeout for requests from Hive client to remote Spark driver."),
+      "Timeout for requests between Hive client and remote Spark driver."),
     SPARK_JOB_MONITOR_TIMEOUT("hive.spark.job.monitor.timeout",
       "60s", new TimeValidator(TimeUnit.SECONDS),
       "Timeout for job monitor to get Spark job state."),

http://git-wip-us.apache.org/repos/asf/hive/blob/aae62d87/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
index caa850c..8130860 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
@@ -35,6 +35,8 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.io.FileUtils;
@@ -92,6 +94,8 @@ public class RemoteDriver {
   public static final String REMOTE_DRIVER_PORT_CONF = "--remote-port";
   public static final String REMOTE_DRIVER_CONF = "--remote-driver-conf";
 
+  private final long futureTimeout; // Rpc call timeout in milliseconds
+
   private RemoteDriver(String[] args) throws Exception {
     this.activeJobs = Maps.newConcurrentMap();
     this.jcLock = new Object();
@@ -135,7 +139,9 @@ public class RemoteDriver {
     String secret = mapConf.get(SparkClientFactory.CONF_KEY_SECRET);
     Preconditions.checkArgument(secret != null, "No secret provided.");
 
-    int threadCount = new RpcConfiguration(mapConf).getRpcThreadCount();
+    RpcConfiguration rpcConf = new RpcConfiguration(mapConf);
+    futureTimeout = rpcConf.getFutureTimeoutMs();
+    int threadCount = rpcConf.getRpcThreadCount();
     this.egroup = new NioEventLoopGroup(
         threadCount,
         new ThreadFactoryBuilder()
@@ -232,13 +238,19 @@ public class RemoteDriver {
       for (JobWrapper<?> job : activeJobs.values()) {
         cancelJob(job);
       }
+
       if (error != null) {
-        protocol.sendError(error);
+        try {
+          protocol.sendError(error).get(futureTimeout, TimeUnit.MILLISECONDS);
+        } catch(InterruptedException|ExecutionException|TimeoutException e) {
+          LOG.warn("Failed to send out the error during RemoteDriver shutdown", e);
+        }
       }
       if (jc != null) {
         jc.stop();
       }
       clientRpc.close();
+
       egroup.shutdownGracefully();
       synchronized (shutdownLock) {
         shutdownLock.notifyAll();
@@ -265,34 +277,35 @@ public class RemoteDriver {
 
   private class DriverProtocol extends BaseProtocol {
 
-    void sendError(Throwable error) {
+    Future<Void> sendError(Throwable error) {
       LOG.debug("Send error to Client: {}", Throwables.getStackTraceAsString(error));
-      clientRpc.call(new Error(Throwables.getStackTraceAsString(error)));
+      return clientRpc.call(new Error(Throwables.getStackTraceAsString(error)));
     }
 
-    void sendErrorMessage(String cause) {
+    Future<Void> sendErrorMessage(String cause) {
       LOG.debug("Send error to Client: {}", cause);
-      clientRpc.call(new Error(cause));
+      return clientRpc.call(new Error(cause));
     }
 
-    <T extends Serializable> void jobFinished(String jobId, T result,
+    <T extends Serializable>
+    Future<Void> jobFinished(String jobId, T result,
         Throwable error, SparkCounters counters) {
       LOG.debug("Send job({}) result to Client.", jobId);
-      clientRpc.call(new JobResult(jobId, result, error, counters));
+      return clientRpc.call(new JobResult<T>(jobId, result, error, counters));
     }
 
-    void jobStarted(String jobId) {
-      clientRpc.call(new JobStarted(jobId));
+    Future<Void> jobStarted(String jobId) {
+      return clientRpc.call(new JobStarted(jobId));
     }
 
-    void jobSubmitted(String jobId, int sparkJobId) {
+    Future<Void> jobSubmitted(String jobId, int sparkJobId) {
       LOG.debug("Send job({}/{}) submitted to Client.", jobId, sparkJobId);
-      clientRpc.call(new JobSubmitted(jobId, sparkJobId));
+      return clientRpc.call(new JobSubmitted(jobId, sparkJobId));
     }
 
-    void sendMetrics(String jobId, int sparkJobId, int stageId, long taskId, Metrics metrics) {
+    Future<Void> sendMetrics(String jobId, int sparkJobId, int stageId, long taskId, Metrics metrics) {
       LOG.debug("Send task({}/{}/{}/{}) metric to Client.", jobId, sparkJobId, stageId, taskId);
-      clientRpc.call(new JobMetrics(jobId, sparkJobId, stageId, taskId, metrics));
+      return clientRpc.call(new JobMetrics(jobId, sparkJobId, stageId, taskId, metrics));
     }
 
     private void handle(ChannelHandlerContext ctx, CancelJob msg) {
@@ -550,8 +563,7 @@ public class RemoteDriver {
       // If the main thread throws an exception for some reason, propagate the exception to the
       // client and initiate a safe shutdown
       if (rd.running) {
-        rd.protocol.sendError(e);
-        rd.shutdown(null);
+        rd.shutdown(e);
       }
       throw e;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/aae62d87/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
index 090c628..bd3a7a7 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
@@ -46,6 +46,7 @@ public final class RpcConfiguration {
   private static final Logger LOG = LoggerFactory.getLogger(RpcConfiguration.class);
 
   public static final ImmutableSet<String> HIVE_SPARK_RSC_CONFIGS = ImmutableSet.of(
+    HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT.varname,
     HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname,
     HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname,
     HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname,
@@ -55,6 +56,7 @@ public final class RpcConfiguration {
     HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS.varname
   );
   public static final ImmutableSet<String> HIVE_SPARK_TIME_CONFIGS = ImmutableSet.of(
+    HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT.varname,
     HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname,
     HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname
   );
@@ -71,6 +73,12 @@ public final class RpcConfiguration {
     this.config = Collections.unmodifiableMap(config);
   }
 
+  public long getFutureTimeoutMs() {
+    String value = config.get(HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT.varname);
+    return value != null ? Long.parseLong(value) : DEFAULT_CONF.getTimeVar(
+      HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS);
+  }
+
   long getConnectTimeoutMs() {
     String value = config.get(HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname);
     return value != null ? Long.parseLong(value) : DEFAULT_CONF.getTimeVar(