Posted to commits@hive.apache.org by st...@apache.org on 2018/06/05 13:20:47 UTC
[2/2] hive git commit: HIVE-18533: Add option to use InProcessLauncher to submit spark jobs (Sahil Takiar, reviewed by Rui Li)
HIVE-18533: Add option to use InProcessLauncher to submit spark jobs (Sahil Takiar, reviewed by Rui Li)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/da663866
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/da663866
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/da663866
Branch: refs/heads/master
Commit: da66386662fbbcbde9501b4a7b27d076bcc790d4
Parents: f80cff9
Author: Sahil Takiar <ta...@gmail.com>
Authored: Tue Jun 5 08:00:54 2018 -0500
Committer: Sahil Takiar <st...@cloudera.com>
Committed: Tue Jun 5 08:00:54 2018 -0500
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 12 +-
itests/qtest-spark/pom.xml | 6 +
.../test/resources/testconfiguration.properties | 3 +-
.../clientpositive/spark_in_process_launcher.q | 6 +
.../spark/spark_in_process_launcher.q.out | 96 +++
spark-client/pom.xml | 6 +
.../hive/spark/client/AbstractSparkClient.java | 600 ++++++++++++++++
.../apache/hive/spark/client/JobHandleImpl.java | 8 +-
.../apache/hive/spark/client/SparkClient.java | 7 +
.../hive/spark/client/SparkClientFactory.java | 16 +-
.../hive/spark/client/SparkClientImpl.java | 703 -------------------
.../spark/client/SparkLauncherSparkClient.java | 220 ++++++
.../spark/client/SparkSubmitSparkClient.java | 237 +++++++
.../apache/hive/spark/client/TestJobHandle.java | 2 +-
.../hive/spark/client/TestSparkClient.java | 25 +-
.../client/TestSparkLauncherSparkClient.java | 77 ++
16 files changed, 1306 insertions(+), 718 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 3295d1d..56d2de0 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -316,6 +316,9 @@ public class HiveConf extends Configuration {
public static final String HIVE_SERVER2_AUTHENTICATION_LDAP_USERMEMBERSHIPKEY_NAME =
"hive.server2.authentication.ldap.userMembershipKey";
+ public static final String HIVE_SPARK_SUBMIT_CLIENT = "spark-submit";
+ public static final String HIVE_SPARK_LAUNCHER_CLIENT = "spark-launcher";
+
/**
* dbVars are the parameters can be set per database. If these
* parameters are set as a database property, when switching to that
@@ -4245,6 +4248,11 @@ public class HiveConf extends Configuration {
"If a Spark job contains more tasks than the maximum, it will be cancelled. A value of -1 means no limit."),
SPARK_STAGE_MAX_TASKS("hive.spark.stage.max.tasks", -1, "The maximum number of tasks a stage in a Spark job may have.\n" +
"If a Spark job stage contains more tasks than the maximum, the job will be cancelled. A value of -1 means no limit."),
+ SPARK_CLIENT_TYPE("hive.spark.client.type", HIVE_SPARK_SUBMIT_CLIENT,
+ "Controls how the Spark application is launched. If " + HIVE_SPARK_SUBMIT_CLIENT + " is " +
+ "specified (default) then the spark-submit shell script is used to launch the Spark " +
+ "app. If " + HIVE_SPARK_LAUNCHER_CLIENT + " is specified then Spark's " +
+ "InProcessLauncher is used to programmatically launch the app."),
NWAYJOINREORDER("hive.reorder.nway.joins", true,
"Runs reordering of tables within single n-way join (i.e.: picks streamtable)"),
HIVE_MERGE_NWAY_JOINS("hive.merge.nway.joins", true,
@@ -4335,7 +4343,8 @@ public class HiveConf extends Configuration {
"Comma separated list of variables which are used internally and should not be configurable."),
HIVE_SPARK_RSC_CONF_LIST("hive.spark.rsc.conf.list",
SPARK_OPTIMIZE_SHUFFLE_SERDE.varname + "," +
- SPARK_CLIENT_FUTURE_TIMEOUT.varname,
+ SPARK_CLIENT_FUTURE_TIMEOUT.varname + "," +
+ SPARK_CLIENT_TYPE.varname,
"Comma separated list of variables which are related to remote spark context.\n" +
"Changing these variables will result in re-creating the spark session."),
HIVE_QUERY_TIMEOUT_SECONDS("hive.query.timeout.seconds", "0s",
@@ -5802,5 +5811,4 @@ public class HiveConf extends Configuration {
}
return ret;
}
-
}
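For context on the new property (a sketch, not part of this patch's diff): the constants added above can be read and set through HiveConf; "spark-submit" is the default and "spark-launcher" switches to Spark's InProcessLauncher.

    // Illustrative sketch only: reading and setting the new launcher-type option.
    HiveConf hiveConf = new HiveConf();
    String launcherType = hiveConf.getVar(HiveConf.ConfVars.SPARK_CLIENT_TYPE);
    // Defaults to HiveConf.HIVE_SPARK_SUBMIT_CLIENT ("spark-submit"); switch to the
    // InProcessLauncher-based client like so (equivalent HiveQL: set hive.spark.client.type=spark-launcher;):
    hiveConf.setVar(HiveConf.ConfVars.SPARK_CLIENT_TYPE, HiveConf.HIVE_SPARK_LAUNCHER_CLIENT);
    // Because the property is listed in hive.spark.rsc.conf.list, changing it re-creates the Spark session.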
http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/itests/qtest-spark/pom.xml
----------------------------------------------------------------------
diff --git a/itests/qtest-spark/pom.xml b/itests/qtest-spark/pom.xml
index d0e7eb8..8ed3171 100644
--- a/itests/qtest-spark/pom.xml
+++ b/itests/qtest-spark/pom.xml
@@ -64,6 +64,12 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-yarn_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
<version>${jetty.version}</version>
http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index f3cb9de..b6a1c9b 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1586,7 +1586,8 @@ miniSparkOnYarn.only.query.files=spark_combine_equivalent_work.q,\
spark_use_ts_stats_for_mapjoin.q,\
spark_use_op_stats.q,\
spark_explain_groupbyshuffle.q,\
- spark_opt_shuffle_serde.q
+ spark_opt_shuffle_serde.q,\
+ spark_in_process_launcher.q
miniSparkOnYarn.query.files=auto_sortmerge_join_16.q,\
bucket4.q,\
http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/ql/src/test/queries/clientpositive/spark_in_process_launcher.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/spark_in_process_launcher.q b/ql/src/test/queries/clientpositive/spark_in_process_launcher.q
new file mode 100644
index 0000000..368e135
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/spark_in_process_launcher.q
@@ -0,0 +1,6 @@
+--! qt:dataset:src
+
+set hive.spark.client.type=spark-launcher;
+
+explain select key, count(*) from src group by key order by key limit 10;
+select key, count(*) from src group by key order by key limit 10;
http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/ql/src/test/results/clientpositive/spark/spark_in_process_launcher.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/spark_in_process_launcher.q.out b/ql/src/test/results/clientpositive/spark/spark_in_process_launcher.q.out
new file mode 100644
index 0000000..5ae2f26
--- /dev/null
+++ b/ql/src/test/results/clientpositive/spark/spark_in_process_launcher.q.out
@@ -0,0 +1,96 @@
+PREHOOK: query: explain select key, count(*) from src group by key order by key limit 10
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select key, count(*) from src group by key order by key limit 10
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Spark
+ Edges:
+ Reducer 2 <- Map 1 (GROUP, 4)
+ Reducer 3 <- Reducer 2 (SORT, 1)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: src
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: string)
+ outputColumnNames: key
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: count()
+ keys: key (type: string)
+ mode: hash
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ TopN Hash Memory Usage: 0.1
+ value expressions: _col1 (type: bigint)
+ Execution mode: vectorized
+ Reducer 2
+ Execution mode: vectorized
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: count(VALUE._col0)
+ keys: KEY._col0 (type: string)
+ mode: mergepartial
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ TopN Hash Memory Usage: 0.1
+ value expressions: _col1 (type: bigint)
+ Reducer 3
+ Execution mode: vectorized
+ Reduce Operator Tree:
+ Select Operator
+ expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 (type: bigint)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ Limit
+ Number of rows: 10
+ Statistics: Num rows: 10 Data size: 100 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 10 Data size: 100 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: 10
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: select key, count(*) from src group by key order by key limit 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select key, count(*) from src group by key order by key limit 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+0 3
+10 1
+100 2
+103 2
+104 2
+105 1
+11 1
+111 1
+113 2
+114 1
http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/spark-client/pom.xml
----------------------------------------------------------------------
diff --git a/spark-client/pom.xml b/spark-client/pom.xml
index c4d8178..9a1e1c2 100644
--- a/spark-client/pom.xml
+++ b/spark-client/pom.xml
@@ -100,6 +100,12 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-yarn_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java b/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java
new file mode 100644
index 0000000..ed9222c
--- /dev/null
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java
@@ -0,0 +1,600 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+import static org.apache.hive.spark.client.SparkClientUtilities.HIVE_KRYO_REG_NAME;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
+import com.google.common.io.Resources;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.util.concurrent.GenericFutureListener;
+import io.netty.util.concurrent.Promise;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.io.Writer;
+import java.net.URI;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hive.spark.client.rpc.Rpc;
+import org.apache.hive.spark.client.rpc.RpcConfiguration;
+import org.apache.hive.spark.client.rpc.RpcServer;
+import org.apache.spark.SparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An abstract implementation of {@link SparkClient} that allows sub-classes to override how the
+ * spark application is launched. It provides the following functionality: (1) creating the client
+ * connection to the {@link RemoteDriver} and managing its lifecycle, (2) monitoring the thread
+ * used to submit the Spark application, (3) safe shutdown of the {@link RemoteDriver}, and (4)
+ * configuration handling for submitting the Spark application.
+ *
+ * <p>
+ * This class contains the client protocol used to communicate with the {@link RemoteDriver}.
+ * It uses this protocol to submit {@link Job}s to the {@link RemoteDriver}.
+ * </p>
+ */
+abstract class AbstractSparkClient implements SparkClient {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractSparkClient.class);
+
+ private static final long DEFAULT_SHUTDOWN_TIMEOUT = 10000; // In milliseconds
+
+ private static final String OSX_TEST_OPTS = "SPARK_OSX_TEST_OPTS";
+ private static final String DRIVER_OPTS_KEY = "spark.driver.extraJavaOptions";
+ private static final String EXECUTOR_OPTS_KEY = "spark.executor.extraJavaOptions";
+ private static final String DRIVER_EXTRA_CLASSPATH = "spark.driver.extraClassPath";
+ private static final String EXECUTOR_EXTRA_CLASSPATH = "spark.executor.extraClassPath";
+ private static final String SPARK_DEPLOY_MODE = "spark.submit.deployMode";
+
+ protected final Map<String, String> conf;
+ private final HiveConf hiveConf;
+ private final Future<Void> driverFuture;
+ private final Map<String, JobHandleImpl<?>> jobs;
+ private final Rpc driverRpc;
+ private final ClientProtocol protocol;
+ protected volatile boolean isAlive;
+
+ protected AbstractSparkClient(RpcServer rpcServer, Map<String, String> conf, HiveConf hiveConf,
+ String sessionid) throws IOException {
+ this.conf = conf;
+ this.hiveConf = hiveConf;
+ this.jobs = Maps.newConcurrentMap();
+
+ String secret = rpcServer.createSecret();
+ this.driverFuture = startDriver(rpcServer, sessionid, secret);
+ this.protocol = new ClientProtocol();
+
+ try {
+ // The RPC server will take care of timeouts here.
+ this.driverRpc = rpcServer.registerClient(sessionid, secret, protocol).get();
+ } catch (Throwable e) {
+ String errorMsg;
+ if (e.getCause() instanceof TimeoutException) {
+ errorMsg = "Timed out waiting for Remote Spark Driver to connect to HiveServer2.\nPossible reasons " +
+ "include network issues, errors in remote driver, cluster has no available resources, etc." +
+ "\nPlease check YARN or Spark driver's logs for further information.";
+ } else if (e.getCause() instanceof InterruptedException) {
+ errorMsg = "Interrupted while waiting for Remote Spark Driver to connect to HiveServer2.\nIt is possible " +
+ "that the query was cancelled which would cause the Spark Session to close.";
+ } else {
+ errorMsg = "Error while waiting for Remote Spark Driver to connect back to HiveServer2.";
+ }
+ LOG.error(errorMsg, e);
+ driverFuture.cancel(true);
+ try {
+ driverFuture.get();
+ } catch (InterruptedException ie) {
+ // Give up.
+ LOG.warn("Interrupted before driver thread was finished.", ie);
+ } catch (ExecutionException ee) {
+ LOG.error("Driver thread failed", ee);
+ }
+ throw Throwables.propagate(e);
+ }
+
+ LOG.info("Successfully connected to Remote Spark Driver at: " + this.driverRpc.getRemoteAddress());
+
+ driverRpc.addListener(new Rpc.Listener() {
+ @Override
+ public void rpcClosed(Rpc rpc) {
+ if (isAlive) {
+ LOG.warn("Connection to Remote Spark Driver {} closed unexpectedly", driverRpc.getRemoteAddress());
+ isAlive = false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "Connection to Remote Spark Driver Closed Unexpectedly";
+ }
+ });
+ isAlive = true;
+ }
+
+ @Override
+ public <T extends Serializable> JobHandle<T> submit(Job<T> job) {
+ return protocol.submit(job, Collections.<JobHandle.Listener<T>>emptyList());
+ }
+
+ @Override
+ public <T extends Serializable> JobHandle<T> submit(Job<T> job, List<JobHandle.Listener<T>> listeners) {
+ return protocol.submit(job, listeners);
+ }
+
+ @Override
+ public <T extends Serializable> Future<T> run(Job<T> job) {
+ return protocol.run(job);
+ }
+
+ @Override
+ public void stop() {
+ if (isAlive) {
+ isAlive = false;
+ try {
+ protocol.endSession();
+ } catch (Exception e) {
+ LOG.warn("Exception while waiting for end session reply.", e);
+ } finally {
+ driverRpc.close();
+ }
+ }
+
+ try {
+ driverFuture.get(DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ LOG.error("Exception while waiting for driver future to complete", e);
+ } catch (TimeoutException e) {
+ LOG.warn("Timed out shutting down remote driver, cancelling...");
+ driverFuture.cancel(true);
+ } catch (InterruptedException ie) {
+ LOG.debug("Interrupted before driver thread was finished.");
+ driverFuture.cancel(true);
+ }
+ }
+
+ @Override
+ public Future<?> addJar(URI uri) {
+ return run(new AddJarJob(uri.toString()));
+ }
+
+ @Override
+ public Future<?> addFile(URI uri) {
+ return run(new AddFileJob(uri.toString()));
+ }
+
+ @Override
+ public Future<Integer> getExecutorCount() {
+ return run(new GetExecutorCountJob());
+ }
+
+ @Override
+ public Future<Integer> getDefaultParallelism() {
+ return run(new GetDefaultParallelismJob());
+ }
+
+ @Override
+ public boolean isActive() {
+ return isAlive && driverRpc.isActive();
+ }
+
+ @Override
+ public void cancel(String jobId) {
+ protocol.cancel(jobId);
+ }
+
+ private Future<Void> startDriver(final RpcServer rpcServer, final String clientId,
+ final String secret) throws IOException {
+ final String serverAddress = rpcServer.getAddress();
+ final String serverPort = String.valueOf(rpcServer.getPort());
+
+ String sparkHome = getSparkHome();
+
+ String sparkLogDir = conf.get("hive.spark.log.dir");
+ if (sparkLogDir == null) {
+ if (sparkHome == null) {
+ sparkLogDir = "./target/";
+ } else {
+ sparkLogDir = sparkHome + "/logs/";
+ }
+ }
+
+ String osxTestOpts = "";
+ if (Strings.nullToEmpty(System.getProperty("os.name")).toLowerCase().contains("mac")) {
+ osxTestOpts = Strings.nullToEmpty(System.getenv(OSX_TEST_OPTS));
+ }
+
+ String driverJavaOpts = Joiner.on(" ").skipNulls().join(
+ "-Dhive.spark.log.dir=" + sparkLogDir, osxTestOpts, conf.get(DRIVER_OPTS_KEY));
+ String executorJavaOpts = Joiner.on(" ").skipNulls().join(
+ "-Dhive.spark.log.dir=" + sparkLogDir, osxTestOpts, conf.get(EXECUTOR_OPTS_KEY));
+
+ // Create a file with all the job properties to be read by spark-submit. Change the
+ // file's permissions so that only the owner can read it. This avoids having the
+ // connection secret show up in the child process's command line.
+ File properties = File.createTempFile("spark-submit.", ".properties");
+ if (!properties.setReadable(false) || !properties.setReadable(true, true)) {
+ throw new IOException("Cannot change permissions of job properties file.");
+ }
+ properties.deleteOnExit();
+
+ Properties allProps = new Properties();
+ // first load the defaults from spark-defaults.conf if available
+ try {
+ URL sparkDefaultsUrl = Thread.currentThread().getContextClassLoader().getResource("spark-defaults.conf");
+ if (sparkDefaultsUrl != null) {
+ LOG.info("Loading spark defaults configs from: " + sparkDefaultsUrl);
+ allProps.load(new ByteArrayInputStream(Resources.toByteArray(sparkDefaultsUrl)));
+ }
+ } catch (Exception e) {
+ String msg = "Exception trying to load spark-defaults.conf: " + e;
+ throw new IOException(msg, e);
+ }
+ // then load the SparkClientImpl config
+ for (Map.Entry<String, String> e : conf.entrySet()) {
+ allProps.put(e.getKey(), conf.get(e.getKey()));
+ }
+ allProps.put(SparkClientFactory.CONF_CLIENT_ID, clientId);
+ allProps.put(SparkClientFactory.CONF_KEY_SECRET, secret);
+ allProps.put(DRIVER_OPTS_KEY, driverJavaOpts);
+ allProps.put(EXECUTOR_OPTS_KEY, executorJavaOpts);
+
+ String isTesting = conf.get("spark.testing");
+ if (isTesting != null && isTesting.equalsIgnoreCase("true")) {
+ String hiveHadoopTestClasspath = Strings.nullToEmpty(System.getenv("HIVE_HADOOP_TEST_CLASSPATH"));
+ if (!hiveHadoopTestClasspath.isEmpty()) {
+ String extraDriverClasspath = Strings.nullToEmpty((String)allProps.get(DRIVER_EXTRA_CLASSPATH));
+ if (extraDriverClasspath.isEmpty()) {
+ allProps.put(DRIVER_EXTRA_CLASSPATH, hiveHadoopTestClasspath);
+ } else {
+ extraDriverClasspath = extraDriverClasspath.endsWith(File.pathSeparator) ? extraDriverClasspath : extraDriverClasspath + File.pathSeparator;
+ allProps.put(DRIVER_EXTRA_CLASSPATH, extraDriverClasspath + hiveHadoopTestClasspath);
+ }
+
+ String extraExecutorClasspath = Strings.nullToEmpty((String)allProps.get(EXECUTOR_EXTRA_CLASSPATH));
+ if (extraExecutorClasspath.isEmpty()) {
+ allProps.put(EXECUTOR_EXTRA_CLASSPATH, hiveHadoopTestClasspath);
+ } else {
+ extraExecutorClasspath = extraExecutorClasspath.endsWith(File.pathSeparator) ? extraExecutorClasspath : extraExecutorClasspath + File.pathSeparator;
+ allProps.put(EXECUTOR_EXTRA_CLASSPATH, extraExecutorClasspath + hiveHadoopTestClasspath);
+ }
+ }
+ }
+
+ Writer writer = new OutputStreamWriter(new FileOutputStream(properties), Charsets.UTF_8);
+ try {
+ allProps.store(writer, "Spark Context configuration");
+ } finally {
+ writer.close();
+ }
+
+ // Define how to pass options to the child process. If launching in client (or local)
+ // mode, the driver options need to be passed directly on the command line. Otherwise,
+ // SparkSubmit will take care of that for us.
+ String master = conf.get("spark.master");
+ Preconditions.checkArgument(master != null, "spark.master is not defined.");
+ String deployMode = conf.get(SPARK_DEPLOY_MODE);
+
+ if (SparkClientUtilities.isYarnClusterMode(master, deployMode)) {
+ String executorCores = conf.get("spark.executor.cores");
+ if (executorCores != null) {
+ addExecutorCores(executorCores);
+ }
+
+ String executorMemory = conf.get("spark.executor.memory");
+ if (executorMemory != null) {
+ addExecutorMemory(executorMemory);
+ }
+
+ String numOfExecutors = conf.get("spark.executor.instances");
+ if (numOfExecutors != null) {
+ addNumExecutors(numOfExecutors);
+ }
+ }
+ // The options --principal/--keytab do not work with --proxy-user in spark-submit.sh
+ // (see HIVE-15485, SPARK-5493, SPARK-19143), so Hive can only support doAs or
+ // delegation token renewal, but not both. Since doAs is the more common case, if both
+ // are needed we favor doAs: when doAs is enabled we use the kinit command, otherwise
+ // we pass the principal/keytab to Spark to support token renewal for long-running
+ // applications.
+ if ("kerberos".equals(hiveConf.get(HADOOP_SECURITY_AUTHENTICATION))) {
+ String principal = SecurityUtil.getServerPrincipal(hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL),
+ "0.0.0.0");
+ String keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
+ boolean isDoAsEnabled = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS);
+ if (StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(keyTabFile)) {
+ addKeytabAndPrincipal(isDoAsEnabled, keyTabFile, principal);
+ }
+ }
+ if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) {
+ try {
+ String currentUser = Utils.getUGI().getShortUserName();
+ // do not do impersonation in CLI mode
+ if (!currentUser.equals(System.getProperty("user.name"))) {
+ LOG.info("Attempting impersonation of " + currentUser);
+ addProxyUser(currentUser);
+ }
+ } catch (Exception e) {
+ String msg = "Cannot obtain username: " + e;
+ throw new IllegalStateException(msg, e);
+ }
+ }
+
+ String regStr = conf.get("spark.kryo.registrator");
+ if (HIVE_KRYO_REG_NAME.equals(regStr)) {
+ addJars(SparkClientUtilities.findKryoRegistratorJar(hiveConf));
+ }
+
+ addPropertiesFile(properties.getAbsolutePath());
+ addClass(RemoteDriver.class.getName());
+
+ String jar = "spark-internal";
+ if (SparkContext.jarOfClass(this.getClass()).isDefined()) {
+ jar = SparkContext.jarOfClass(this.getClass()).get();
+ }
+ addExecutableJar(jar);
+
+
+ addAppArg(RemoteDriver.REMOTE_DRIVER_HOST_CONF);
+ addAppArg(serverAddress);
+ addAppArg(RemoteDriver.REMOTE_DRIVER_PORT_CONF);
+ addAppArg(serverPort);
+
+ //hive.spark.* keys are passed down to the RemoteDriver via REMOTE_DRIVER_CONF
+ // so that they are not used in sparkContext but only in remote driver,
+ //as --properties-file contains the spark.* keys that are meant for SparkConf object.
+ for (String hiveSparkConfKey : RpcConfiguration.HIVE_SPARK_RSC_CONFIGS) {
+ String value = RpcConfiguration.getValue(hiveConf, hiveSparkConfKey);
+ addAppArg(RemoteDriver.REMOTE_DRIVER_CONF);
+ addAppArg(String.format("%s=%s", hiveSparkConfKey, value));
+ }
+
+ return launchDriver(isTesting, rpcServer, clientId);
+ }
+
+ protected abstract Future<Void> launchDriver(String isTesting, RpcServer rpcServer, String
+ clientId) throws IOException;
+
+ protected abstract String getSparkHome();
+
+ protected abstract void addAppArg(String arg);
+
+ protected abstract void addExecutableJar(String jar);
+
+ protected abstract void addPropertiesFile(String absolutePath);
+
+ protected abstract void addClass(String name);
+
+ protected abstract void addJars(String jars);
+
+ protected abstract void addProxyUser(String proxyUser);
+
+ protected abstract void addKeytabAndPrincipal(boolean isDoAsEnabled, String keyTabFile,
+ String principal);
+
+ protected abstract void addNumExecutors(String numOfExecutors);
+
+ protected abstract void addExecutorMemory(String executorMemory);
+
+ protected abstract void addExecutorCores(String executorCores);
+
+ private class ClientProtocol extends BaseProtocol {
+
+ <T extends Serializable> JobHandleImpl<T> submit(Job<T> job, List<JobHandle.Listener<T>> listeners) {
+ final String jobId = UUID.randomUUID().toString();
+ final Promise<T> promise = driverRpc.createPromise();
+ final JobHandleImpl<T> handle =
+ new JobHandleImpl<T>(AbstractSparkClient.this, promise, jobId, listeners);
+ jobs.put(jobId, handle);
+
+ final io.netty.util.concurrent.Future<Void> rpc = driverRpc.call(new JobRequest(jobId, job));
+ LOG.debug("Send JobRequest[{}].", jobId);
+
+ // Link the RPC and the promise so that events from one are propagated to the other as
+ // needed.
+ rpc.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<Void>>() {
+ @Override
+ public void operationComplete(io.netty.util.concurrent.Future<Void> f) {
+ if (f.isSuccess()) {
+ // If the spark job finishes before this listener is called, the QUEUED status will not be set
+ handle.changeState(JobHandle.State.QUEUED);
+ } else if (!promise.isDone()) {
+ promise.setFailure(f.cause());
+ }
+ }
+ });
+ promise.addListener(new GenericFutureListener<Promise<T>>() {
+ @Override
+ public void operationComplete(Promise<T> p) {
+ if (jobId != null) {
+ jobs.remove(jobId);
+ }
+ if (p.isCancelled() && !rpc.isDone()) {
+ rpc.cancel(true);
+ }
+ }
+ });
+ return handle;
+ }
+
+ <T extends Serializable> Future<T> run(Job<T> job) {
+ @SuppressWarnings("unchecked")
+ final io.netty.util.concurrent.Future<T> rpc = (io.netty.util.concurrent.Future<T>)
+ driverRpc.call(new SyncJobRequest(job), Serializable.class);
+ return rpc;
+ }
+
+ void cancel(String jobId) {
+ driverRpc.call(new CancelJob(jobId));
+ }
+
+ Future<?> endSession() {
+ return driverRpc.call(new EndSession());
+ }
+
+ private void handle(ChannelHandlerContext ctx, Error msg) {
+ LOG.warn("Error reported from Remote Spark Driver: {}", msg.cause);
+ }
+
+ private void handle(ChannelHandlerContext ctx, JobMetrics msg) {
+ JobHandleImpl<?> handle = jobs.get(msg.jobId);
+ if (handle != null) {
+ handle.getMetrics().addMetrics(msg.sparkJobId, msg.stageId, msg.taskId, msg.metrics);
+ } else {
+ LOG.warn("Received metrics for unknown Spark job {}", msg.sparkJobId);
+ }
+ }
+
+ private void handle(ChannelHandlerContext ctx, JobResult msg) {
+ JobHandleImpl<?> handle = jobs.remove(msg.id);
+ if (handle != null) {
+ LOG.debug("Received result for client job {}", msg.id);
+ handle.setSparkCounters(msg.sparkCounters);
+ Throwable error = msg.error;
+ if (error == null) {
+ handle.setSuccess(msg.result);
+ } else {
+ handle.setFailure(error);
+ }
+ } else {
+ LOG.warn("Received result for unknown client job {}", msg.id);
+ }
+ }
+
+ private void handle(ChannelHandlerContext ctx, JobStarted msg) {
+ JobHandleImpl<?> handle = jobs.get(msg.id);
+ if (handle != null) {
+ handle.changeState(JobHandle.State.STARTED);
+ } else {
+ LOG.warn("Received event for unknown client job {}", msg.id);
+ }
+ }
+
+ private void handle(ChannelHandlerContext ctx, JobSubmitted msg) {
+ JobHandleImpl<?> handle = jobs.get(msg.clientJobId);
+ if (handle != null) {
+ LOG.info("Received Spark job ID: {} for client job {}", msg.sparkJobId, msg.clientJobId);
+ handle.addSparkJobId(msg.sparkJobId);
+ } else {
+ LOG.warn("Received Spark job ID: {} for unknown client job {}", msg.sparkJobId, msg.clientJobId);
+ }
+ }
+
+ @Override
+ protected String name() {
+ return "HiveServer2 to Remote Spark Driver Connection";
+ }
+ }
+
+ private static class AddJarJob implements Job<Serializable> {
+ private static final long serialVersionUID = 1L;
+
+ private final String path;
+
+ AddJarJob() {
+ this(null);
+ }
+
+ AddJarJob(String path) {
+ this.path = path;
+ }
+
+ @Override
+ public Serializable call(JobContext jc) throws Exception {
+ jc.sc().addJar(path);
+ // Following remote job may refer to classes in this jar, and the remote job would be executed
+ // in a different thread, so we add this jar path to JobContext for further usage.
+ jc.getAddedJars().put(path, System.currentTimeMillis());
+ return null;
+ }
+
+ }
+
+ private static class AddFileJob implements Job<Serializable> {
+ private static final long serialVersionUID = 1L;
+
+ private final String path;
+
+ AddFileJob() {
+ this(null);
+ }
+
+ AddFileJob(String path) {
+ this.path = path;
+ }
+
+ @Override
+ public Serializable call(JobContext jc) throws Exception {
+ jc.sc().addFile(path);
+ return null;
+ }
+
+ }
+
+ private static class GetExecutorCountJob implements Job<Integer> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer call(JobContext jc) throws Exception {
+ // minus 1 here otherwise driver is also counted as an executor
+ int count = jc.sc().sc().getExecutorMemoryStatus().size() - 1;
+ return Integer.valueOf(count);
+ }
+
+ }
+
+ private static class GetDefaultParallelismJob implements Job<Integer> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer call(JobContext jc) throws Exception {
+ return jc.sc().sc().defaultParallelism();
+ }
+ }
+
+}
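To make the extension points described in the class javadoc above concrete, here is a hedged, hypothetical sketch (not part of this commit) of what a concrete launcher has to provide; the real implementations are SparkSubmitSparkClient and SparkLauncherSparkClient further down. Note that the base-class constructor drives the add*() hooks, so any state they touch must be created lazily; this sketch only records the assembled arguments and never actually starts a driver.

    // Hypothetical sketch only: a do-nothing subclass showing the abstract hooks.
    package org.apache.hive.spark.client;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Future;

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hive.spark.client.rpc.RpcServer;

    class EchoSparkClient extends AbstractSparkClient {
      private static final long serialVersionUID = 1L;

      // The base-class constructor invokes the add*() hooks before this class's
      // field initializers run, so the list is created lazily.
      private List<String> args;

      EchoSparkClient(RpcServer rpcServer, Map<String, String> conf, HiveConf hiveConf,
          String sessionId) throws IOException {
        super(rpcServer, conf, hiveConf, sessionId);
      }

      private List<String> args() {
        if (args == null) { args = new ArrayList<>(); }
        return args;
      }

      @Override protected String getSparkHome() { return conf.get("spark.home"); }
      @Override protected void addAppArg(String arg) { args().add(arg); }
      @Override protected void addExecutableJar(String jar) { args().add(jar); }
      @Override protected void addPropertiesFile(String path) { args().add("--properties-file"); args().add(path); }
      @Override protected void addClass(String name) { args().add("--class"); args().add(name); }
      @Override protected void addJars(String jars) { args().add("--jars"); args().add(jars); }
      @Override protected void addProxyUser(String user) { args().add("--proxy-user"); args().add(user); }
      @Override protected void addKeytabAndPrincipal(boolean isDoAsEnabled, String keyTab, String principal) {
        args().add("--keytab"); args().add(keyTab); args().add("--principal"); args().add(principal);
      }
      @Override protected void addNumExecutors(String num) { args().add("--num-executors"); args().add(num); }
      @Override protected void addExecutorMemory(String mem) { args().add("--executor-memory"); args().add(mem); }
      @Override protected void addExecutorCores(String cores) { args().add("--executor-cores"); args().add(cores); }

      @Override protected Future<Void> launchDriver(String isTesting, RpcServer rpcServer, String clientId) {
        // A real implementation would run spark-submit or Spark's InProcessLauncher here.
        System.out.println("Would launch RemoteDriver with: " + String.join(" ", args()));
        return CompletableFuture.<Void>completedFuture(null);
      }
    }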
http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
index 2881252..61489a3 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
@@ -34,7 +34,7 @@ import org.apache.hive.spark.counter.SparkCounters;
*/
class JobHandleImpl<T extends Serializable> implements JobHandle<T> {
- private final SparkClientImpl client;
+ private final SparkClient client;
private final String jobId;
private final MetricsCollection metrics;
private final Promise<T> promise;
@@ -43,8 +43,8 @@ class JobHandleImpl<T extends Serializable> implements JobHandle<T> {
private volatile State state;
private volatile SparkCounters sparkCounters;
- JobHandleImpl(SparkClientImpl client, Promise<T> promise, String jobId,
- List<Listener<T>> listeners) {
+ JobHandleImpl(SparkClient client, Promise<T> promise, String jobId,
+ List<Listener<T>> listeners) {
this.client = client;
this.jobId = jobId;
this.promise = promise;
@@ -233,7 +233,7 @@ class JobHandleImpl<T extends Serializable> implements JobHandle<T> {
}
}
- /** Last attempt at preventing stray jobs from accumulating in SparkClientImpl. */
+ /** Last attempt at preventing stray jobs from accumulating in SparkClient. */
@Override
protected void finalize() {
if (!isDone()) {
http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
index 1922e41..9138899 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
@@ -110,4 +110,11 @@ public interface SparkClient extends Serializable {
* Check if remote context is still active.
*/
boolean isActive();
+
+ /**
+ * Cancel the specified jobId
+ *
+ * @param jobId the jobId to cancel
+ */
+ void cancel(String jobId);
}
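A short, hedged usage sketch of the new method (assuming an already-connected SparkClient named client and a Job named job; JobHandle.getClientJobId() is part of the existing spark-client API):

    // Illustrative only: submit a job and cancel it by its client job id.
    JobHandle<Serializable> handle = client.submit(job);
    client.cancel(handle.getClientJobId());  // forwarded to the remote driver as a CancelJob message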
http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
index 88b5c95..1974e88 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
@@ -18,13 +18,11 @@
package org.apache.hive.spark.client;
import java.io.IOException;
-import java.io.PrintStream;
import java.util.Map;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.spark.client.rpc.RpcServer;
-import org.apache.spark.SparkException;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
@@ -82,10 +80,18 @@ public final class SparkClientFactory {
* @param hiveConf Configuration for Hive, contains hive.* properties.
*/
public static SparkClient createClient(Map<String, String> sparkConf, HiveConf hiveConf,
- String sessionId)
- throws IOException, SparkException {
+ String sessionId) throws IOException {
Preconditions.checkState(server != null,
"Invalid state: Hive on Spark RPC Server has not been initialized");
- return new SparkClientImpl(server, sparkConf, hiveConf, sessionId);
+ switch (hiveConf.getVar(HiveConf.ConfVars.SPARK_CLIENT_TYPE)) {
+ case HiveConf.HIVE_SPARK_SUBMIT_CLIENT:
+ return new SparkSubmitSparkClient(server, sparkConf, hiveConf, sessionId);
+ case HiveConf.HIVE_SPARK_LAUNCHER_CLIENT:
+ return new SparkLauncherSparkClient(server, sparkConf, hiveConf, sessionId);
+ default:
+ throw new IllegalArgumentException("Unknown Hive on Spark launcher type " + hiveConf.getVar(
+ HiveConf.ConfVars.SPARK_CLIENT_TYPE) + " valid options are " +
+ HiveConf.HIVE_SPARK_SUBMIT_CLIENT + " or " + HiveConf.HIVE_SPARK_LAUNCHER_CLIENT);
+ }
}
}
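A hedged sketch of the new dispatch from the caller's side (assuming SparkClientFactory has already been initialized and that sparkConf, hiveConf, and sessionId exist):

    // Illustrative only: hive.spark.client.type selects the concrete client implementation.
    hiveConf.setVar(HiveConf.ConfVars.SPARK_CLIENT_TYPE, HiveConf.HIVE_SPARK_LAUNCHER_CLIENT);
    SparkClient client = SparkClientFactory.createClient(sparkConf, hiveConf, sessionId);
    // "spark-submit" (the default) yields a SparkSubmitSparkClient; any other value
    // now fails fast with an IllegalArgumentException.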
http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
deleted file mode 100644
index 847c82b..0000000
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ /dev/null
@@ -1,703 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hive.spark.client;
-
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
-import static org.apache.hive.spark.client.SparkClientUtilities.HIVE_KRYO_REG_NAME;
-
-import com.google.common.base.Charsets;
-import com.google.common.base.Joiner;
-import com.google.common.base.Strings;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.io.Resources;
-
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.util.concurrent.GenericFutureListener;
-import io.netty.util.concurrent.Promise;
-
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Serializable;
-import java.io.Writer;
-import java.net.URI;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.UUID;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.hive.common.log.LogRedirector;
-import org.apache.hadoop.hive.conf.Constants;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.shims.Utils;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hive.spark.client.rpc.Rpc;
-import org.apache.hive.spark.client.rpc.RpcConfiguration;
-import org.apache.hive.spark.client.rpc.RpcServer;
-import org.apache.spark.SparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-class SparkClientImpl implements SparkClient {
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG = LoggerFactory.getLogger(SparkClientImpl.class);
-
- private static final long DEFAULT_SHUTDOWN_TIMEOUT = 10000; // In milliseconds
-
- private static final String OSX_TEST_OPTS = "SPARK_OSX_TEST_OPTS";
- private static final String SPARK_HOME_ENV = "SPARK_HOME";
- private static final String SPARK_HOME_KEY = "spark.home";
- private static final String DRIVER_OPTS_KEY = "spark.driver.extraJavaOptions";
- private static final String EXECUTOR_OPTS_KEY = "spark.executor.extraJavaOptions";
- private static final String DRIVER_EXTRA_CLASSPATH = "spark.driver.extraClassPath";
- private static final String EXECUTOR_EXTRA_CLASSPATH = "spark.executor.extraClassPath";
-
- private final Map<String, String> conf;
- private final HiveConf hiveConf;
- private final Thread driverThread;
- private final Map<String, JobHandleImpl<?>> jobs;
- private final Rpc driverRpc;
- private final ClientProtocol protocol;
- private volatile boolean isAlive;
-
- SparkClientImpl(RpcServer rpcServer, Map<String, String> conf, HiveConf hiveConf,
- String sessionid) throws IOException {
- this.conf = conf;
- this.hiveConf = hiveConf;
- this.jobs = Maps.newConcurrentMap();
-
- String secret = rpcServer.createSecret();
- this.driverThread = startDriver(rpcServer, sessionid, secret);
- this.protocol = new ClientProtocol();
-
- try {
- // The RPC server will take care of timeouts here.
- this.driverRpc = rpcServer.registerClient(sessionid, secret, protocol).get();
- } catch (Throwable e) {
- String errorMsg;
- if (e.getCause() instanceof TimeoutException) {
- errorMsg = "Timed out waiting for Remote Spark Driver to connect to HiveServer2.\nPossible reasons " +
- "include network issues, errors in remote driver, cluster has no available resources, etc." +
- "\nPlease check YARN or Spark driver's logs for further information.";
- } else if (e.getCause() instanceof InterruptedException) {
- errorMsg = "Interrupted while waiting for Remote Spark Driver to connect to HiveServer2.\nIt is possible " +
- "that the query was cancelled which would cause the Spark Session to close.";
- } else {
- errorMsg = "Error while waiting for Remote Spark Driver to connect back to HiveServer2.";
- }
- LOG.error(errorMsg, e);
- driverThread.interrupt();
- try {
- driverThread.join();
- } catch (InterruptedException ie) {
- // Give up.
- LOG.warn("Interrupted before driver thread was finished.", ie);
- }
- throw Throwables.propagate(e);
- }
-
- LOG.info("Successfully connected to Remote Spark Driver at: " + this.driverRpc.getRemoteAddress());
-
- driverRpc.addListener(new Rpc.Listener() {
- @Override
- public void rpcClosed(Rpc rpc) {
- if (isAlive) {
- LOG.warn("Connection to Remote Spark Driver {} closed unexpectedly", driverRpc.getRemoteAddress());
- isAlive = false;
- }
- }
-
- @Override
- public String toString() {
- return "Connection to Remote Spark Driver Closed Unexpectedly";
- }
- });
- isAlive = true;
- }
-
- @Override
- public <T extends Serializable> JobHandle<T> submit(Job<T> job) {
- return protocol.submit(job, Collections.<JobHandle.Listener<T>>emptyList());
- }
-
- @Override
- public <T extends Serializable> JobHandle<T> submit(Job<T> job, List<JobHandle.Listener<T>> listeners) {
- return protocol.submit(job, listeners);
- }
-
- @Override
- public <T extends Serializable> Future<T> run(Job<T> job) {
- return protocol.run(job);
- }
-
- @Override
- public void stop() {
- if (isAlive) {
- isAlive = false;
- try {
- protocol.endSession();
- } catch (Exception e) {
- LOG.warn("Exception while waiting for end session reply.", e);
- } finally {
- driverRpc.close();
- }
- }
-
- long endTime = System.currentTimeMillis() + DEFAULT_SHUTDOWN_TIMEOUT;
- try {
- driverThread.join(DEFAULT_SHUTDOWN_TIMEOUT);
- } catch (InterruptedException ie) {
- LOG.debug("Interrupted before driver thread was finished.");
- }
- if (endTime - System.currentTimeMillis() <= 0) {
- LOG.warn("Timed out shutting down remote driver, interrupting...");
- driverThread.interrupt();
- }
- }
-
- @Override
- public Future<?> addJar(URI uri) {
- return run(new AddJarJob(uri.toString()));
- }
-
- @Override
- public Future<?> addFile(URI uri) {
- return run(new AddFileJob(uri.toString()));
- }
-
- @Override
- public Future<Integer> getExecutorCount() {
- return run(new GetExecutorCountJob());
- }
-
- @Override
- public Future<Integer> getDefaultParallelism() {
- return run(new GetDefaultParallelismJob());
- }
-
- @Override
- public boolean isActive() {
- return isAlive && driverRpc.isActive();
- }
-
- void cancel(String jobId) {
- protocol.cancel(jobId);
- }
-
- private Thread startDriver(final RpcServer rpcServer, final String clientId, final String secret)
- throws IOException {
- Runnable runnable;
- final String serverAddress = rpcServer.getAddress();
- final String serverPort = String.valueOf(rpcServer.getPort());
-
- // If a Spark installation is provided, use the spark-submit script. Otherwise, call the
- // SparkSubmit class directly, which has some caveats (like having to provide a proper
- // version of Guava on the classpath depending on the deploy mode).
- String sparkHome = Strings.emptyToNull(conf.get(SPARK_HOME_KEY));
- if (sparkHome == null) {
- sparkHome = Strings.emptyToNull(System.getenv(SPARK_HOME_ENV));
- }
- if (sparkHome == null) {
- sparkHome = Strings.emptyToNull(System.getProperty(SPARK_HOME_KEY));
- }
- String sparkLogDir = conf.get("hive.spark.log.dir");
- if (sparkLogDir == null) {
- if (sparkHome == null) {
- sparkLogDir = "./target/";
- } else {
- sparkLogDir = sparkHome + "/logs/";
- }
- }
-
- String osxTestOpts = "";
- if (Strings.nullToEmpty(System.getProperty("os.name")).toLowerCase().contains("mac")) {
- osxTestOpts = Strings.nullToEmpty(System.getenv(OSX_TEST_OPTS));
- }
-
- String driverJavaOpts = Joiner.on(" ").skipNulls().join(
- "-Dhive.spark.log.dir=" + sparkLogDir, osxTestOpts, conf.get(DRIVER_OPTS_KEY));
- String executorJavaOpts = Joiner.on(" ").skipNulls().join(
- "-Dhive.spark.log.dir=" + sparkLogDir, osxTestOpts, conf.get(EXECUTOR_OPTS_KEY));
-
- // Create a file with all the job properties to be read by spark-submit. Change the
- // file's permissions so that only the owner can read it. This avoid having the
- // connection secret show up in the child process's command line.
- File properties = File.createTempFile("spark-submit.", ".properties");
- if (!properties.setReadable(false) || !properties.setReadable(true, true)) {
- throw new IOException("Cannot change permissions of job properties file.");
- }
- properties.deleteOnExit();
-
- Properties allProps = new Properties();
- // first load the defaults from spark-defaults.conf if available
- try {
- URL sparkDefaultsUrl = Thread.currentThread().getContextClassLoader().getResource("spark-defaults.conf");
- if (sparkDefaultsUrl != null) {
- LOG.info("Loading spark defaults configs from: " + sparkDefaultsUrl);
- allProps.load(new ByteArrayInputStream(Resources.toByteArray(sparkDefaultsUrl)));
- }
- } catch (Exception e) {
- String msg = "Exception trying to load spark-defaults.conf: " + e;
- throw new IOException(msg, e);
- }
- // then load the SparkClientImpl config
- for (Map.Entry<String, String> e : conf.entrySet()) {
- allProps.put(e.getKey(), conf.get(e.getKey()));
- }
- allProps.put(SparkClientFactory.CONF_CLIENT_ID, clientId);
- allProps.put(SparkClientFactory.CONF_KEY_SECRET, secret);
- allProps.put(DRIVER_OPTS_KEY, driverJavaOpts);
- allProps.put(EXECUTOR_OPTS_KEY, executorJavaOpts);
-
- String isTesting = conf.get("spark.testing");
- if (isTesting != null && isTesting.equalsIgnoreCase("true")) {
- String hiveHadoopTestClasspath = Strings.nullToEmpty(System.getenv("HIVE_HADOOP_TEST_CLASSPATH"));
- if (!hiveHadoopTestClasspath.isEmpty()) {
- String extraDriverClasspath = Strings.nullToEmpty((String)allProps.get(DRIVER_EXTRA_CLASSPATH));
- if (extraDriverClasspath.isEmpty()) {
- allProps.put(DRIVER_EXTRA_CLASSPATH, hiveHadoopTestClasspath);
- } else {
- extraDriverClasspath = extraDriverClasspath.endsWith(File.pathSeparator) ? extraDriverClasspath : extraDriverClasspath + File.pathSeparator;
- allProps.put(DRIVER_EXTRA_CLASSPATH, extraDriverClasspath + hiveHadoopTestClasspath);
- }
-
- String extraExecutorClasspath = Strings.nullToEmpty((String)allProps.get(EXECUTOR_EXTRA_CLASSPATH));
- if (extraExecutorClasspath.isEmpty()) {
- allProps.put(EXECUTOR_EXTRA_CLASSPATH, hiveHadoopTestClasspath);
- } else {
- extraExecutorClasspath = extraExecutorClasspath.endsWith(File.pathSeparator) ? extraExecutorClasspath : extraExecutorClasspath + File.pathSeparator;
- allProps.put(EXECUTOR_EXTRA_CLASSPATH, extraExecutorClasspath + hiveHadoopTestClasspath);
- }
- }
- }
-
- Writer writer = new OutputStreamWriter(new FileOutputStream(properties), Charsets.UTF_8);
- try {
- allProps.store(writer, "Spark Context configuration");
- } finally {
- writer.close();
- }
-
- // Define how to pass options to the child process. If launching in client (or local)
- // mode, the driver options need to be passed directly on the command line. Otherwise,
- // SparkSubmit will take care of that for us.
- String master = conf.get("spark.master");
- Preconditions.checkArgument(master != null, "spark.master is not defined.");
- String deployMode = conf.get("spark.submit.deployMode");
-
- List<String> argv = Lists.newLinkedList();
-
- if (sparkHome != null) {
- argv.add(new File(sparkHome, "bin/spark-submit").getAbsolutePath());
- } else {
- LOG.info("No spark.home provided, calling SparkSubmit directly.");
- argv.add(new File(System.getProperty("java.home"), "bin/java").getAbsolutePath());
-
- if (master.startsWith("local") || master.startsWith("mesos") ||
- SparkClientUtilities.isYarnClientMode(master, deployMode) ||
- master.startsWith("spark")) {
- String mem = conf.get("spark.driver.memory");
- if (mem != null) {
- argv.add("-Xms" + mem);
- argv.add("-Xmx" + mem);
- }
-
- String cp = conf.get("spark.driver.extraClassPath");
- if (cp != null) {
- argv.add("-classpath");
- argv.add(cp);
- }
-
- String libPath = conf.get("spark.driver.extraLibPath");
- if (libPath != null) {
- argv.add("-Djava.library.path=" + libPath);
- }
-
- String extra = conf.get(DRIVER_OPTS_KEY);
- if (extra != null) {
- for (String opt : extra.split("[ ]")) {
- if (!opt.trim().isEmpty()) {
- argv.add(opt.trim());
- }
- }
- }
- }
-
- argv.add("org.apache.spark.deploy.SparkSubmit");
- }
-
- if (SparkClientUtilities.isYarnClusterMode(master, deployMode)) {
- String executorCores = conf.get("spark.executor.cores");
- if (executorCores != null) {
- argv.add("--executor-cores");
- argv.add(executorCores);
- }
-
- String executorMemory = conf.get("spark.executor.memory");
- if (executorMemory != null) {
- argv.add("--executor-memory");
- argv.add(executorMemory);
- }
-
- String numOfExecutors = conf.get("spark.executor.instances");
- if (numOfExecutors != null) {
- argv.add("--num-executors");
- argv.add(numOfExecutors);
- }
- }
- // The options --principal/--keypad do not work with --proxy-user in spark-submit.sh
- // (see HIVE-15485, SPARK-5493, SPARK-19143), so Hive could only support doAs or
- // delegation token renewal, but not both. Since doAs is a more common case, if both
- // are needed, we choose to favor doAs. So when doAs is enabled, we use kinit command,
- // otherwise, we pass the principal/keypad to spark to support the token renewal for
- // long-running application.
- if ("kerberos".equals(hiveConf.get(HADOOP_SECURITY_AUTHENTICATION))) {
- String principal = SecurityUtil.getServerPrincipal(hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL),
- "0.0.0.0");
- String keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
- if (StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(keyTabFile)) {
- if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) {
- List<String> kinitArgv = Lists.newLinkedList();
- kinitArgv.add("kinit");
- kinitArgv.add(principal);
- kinitArgv.add("-k");
- kinitArgv.add("-t");
- kinitArgv.add(keyTabFile + ";");
- kinitArgv.addAll(argv);
- argv = kinitArgv;
- } else {
- // if doAs is not enabled, we pass the principal/keypad to spark-submit in order to
- // support the possible delegation token renewal in Spark
- argv.add("--principal");
- argv.add(principal);
- argv.add("--keytab");
- argv.add(keyTabFile);
- }
- }
- }
- if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) {
- try {
- String currentUser = Utils.getUGI().getShortUserName();
- // do not do impersonation in CLI mode
- if (!currentUser.equals(System.getProperty("user.name"))) {
- LOG.info("Attempting impersonation of " + currentUser);
- argv.add("--proxy-user");
- argv.add(currentUser);
- }
- } catch (Exception e) {
- String msg = "Cannot obtain username: " + e;
- throw new IllegalStateException(msg, e);
- }
- }
-
- String regStr = conf.get("spark.kryo.registrator");
- if (HIVE_KRYO_REG_NAME.equals(regStr)) {
- argv.add("--jars");
- argv.add(SparkClientUtilities.findKryoRegistratorJar(hiveConf));
- }
-
- argv.add("--properties-file");
- argv.add(properties.getAbsolutePath());
- argv.add("--class");
- argv.add(RemoteDriver.class.getName());
-
- String jar = "spark-internal";
- if (SparkContext.jarOfClass(this.getClass()).isDefined()) {
- jar = SparkContext.jarOfClass(this.getClass()).get();
- }
- argv.add(jar);
-
- argv.add(RemoteDriver.REMOTE_DRIVER_HOST_CONF);
- argv.add(serverAddress);
- argv.add(RemoteDriver.REMOTE_DRIVER_PORT_CONF);
- argv.add(serverPort);
-
- //hive.spark.* keys are passed down to the RemoteDriver via REMOTE_DRIVER_CONF
- // so that they are not used in sparkContext but only in remote driver,
- //as --properties-file contains the spark.* keys that are meant for SparkConf object.
- for (String hiveSparkConfKey : RpcConfiguration.HIVE_SPARK_RSC_CONFIGS) {
- String value = RpcConfiguration.getValue(hiveConf, hiveSparkConfKey);
- argv.add(RemoteDriver.REMOTE_DRIVER_CONF);
- argv.add(String.format("%s=%s", hiveSparkConfKey, value));
- }
-
- String cmd = Joiner.on(" ").join(argv);
- LOG.info("Running client driver with argv: {}", cmd);
- ProcessBuilder pb = new ProcessBuilder("sh", "-c", cmd);
-
- // Prevent hive configurations from being visible in Spark.
- pb.environment().remove("HIVE_HOME");
- pb.environment().remove("HIVE_CONF_DIR");
- // Add credential provider password to the child process's environment
- // In case of Spark the credential provider location is provided in the jobConf when the job is submitted
- String password = getSparkJobCredentialProviderPassword();
- if(password != null) {
- pb.environment().put(Constants.HADOOP_CREDENTIAL_PASSWORD_ENVVAR, password);
- }
- if (isTesting != null) {
- pb.environment().put("SPARK_TESTING", isTesting);
- }
-
- final Process child = pb.start();
- String threadName = Thread.currentThread().getName();
- final List<String> childErrorLog = Collections.synchronizedList(new ArrayList<String>());
- final LogRedirector.LogSourceCallback callback = () -> {return isAlive;};
-
- LogRedirector.redirect("spark-submit-stdout-redir-" + threadName,
- new LogRedirector(child.getInputStream(), LOG, callback));
- LogRedirector.redirect("spark-submit-stderr-redir-" + threadName,
- new LogRedirector(child.getErrorStream(), LOG, childErrorLog, callback));
-
- runnable = new Runnable() {
- @Override
- public void run() {
- try {
- int exitCode = child.waitFor();
- if (exitCode != 0) {
- StringBuilder errStr = new StringBuilder();
- synchronized(childErrorLog) {
- Iterator iter = childErrorLog.iterator();
- while(iter.hasNext()){
- errStr.append(iter.next());
- errStr.append('\n');
- }
- }
-
- LOG.warn("Child process exited with code {}", exitCode);
- rpcServer.cancelClient(clientId,
- "Child process (spark-submit) exited before connecting back with error log " + errStr.toString());
- }
- } catch (InterruptedException ie) {
- LOG.warn("Thread waiting on the child process (spark-submit) is interrupted, killing the child process.");
- rpcServer.cancelClient(clientId, "Thread waiting on the child porcess (spark-submit) is interrupted");
- Thread.interrupted();
- child.destroy();
- } catch (Exception e) {
- String errMsg = "Exception while waiting for child process (spark-submit)";
- LOG.warn(errMsg, e);
- rpcServer.cancelClient(clientId, errMsg);
- }
- }
- };
-
- Thread thread = new Thread(runnable);
- thread.setDaemon(true);
- thread.setName("Driver");
- thread.start();
- return thread;
- }
-
- private String getSparkJobCredentialProviderPassword() {
- if (conf.containsKey("spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD")) {
- return conf.get("spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD");
- } else if (conf.containsKey("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD")) {
- return conf.get("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD");
- }
- return null;
- }
-
- private class ClientProtocol extends BaseProtocol {
-
- <T extends Serializable> JobHandleImpl<T> submit(Job<T> job, List<JobHandle.Listener<T>> listeners) {
- final String jobId = UUID.randomUUID().toString();
- final Promise<T> promise = driverRpc.createPromise();
- final JobHandleImpl<T> handle =
- new JobHandleImpl<T>(SparkClientImpl.this, promise, jobId, listeners);
- jobs.put(jobId, handle);
-
- final io.netty.util.concurrent.Future<Void> rpc = driverRpc.call(new JobRequest(jobId, job));
- LOG.debug("Send JobRequest[{}].", jobId);
-
- // Link the RPC and the promise so that events from one are propagated to the other as
- // needed.
- rpc.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<Void>>() {
- @Override
- public void operationComplete(io.netty.util.concurrent.Future<Void> f) {
- if (f.isSuccess()) {
- // If the spark job finishes before this listener is called, the QUEUED status will not be set
- handle.changeState(JobHandle.State.QUEUED);
- } else if (!promise.isDone()) {
- promise.setFailure(f.cause());
- }
- }
- });
- promise.addListener(new GenericFutureListener<Promise<T>>() {
- @Override
- public void operationComplete(Promise<T> p) {
- if (jobId != null) {
- jobs.remove(jobId);
- }
- if (p.isCancelled() && !rpc.isDone()) {
- rpc.cancel(true);
- }
- }
- });
- return handle;
- }
-
- <T extends Serializable> Future<T> run(Job<T> job) {
- @SuppressWarnings("unchecked")
- final io.netty.util.concurrent.Future<T> rpc = (io.netty.util.concurrent.Future<T>)
- driverRpc.call(new SyncJobRequest(job), Serializable.class);
- return rpc;
- }
-
- void cancel(String jobId) {
- driverRpc.call(new CancelJob(jobId));
- }
-
- Future<?> endSession() {
- return driverRpc.call(new EndSession());
- }
-
- private void handle(ChannelHandlerContext ctx, Error msg) {
- LOG.warn("Error reported from Remote Spark Driver: {}", msg.cause);
- }
-
- private void handle(ChannelHandlerContext ctx, JobMetrics msg) {
- JobHandleImpl<?> handle = jobs.get(msg.jobId);
- if (handle != null) {
- handle.getMetrics().addMetrics(msg.sparkJobId, msg.stageId, msg.taskId, msg.metrics);
- } else {
- LOG.warn("Received metrics for unknown Spark job {}", msg.sparkJobId);
- }
- }
-
- private void handle(ChannelHandlerContext ctx, JobResult msg) {
- JobHandleImpl<?> handle = jobs.remove(msg.id);
- if (handle != null) {
- LOG.debug("Received result for client job {}", msg.id);
- handle.setSparkCounters(msg.sparkCounters);
- Throwable error = msg.error;
- if (error == null) {
- handle.setSuccess(msg.result);
- } else {
- handle.setFailure(error);
- }
- } else {
- LOG.warn("Received result for unknown client job {}", msg.id);
- }
- }
-
- private void handle(ChannelHandlerContext ctx, JobStarted msg) {
- JobHandleImpl<?> handle = jobs.get(msg.id);
- if (handle != null) {
- handle.changeState(JobHandle.State.STARTED);
- } else {
- LOG.warn("Received event for unknown client job {}", msg.id);
- }
- }
-
- private void handle(ChannelHandlerContext ctx, JobSubmitted msg) {
- JobHandleImpl<?> handle = jobs.get(msg.clientJobId);
- if (handle != null) {
- LOG.info("Received Spark job ID: {} for client job {}", msg.sparkJobId, msg.clientJobId);
- handle.addSparkJobId(msg.sparkJobId);
- } else {
- LOG.warn("Received Spark job ID: {} for unknown client job {}", msg.sparkJobId, msg.clientJobId);
- }
- }
-
- @Override
- protected String name() {
- return "HiveServer2 to Remote Spark Driver Connection";
- }
- }
-
- private static class AddJarJob implements Job<Serializable> {
- private static final long serialVersionUID = 1L;
-
- private final String path;
-
- AddJarJob() {
- this(null);
- }
-
- AddJarJob(String path) {
- this.path = path;
- }
-
- @Override
- public Serializable call(JobContext jc) throws Exception {
- jc.sc().addJar(path);
- // Following remote job may refer to classes in this jar, and the remote job would be executed
- // in a different thread, so we add this jar path to JobContext for further usage.
- jc.getAddedJars().put(path, System.currentTimeMillis());
- return null;
- }
-
- }
-
- private static class AddFileJob implements Job<Serializable> {
- private static final long serialVersionUID = 1L;
-
- private final String path;
-
- AddFileJob() {
- this(null);
- }
-
- AddFileJob(String path) {
- this.path = path;
- }
-
- @Override
- public Serializable call(JobContext jc) throws Exception {
- jc.sc().addFile(path);
- return null;
- }
-
- }
-
- private static class GetExecutorCountJob implements Job<Integer> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer call(JobContext jc) throws Exception {
- // minus 1 here otherwise driver is also counted as an executor
- int count = jc.sc().sc().getExecutorMemoryStatus().size() - 1;
- return Integer.valueOf(count);
- }
-
- }
-
- private static class GetDefaultParallelismJob implements Job<Integer> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer call(JobContext jc) throws Exception {
- return jc.sc().sc().defaultParallelism();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/spark-client/src/main/java/org/apache/hive/spark/client/SparkLauncherSparkClient.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkLauncherSparkClient.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkLauncherSparkClient.java
new file mode 100644
index 0000000..cf52c4f
--- /dev/null
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkLauncherSparkClient.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.spark.client.rpc.RpcServer;
+
+import org.apache.spark.launcher.AbstractLauncher;
+import org.apache.spark.launcher.InProcessLauncher;
+import org.apache.spark.launcher.SparkAppHandle;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+
+
+/**
+ * Extends the {@link AbstractSparkClient} and uses Spark's
+ * {@link org.apache.spark.launcher.SparkLauncher} to submit the HoS application. Specifically,
+ * it uses the {@link InProcessLauncher} to avoid spawning a sub-process to submit the Spark app.
+ * It uses a {@link Thread} to monitor when the Spark app has been successfully submitted. The
+ * thread can be interrupted, in which case the {@link RpcServer} client will be cancelled and
+ * the Spark app will be stopped.
+ */
+public class SparkLauncherSparkClient extends AbstractSparkClient {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ SparkLauncherSparkClient.class.getName());
+
+ private static final long serialVersionUID = 2153000661341457380L;
+
+ private static final Set<SparkAppHandle.State> FAILED_SPARK_STATES = Sets.newHashSet(
+ SparkAppHandle.State.FAILED,
+ SparkAppHandle.State.KILLED,
+ SparkAppHandle.State.LOST);
+
+ private AbstractLauncher<InProcessLauncher> sparkLauncher;
+
+ SparkLauncherSparkClient(RpcServer rpcServer,
+ Map<String, String> conf,
+ HiveConf hiveConf,
+ String sessionid) throws IOException {
+ super(rpcServer, conf, hiveConf, sessionid);
+ }
+
+ @Override
+ protected Future<Void> launchDriver(String isTesting, RpcServer rpcServer,
+ String clientId) throws IOException {
+ if (isTesting != null) {
+ System.setProperty("spark.testing", "true");
+ }
+
+ // Only allow the spark.master to be local in unit tests
+ if (isTesting == null) {
+ Preconditions.checkArgument(SparkClientUtilities.isYarnClusterMode(
+ this.conf.get("spark.master"), this.conf.get("spark.submit.deployMode")),
+ getClass().getName() + " is only supported in yarn-cluster mode");
+ }
+
+ // Monitors when the Spark app has been successfully started
+ CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+ // Submit the app
+ SparkAppHandle sparkAppHandle = getSparkLauncher().startApplication(
+ new SparkAppListener(shutdownLatch, rpcServer, clientId));
+
+ return createSparkLauncherFuture(shutdownLatch, sparkAppHandle, rpcServer, clientId);
+ }
+
+ @VisibleForTesting
+ static Future<Void> createSparkLauncherFuture(CountDownLatch shutdownLatch,
+ SparkAppHandle sparkAppHandle, RpcServer rpcServer,
+ String clientId) {
+ // Monitor the countdown latch
+ Callable<Void> runnable = () -> {
+ try {
+ shutdownLatch.await();
+ } catch (InterruptedException e) {
+ rpcServer.cancelClient(clientId, "Spark app launcher interrupted");
+ sparkAppHandle.stop();
+ }
+ return null;
+ };
+
+ FutureTask<Void> futureTask = new FutureTask<>(runnable);
+
+ Thread driverThread = new Thread(futureTask);
+ driverThread.setDaemon(true);
+ driverThread.setName("SparkLauncherMonitor");
+ driverThread.start();
+
+ return futureTask;
+ }
+
+ @Override
+ protected String getSparkHome() {
+ return null;
+ }
+
+ @Override
+ protected void addAppArg(String arg) {
+ getSparkLauncher().addAppArgs(arg);
+ }
+
+ @Override
+ protected void addExecutableJar(String jar) {
+ getSparkLauncher().setAppResource(jar);
+ }
+
+ @Override
+ protected void addPropertiesFile(String absolutePath) {
+ getSparkLauncher().setPropertiesFile(absolutePath);
+ }
+
+ @Override
+ protected void addClass(String name) {
+ getSparkLauncher().setMainClass(name);
+ }
+
+ @Override
+ protected void addJars(String jars) {
+ getSparkLauncher().addJar(jars);
+ }
+
+ @Override
+ protected void addProxyUser(String proxyUser) {
+ throw new UnsupportedOperationException();
+// getSparkLauncher().addSparkArg("--proxy-user", proxyUser);
+ }
+
+ @Override
+ protected void addKeytabAndPrincipal(boolean isDoAsEnabled, String keyTabFile, String principal) {
+ throw new UnsupportedOperationException();
+// getSparkLauncher().addSparkArg("--principal", principal);
+// getSparkLauncher().addSparkArg("--keytab", keyTabFile);
+ }
+
+ @Override
+ protected void addNumExecutors(String numOfExecutors) {
+ getSparkLauncher().addSparkArg("--num-executors", numOfExecutors);
+ }
+
+ @Override
+ protected void addExecutorMemory(String executorMemory) {
+ getSparkLauncher().addSparkArg("--executor-memory", executorMemory);
+ }
+
+ @Override
+ protected void addExecutorCores(String executorCores) {
+ getSparkLauncher().addSparkArg("--executor-cores", executorCores);
+ }
+
+ private AbstractLauncher<InProcessLauncher> getSparkLauncher() {
+ if (this.sparkLauncher == null) {
+ this.sparkLauncher = new InProcessLauncher();
+ }
+ return this.sparkLauncher;
+ }
+
+ @VisibleForTesting
+ static final class SparkAppListener implements SparkAppHandle.Listener {
+
+ private final CountDownLatch shutdownLatch;
+ private final RpcServer rpcServer;
+ private final String clientId;
+
+ SparkAppListener(CountDownLatch shutdownLatch, RpcServer rpcServer, String clientId) {
+ this.shutdownLatch = shutdownLatch;
+ this.rpcServer = rpcServer;
+ this.clientId = clientId;
+ }
+
+ @Override
+ public void stateChanged(SparkAppHandle sparkAppHandle) {
+ LOG.info("Spark app transitioned to state = " + sparkAppHandle.getState());
+ if (sparkAppHandle.getState().isFinal() || sparkAppHandle.getState().equals(
+ SparkAppHandle.State.RUNNING)) {
+ this.shutdownLatch.countDown();
+ sparkAppHandle.disconnect();
+ LOG.info("Successfully disconnected from Spark app handle");
+ }
+ if (FAILED_SPARK_STATES.contains(sparkAppHandle.getState())) {
+ this.rpcServer.cancelClient(this.clientId, "Spark app launcher failed," +
+ " transitioned to state " + sparkAppHandle.getState());
+ }
+ }
+
+ @Override
+ public void infoChanged(SparkAppHandle sparkAppHandle) {
+ // Do nothing
+ }
+ }
+}
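
For readers unfamiliar with Spark's launcher API, the in-process submission path above reduces to roughly the
following standalone sketch. It is illustrative only: the main class, app resource, and master values are
placeholders, and only InProcessLauncher / SparkAppHandle are the actual Spark classes the new client relies on.

    import org.apache.spark.launcher.InProcessLauncher;
    import org.apache.spark.launcher.SparkAppHandle;

    import java.util.concurrent.CountDownLatch;

    public class InProcessLaunchSketch {
      public static void main(String[] args) throws Exception {
        CountDownLatch submitted = new CountDownLatch(1);

        // Configure the launcher, much as SparkLauncherSparkClient does through its add*/set* hooks.
        InProcessLauncher launcher = new InProcessLauncher()
            .setMainClass("org.apache.hive.spark.client.RemoteDriver")  // driver main class
            .setAppResource("spark-internal")                           // placeholder app resource
            .setConf("spark.master", "yarn");                           // placeholder master

        // The listener releases the latch once the app is RUNNING or reaches a final state.
        SparkAppHandle handle = launcher.startApplication(new SparkAppHandle.Listener() {
          @Override
          public void stateChanged(SparkAppHandle h) {
            if (h.getState().isFinal() || h.getState() == SparkAppHandle.State.RUNNING) {
              submitted.countDown();
            }
          }

          @Override
          public void infoChanged(SparkAppHandle h) {
            // no-op
          }
        });

        // Block until submission has either succeeded or failed, mirroring the monitor thread above.
        submitted.await();
        System.out.println("Final observed state: " + handle.getState());
      }
    }

The real client wraps the same latch wait in a FutureTask (createSparkLauncherFuture above) so callers can
cancel the submission and so a failed or interrupted launch cancels the RpcServer client.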
http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java
new file mode 100644
index 0000000..1a524b9
--- /dev/null
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+
+import org.apache.hadoop.hive.common.log.LogRedirector;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.spark.client.rpc.RpcServer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Extends the {@link AbstractSparkClient} and launches a child process to run Spark's {@code
+ * bin/spark-submit} script. The child process's stdout and stderr are redirected to these logs.
+ */
+class SparkSubmitSparkClient extends AbstractSparkClient {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SparkSubmitSparkClient.class);
+
+ private static final String SPARK_HOME_ENV = "SPARK_HOME";
+ private static final String SPARK_HOME_KEY = "spark.home";
+
+ private static final long serialVersionUID = -4272763023516238171L;
+
+ private List<String> argv;
+
+ SparkSubmitSparkClient(RpcServer rpcServer, Map<String, String> conf, HiveConf hiveConf,
+ String sessionid) throws IOException {
+ super(rpcServer, conf, hiveConf, sessionid);
+ }
+
+ @Override
+ protected String getSparkHome() {
+ String sparkHome = Strings.emptyToNull(conf.get(SPARK_HOME_KEY));
+ if (sparkHome == null) {
+ sparkHome = Strings.emptyToNull(System.getenv(SPARK_HOME_ENV));
+ }
+ if (sparkHome == null) {
+ sparkHome = Strings.emptyToNull(System.getProperty(SPARK_HOME_KEY));
+ }
+
+ Preconditions.checkNotNull(sparkHome, "Cannot use " + HiveConf.HIVE_SPARK_SUBMIT_CLIENT +
+ " without setting Spark Home");
+ String master = conf.get("spark.master");
+ Preconditions.checkArgument(master != null, "spark.master is not defined.");
+
+ argv = Lists.newLinkedList();
+ argv.add(new File(sparkHome, "bin/spark-submit").getAbsolutePath());
+
+ return sparkHome;
+ }
+
+ @Override
+ protected void addAppArg(String arg) {
+ argv.add(arg);
+ }
+
+ @Override
+ protected void addExecutableJar(String jar) {
+ argv.add(jar);
+ }
+
+ @Override
+ protected void addPropertiesFile(String absolutePath) {
+ argv.add("--properties-file");
+ argv.add(absolutePath);
+ }
+
+ @Override
+ protected void addClass(String name) {
+ argv.add("--class");
+ argv.add(RemoteDriver.class.getName());
+ }
+
+ @Override
+ protected void addJars(String jars) {
+ argv.add("--jars");
+ argv.add(jars);
+ }
+
+ @Override
+ protected void addProxyUser(String proxyUser) {
+ argv.add("--proxy-user");
+ argv.add(proxyUser);
+ }
+
+ @Override
+ protected void addKeytabAndPrincipal(boolean isDoAsEnabled, String keyTabFile, String principal) {
+ if (isDoAsEnabled) {
+ List<String> kinitArgv = Lists.newLinkedList();
+ kinitArgv.add("kinit");
+ kinitArgv.add(principal);
+ kinitArgv.add("-k");
+ kinitArgv.add("-t");
+ kinitArgv.add(keyTabFile + ";");
+ kinitArgv.addAll(argv);
+ argv = kinitArgv;
+ } else {
+ // if doAs is not enabled, we pass the principal/keytab to spark-submit in order to
+ // support the possible delegation token renewal in Spark
+ argv.add("--principal");
+ argv.add(principal);
+ argv.add("--keytab");
+ argv.add(keyTabFile);
+ }
+ }
+
+ @Override
+ protected void addNumExecutors(String numOfExecutors) {
+ argv.add("--num-executors");
+ argv.add(numOfExecutors);
+ }
+
+ @Override
+ protected void addExecutorMemory(String executorMemory) {
+ argv.add("--executor-memory");
+ argv.add(executorMemory);
+ }
+
+ @Override
+ protected void addExecutorCores(String executorCores) {
+ argv.add("--executor-cores");
+ argv.add(executorCores);
+ }
+
+ private String getSparkJobCredentialProviderPassword() {
+ if (conf.containsKey("spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD")) {
+ return conf.get("spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD");
+ } else if (conf.containsKey("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD")) {
+ return conf.get("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD");
+ }
+ return null;
+ }
+
+ @Override
+ protected Future<Void> launchDriver(String isTesting, RpcServer rpcServer, String clientId) throws
+ IOException {
+ Callable<Void> runnable;
+
+ String cmd = Joiner.on(" ").join(argv);
+ LOG.info("Running client driver with argv: {}", cmd);
+ ProcessBuilder pb = new ProcessBuilder("sh", "-c", cmd);
+
+ // Prevent hive configurations from being visible in Spark.
+ pb.environment().remove("HIVE_HOME");
+ pb.environment().remove("HIVE_CONF_DIR");
+ // Add credential provider password to the child process's environment
+ // In case of Spark the credential provider location is provided in the jobConf when the job is submitted
+ String password = getSparkJobCredentialProviderPassword();
+ if(password != null) {
+ pb.environment().put(Constants.HADOOP_CREDENTIAL_PASSWORD_ENVVAR, password);
+ }
+ if (isTesting != null) {
+ pb.environment().put("SPARK_TESTING", isTesting);
+ }
+
+ final Process child = pb.start();
+ String threadName = Thread.currentThread().getName();
+ final List<String> childErrorLog = Collections.synchronizedList(new ArrayList<String>());
+ final LogRedirector.LogSourceCallback callback = () -> isAlive;
+
+ LogRedirector.redirect("spark-submit-stdout-redir-" + threadName,
+ new LogRedirector(child.getInputStream(), LOG, callback));
+ LogRedirector.redirect("spark-submit-stderr-redir-" + threadName,
+ new LogRedirector(child.getErrorStream(), LOG, childErrorLog, callback));
+
+ runnable = () -> {
+ try {
+ int exitCode = child.waitFor();
+ if (exitCode != 0) {
+ StringBuilder errStr = new StringBuilder();
+ synchronized(childErrorLog) {
+ for (Object aChildErrorLog : childErrorLog) {
+ errStr.append(aChildErrorLog);
+ errStr.append('\n');
+ }
+ }
+
+ LOG.warn("Child process exited with code {}", exitCode);
+ rpcServer.cancelClient(clientId,
+ "Child process (spark-submit) exited before connecting back with error log " + errStr.toString());
+ }
+ } catch (InterruptedException ie) {
+ LOG.warn("Thread waiting on the child process (spark-submit) is interrupted, killing the child process.");
+ rpcServer.cancelClient(clientId, "Thread waiting on the child process (spark-submit) is interrupted");
+ Thread.interrupted();
+ child.destroy();
+ } catch (Exception e) {
+ String errMsg = "Exception while waiting for child process (spark-submit)";
+ LOG.warn(errMsg, e);
+ rpcServer.cancelClient(clientId, errMsg);
+ }
+ return null;
+ };
+
+ FutureTask<Void> futureTask = new FutureTask<>(runnable);
+
+ Thread driverThread = new Thread(futureTask);
+ driverThread.setDaemon(true);
+ driverThread.setName("SparkSubmitMonitor");
+ driverThread.start();
+
+ return futureTask;
+ }
+}
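
By contrast, the spark-submit path is a plain child-process launch monitored from a daemon thread. A minimal
sketch of that pattern follows, assuming a placeholder shell command rather than the real spark-submit argv
assembled above:

    import java.util.concurrent.Callable;
    import java.util.concurrent.Future;
    import java.util.concurrent.FutureTask;

    public class ChildProcessMonitorSketch {
      public static Future<Void> launch(String command) throws Exception {
        // Run the command through a shell, as the client above does for spark-submit.
        ProcessBuilder pb = new ProcessBuilder("sh", "-c", command);
        pb.inheritIO();
        Process child = pb.start();

        // Wait for the child on a daemon thread and surface a non-zero exit code as a failure.
        Callable<Void> monitor = () -> {
          int exitCode = child.waitFor();
          if (exitCode != 0) {
            throw new IllegalStateException("Child exited with code " + exitCode);
          }
          return null;
        };

        FutureTask<Void> task = new FutureTask<>(monitor);
        Thread t = new Thread(task);
        t.setDaemon(true);
        t.setName("ChildProcessMonitor");
        t.start();
        return task;
      }

      public static void main(String[] args) throws Exception {
        launch("echo hello").get();  // placeholder command
      }
    }

In the actual client the monitor additionally collects the child's stderr through LogRedirector and cancels
the RpcServer client with that error log when the exit code is non-zero.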
http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/spark-client/src/test/java/org/apache/hive/spark/client/TestJobHandle.java
----------------------------------------------------------------------
diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestJobHandle.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestJobHandle.java
index d6b627b..b81a34b 100644
--- a/spark-client/src/test/java/org/apache/hive/spark/client/TestJobHandle.java
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestJobHandle.java
@@ -32,7 +32,7 @@ import static org.mockito.Mockito.*;
@RunWith(MockitoJUnitRunner.class)
public class TestJobHandle {
- @Mock private SparkClientImpl client;
+ @Mock private SparkClient client;
@Mock private Promise<Serializable> promise;
@Mock private JobHandle.Listener<Serializable> listener;
@Mock private JobHandle.Listener<Serializable> listener2;
http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
----------------------------------------------------------------------
diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
index c134625..681463e 100644
--- a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
@@ -34,7 +34,6 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStream;
-import java.io.PrintStream;
import java.io.Serializable;
import java.net.URI;
import java.nio.file.Paths;
@@ -54,7 +53,7 @@ import com.google.common.base.Strings;
import com.google.common.io.ByteStreams;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.spark.counter.SparkCounters;
-import org.apache.spark.SparkException;
+import org.apache.spark.SparkContext$;
import org.apache.spark.SparkFiles;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaRDD;
@@ -72,6 +71,7 @@ public class TestSparkClient {
static {
HIVECONF.set("hive.spark.client.connect.timeout", "30000ms");
+ HIVECONF.setVar(HiveConf.ConfVars.SPARK_CLIENT_TYPE, HiveConf.HIVE_SPARK_LAUNCHER_CLIENT);
}
private Map<String, String> createConf() {
@@ -82,6 +82,7 @@ public class TestSparkClient {
conf.put("spark.app.name", "SparkClientSuite Remote App");
conf.put("spark.driver.extraClassPath", classpath);
conf.put("spark.executor.extraClassPath", classpath);
+ conf.put("spark.testing", "true");
if (!Strings.isNullOrEmpty(System.getProperty("spark.home"))) {
conf.put("spark.home", System.getProperty("spark.home"));
@@ -342,6 +343,26 @@ public class TestSparkClient {
client.stop();
}
SparkClientFactory.stop();
+ waitForSparkContextShutdown();
+ }
+ }
+
+ /**
+ * This was added to avoid a race condition where we try to create multiple SparkContexts in
+ * the same process. Since spark.master = local, everything runs in the same JVM. Because we
+ * don't wait for the RemoteDriver to shut down its SparkContext, it's possible that a test
+ * finishes before the SparkContext has been shut down. To avoid the multiple-SparkContexts-
+ * in-a-single-JVM exception, we wait for the SparkContext to shut down after each test.
+ */
+ private void waitForSparkContextShutdown() throws InterruptedException {
+ for (int i = 0; i < 100; i++) {
+ if (SparkContext$.MODULE$.getActive().isEmpty()) {
+ break;
+ }
+ Thread.sleep(100);
+ }
+ if (!SparkContext$.MODULE$.getActive().isEmpty()) {
+ throw new IllegalStateException("SparkContext did not shutdown in time");
}
}
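
Finally, switching between the two clients is a configuration change only, as the test above does when it sets
hive.spark.client.type. A minimal sketch, assuming this patch is applied so that ConfVars.SPARK_CLIENT_TYPE and
the client-type constants exist on HiveConf:

    import org.apache.hadoop.hive.conf.HiveConf;

    public class ClientTypeToggleSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Select the in-process launcher client instead of the default spark-submit client.
        conf.setVar(HiveConf.ConfVars.SPARK_CLIENT_TYPE, HiveConf.HIVE_SPARK_LAUNCHER_CLIENT);
        System.out.println("Configured client type: " + conf.getVar(HiveConf.ConfVars.SPARK_CLIENT_TYPE));
      }
    }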