You are viewing a plain-text version of this content. The canonical link to the original (a "here" hyperlink in the source page) was not preserved in this copy.
Posted to commits@hive.apache.org by jx...@apache.org on 2015/10/12 18:19:27 UTC

hive git commit: HIVE-12046: Re-create spark client if connection is dropped (Jimmy, reviewed by Xuefu)

Repository: hive
Updated Branches:
  refs/heads/master ec8c793c3 -> 09f5e8436


HIVE-12046: Re-create spark client if connection is dropped (Jimmy, reviewed by Xuefu)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/09f5e843
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/09f5e843
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/09f5e843

Branch: refs/heads/master
Commit: 09f5e8436890043135a44ae9ef84625a53ec63ec
Parents: ec8c793
Author: Jimmy Xiang <jx...@cloudera.com>
Authored: Mon Oct 5 14:52:31 2015 -0700
Committer: Jimmy Xiang <jx...@cloudera.com>
Committed: Mon Oct 12 08:43:30 2015 -0700

----------------------------------------------------------------------
 .../ql/exec/spark/RemoteHiveSparkClient.java    | 22 ++++++++++++++++++++
 .../apache/hive/spark/client/SparkClient.java   |  5 +++++
 .../hive/spark/client/SparkClientImpl.java      |  5 +++++
 .../org/apache/hive/spark/client/rpc/Rpc.java   |  4 ++++
 4 files changed, 36 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/09f5e843/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index 7d43160..2e8d1d3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -75,6 +75,7 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
   private static final long MAX_PREWARM_TIME = 30000; // 30s
   private static final transient Splitter CSV_SPLITTER = Splitter.on(",").omitEmptyStrings();
 
+  private transient Map<String, String> conf;
   private transient SparkClient remoteClient;
   private transient SparkConf sparkConf;
   private transient HiveConf hiveConf;
@@ -89,6 +90,11 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
     sparkClientTimtout = hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT,
         TimeUnit.SECONDS);
     sparkConf = HiveSparkClientFactory.generateSparkConf(conf);
+    this.conf = conf;
+    createRemoteClient();
+  }
+
+  private void createRemoteClient() throws Exception {
     remoteClient = SparkClientFactory.createClient(conf, hiveConf);
 
     if (HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_PREWARM_ENABLED) &&
@@ -155,6 +161,20 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
 
   @Override
   public SparkJobRef execute(final DriverContext driverContext, final SparkWork sparkWork) throws Exception {
+    if (hiveConf.get("spark.master").startsWith("yarn-") && !remoteClient.isActive()) {
+      // Re-create the remote client if not active any more
+      close();
+      createRemoteClient();
+    }
+
+    try {
+      return submit(driverContext, sparkWork);
+    } catch (Throwable cause) {
+      throw new Exception("Failed to submit Spark work, please retry later", cause);
+    }
+  }
+
+  private SparkJobRef submit(final DriverContext driverContext, final SparkWork sparkWork) throws Exception {
     final Context ctx = driverContext.getCtx();
     final HiveConf hiveConf = (HiveConf) ctx.getConf();
     refreshLocalResources(sparkWork, hiveConf);
@@ -246,6 +266,8 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
     if (remoteClient != null) {
       remoteClient.stop();
     }
+    localFiles.clear();
+    localJars.clear();
   }
 
   private static class JobStatusJob implements Job<Serializable> {

http://git-wip-us.apache.org/repos/asf/hive/blob/09f5e843/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
index 13c2dbc..3e921a5 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
@@ -94,4 +94,9 @@ public interface SparkClient extends Serializable {
    * Get default parallelism. For standalone mode, this can be used to get total number of cores.
    */
   Future<Integer> getDefaultParallelism();
+
+  /**
+   * Check if remote context is still active.
+   */
+  boolean isActive();
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/09f5e843/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index 2546a46..ceebbb3 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -178,6 +178,11 @@ class SparkClientImpl implements SparkClient {
     return run(new GetDefaultParallelismJob());
   }
 
+  @Override
+  public boolean isActive() {
+    return isAlive && driverRpc.isActive();
+  }
+
   void cancel(String jobId) {
     protocol.cancel(jobId);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/09f5e843/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
index 6d0b7cc..b2f133b 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
@@ -259,6 +259,10 @@ public class Rpc implements Closeable {
     return call(msg, Void.class);
   }
 
+  public boolean isActive() {
+    return channel.isActive();
+  }
+
   /**
    * Send an RPC call to the remote endpoint and returns a future that can be used to monitor the
    * operation.