You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jx...@apache.org on 2015/10/12 18:19:27 UTC
hive git commit: HIVE-12046: Re-create spark client if connection is dropped (Jimmy, reviewed by Xuefu)
Repository: hive
Updated Branches:
refs/heads/master ec8c793c3 -> 09f5e8436
HIVE-12046: Re-create spark client if connection is dropped (Jimmy, reviewed by Xuefu)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/09f5e843
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/09f5e843
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/09f5e843
Branch: refs/heads/master
Commit: 09f5e8436890043135a44ae9ef84625a53ec63ec
Parents: ec8c793
Author: Jimmy Xiang <jx...@cloudera.com>
Authored: Mon Oct 5 14:52:31 2015 -0700
Committer: Jimmy Xiang <jx...@cloudera.com>
Committed: Mon Oct 12 08:43:30 2015 -0700
----------------------------------------------------------------------
.../ql/exec/spark/RemoteHiveSparkClient.java | 22 ++++++++++++++++++++
.../apache/hive/spark/client/SparkClient.java | 5 +++++
.../hive/spark/client/SparkClientImpl.java | 5 +++++
.../org/apache/hive/spark/client/rpc/Rpc.java | 4 ++++
4 files changed, 36 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/09f5e843/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index 7d43160..2e8d1d3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -75,6 +75,7 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
private static final long MAX_PREWARM_TIME = 30000; // 30s
private static final transient Splitter CSV_SPLITTER = Splitter.on(",").omitEmptyStrings();
+ private transient Map<String, String> conf;
private transient SparkClient remoteClient;
private transient SparkConf sparkConf;
private transient HiveConf hiveConf;
@@ -89,6 +90,11 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
sparkClientTimtout = hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT,
TimeUnit.SECONDS);
sparkConf = HiveSparkClientFactory.generateSparkConf(conf);
+ this.conf = conf;
+ createRemoteClient();
+ }
+
+ private void createRemoteClient() throws Exception {
remoteClient = SparkClientFactory.createClient(conf, hiveConf);
if (HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_PREWARM_ENABLED) &&
@@ -155,6 +161,20 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
@Override
public SparkJobRef execute(final DriverContext driverContext, final SparkWork sparkWork) throws Exception {
+ if (hiveConf.get("spark.master").startsWith("yarn-") && !remoteClient.isActive()) {
+ // Re-create the remote client if not active any more
+ close();
+ createRemoteClient();
+ }
+
+ try {
+ return submit(driverContext, sparkWork);
+ } catch (Throwable cause) {
+ throw new Exception("Failed to submit Spark work, please retry later", cause);
+ }
+ }
+
+ private SparkJobRef submit(final DriverContext driverContext, final SparkWork sparkWork) throws Exception {
final Context ctx = driverContext.getCtx();
final HiveConf hiveConf = (HiveConf) ctx.getConf();
refreshLocalResources(sparkWork, hiveConf);
@@ -246,6 +266,8 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
if (remoteClient != null) {
remoteClient.stop();
}
+ localFiles.clear();
+ localJars.clear();
}
private static class JobStatusJob implements Job<Serializable> {
http://git-wip-us.apache.org/repos/asf/hive/blob/09f5e843/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
index 13c2dbc..3e921a5 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
@@ -94,4 +94,9 @@ public interface SparkClient extends Serializable {
* Get default parallelism. For standalone mode, this can be used to get total number of cores.
*/
Future<Integer> getDefaultParallelism();
+
+ /**
+ * Check if remote context is still active.
+ */
+ boolean isActive();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/09f5e843/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index 2546a46..ceebbb3 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -178,6 +178,11 @@ class SparkClientImpl implements SparkClient {
return run(new GetDefaultParallelismJob());
}
+ @Override
+ public boolean isActive() {
+ return isAlive && driverRpc.isActive();
+ }
+
void cancel(String jobId) {
protocol.cancel(jobId);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/09f5e843/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
index 6d0b7cc..b2f133b 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
@@ -259,6 +259,10 @@ public class Rpc implements Closeable {
return call(msg, Void.class);
}
+ public boolean isActive() {
+ return channel.isActive();
+ }
+
/**
* Send an RPC call to the remote endpoint and returns a future that can be used to monitor the
* operation.