Posted to commits@hive.apache.org by st...@apache.org on 2018/06/05 13:20:46 UTC

[1/2] hive git commit: HIVE-18533: Add option to use InProcessLauncher to submit spark jobs (Sahil Takiar, reviewed by Rui Li)

Repository: hive
Updated Branches:
  refs/heads/master f80cff9ab -> da6638666


http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkLauncherSparkClient.java
----------------------------------------------------------------------
diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkLauncherSparkClient.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkLauncherSparkClient.java
new file mode 100644
index 0000000..8434fa9
--- /dev/null
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkLauncherSparkClient.java
@@ -0,0 +1,77 @@
+package org.apache.hive.spark.client;
+
+import org.apache.hive.spark.client.rpc.RpcServer;
+import org.apache.spark.launcher.SparkAppHandle;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestSparkLauncherSparkClient {
+
+  @Test
+  public void testSparkLauncherFutureGet() {
+    testChainOfStates(SparkAppHandle.State.CONNECTED, SparkAppHandle.State.SUBMITTED,
+            SparkAppHandle.State.RUNNING);
+    testChainOfStates(SparkAppHandle.State.CONNECTED, SparkAppHandle.State.SUBMITTED,
+            SparkAppHandle.State.FINISHED);
+    testChainOfStates(SparkAppHandle.State.CONNECTED, SparkAppHandle.State.SUBMITTED,
+            SparkAppHandle.State.FAILED);
+    testChainOfStates(SparkAppHandle.State.CONNECTED, SparkAppHandle.State.SUBMITTED,
+            SparkAppHandle.State.KILLED);
+
+    testChainOfStates(SparkAppHandle.State.LOST);
+    testChainOfStates(SparkAppHandle.State.CONNECTED, SparkAppHandle.State.LOST);
+    testChainOfStates(SparkAppHandle.State.CONNECTED, SparkAppHandle.State.SUBMITTED,
+            SparkAppHandle.State.LOST);
+  }
+
+  private void testChainOfStates(SparkAppHandle.State... states) {
+    SparkAppHandle sparkAppHandle = mock(SparkAppHandle.class);
+    RpcServer rpcServer = mock(RpcServer.class);
+    String clientId = "";
+
+    CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+    SparkLauncherSparkClient.SparkAppListener sparkAppListener = new SparkLauncherSparkClient.SparkAppListener(
+            shutdownLatch, rpcServer, clientId);
+    Future<Void> sparkLauncherFuture = SparkLauncherSparkClient.createSparkLauncherFuture(
+            shutdownLatch, sparkAppHandle, rpcServer, clientId);
+
+    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+      try {
+        sparkLauncherFuture.get();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new RuntimeException(e);
+      }
+    });
+
+    for (int i = 0; i < states.length - 1; i++) {
+      when(sparkAppHandle.getState()).thenReturn(states[i]);
+      sparkAppListener.stateChanged(sparkAppHandle);
+      Assert.assertTrue(!future.isDone());
+    }
+
+    when(sparkAppHandle.getState()).thenReturn(states[states.length - 1]);
+    sparkAppListener.stateChanged(sparkAppHandle);
+    try {
+      future.get(60, TimeUnit.SECONDS);
+    } catch (InterruptedException | ExecutionException | TimeoutException e) {
+      throw new RuntimeException("SparkLauncherFuture failed to complete after transitioning to " +
+              "state " + states[states.length - 1], e);
+    }
+    Assert.assertTrue(future.isDone());
+    Assert.assertEquals(shutdownLatch.getCount(), 0);
+    verify(sparkAppHandle).disconnect();
+  }
+}
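
For context, the listener exercised by this test reacts to SparkAppHandle state transitions and releases the waiting client once the application is either running or has reached a terminal state (FINISHED, FAILED, KILLED, or LOST). Below is a minimal, hypothetical sketch of that pattern using Spark's public launcher API; it is not the actual SparkLauncherSparkClient.SparkAppListener, and the class name LaunchCompletionListener is invented for illustration.

import java.util.concurrent.CountDownLatch;

import org.apache.spark.launcher.SparkAppHandle;

// Hypothetical sketch: trip a latch once the Spark app is RUNNING or has
// reached a terminal state, mirroring the state chains driven by the test.
class LaunchCompletionListener implements SparkAppHandle.Listener {
  private final CountDownLatch launched = new CountDownLatch(1);

  @Override
  public void stateChanged(SparkAppHandle handle) {
    SparkAppHandle.State state = handle.getState();
    if (state == SparkAppHandle.State.RUNNING || state.isFinal()) {
      launched.countDown();
    }
  }

  @Override
  public void infoChanged(SparkAppHandle handle) {
    // no-op: only state transitions matter here
  }

  void awaitLaunch() throws InterruptedException {
    launched.await();
  }
}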


[2/2] hive git commit: HIVE-18533: Add option to use InProcessLauncher to submit spark jobs (Sahil Takiar, reviewed by Rui Li)

Posted by st...@apache.org.
HIVE-18533: Add option to use InProcessLauncher to submit spark jobs (Sahil Takiar, reviewed by Rui Li)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/da663866
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/da663866
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/da663866

Branch: refs/heads/master
Commit: da66386662fbbcbde9501b4a7b27d076bcc790d4
Parents: f80cff9
Author: Sahil Takiar <ta...@gmail.com>
Authored: Tue Jun 5 08:00:54 2018 -0500
Committer: Sahil Takiar <st...@cloudera.com>
Committed: Tue Jun 5 08:00:54 2018 -0500

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  12 +-
 itests/qtest-spark/pom.xml                      |   6 +
 .../test/resources/testconfiguration.properties |   3 +-
 .../clientpositive/spark_in_process_launcher.q  |   6 +
 .../spark/spark_in_process_launcher.q.out       |  96 +++
 spark-client/pom.xml                            |   6 +
 .../hive/spark/client/AbstractSparkClient.java  | 600 ++++++++++++++++
 .../apache/hive/spark/client/JobHandleImpl.java |   8 +-
 .../apache/hive/spark/client/SparkClient.java   |   7 +
 .../hive/spark/client/SparkClientFactory.java   |  16 +-
 .../hive/spark/client/SparkClientImpl.java      | 703 -------------------
 .../spark/client/SparkLauncherSparkClient.java  | 220 ++++++
 .../spark/client/SparkSubmitSparkClient.java    | 237 +++++++
 .../apache/hive/spark/client/TestJobHandle.java |   2 +-
 .../hive/spark/client/TestSparkClient.java      |  25 +-
 .../client/TestSparkLauncherSparkClient.java    |  77 ++
 16 files changed, 1306 insertions(+), 718 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 3295d1d..56d2de0 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -316,6 +316,9 @@ public class HiveConf extends Configuration {
   public static final String HIVE_SERVER2_AUTHENTICATION_LDAP_USERMEMBERSHIPKEY_NAME =
       "hive.server2.authentication.ldap.userMembershipKey";
 
+  public static final String HIVE_SPARK_SUBMIT_CLIENT = "spark-submit";
+  public static final String HIVE_SPARK_LAUNCHER_CLIENT = "spark-launcher";
+
   /**
    * dbVars are the parameters can be set per database. If these
    * parameters are set as a database property, when switching to that
@@ -4245,6 +4248,11 @@ public class HiveConf extends Configuration {
             "If a Spark job contains more tasks than the maximum, it will be cancelled. A value of -1 means no limit."),
     SPARK_STAGE_MAX_TASKS("hive.spark.stage.max.tasks", -1, "The maximum number of tasks a stage in a Spark job may have.\n" +
         "If a Spark job stage contains more tasks than the maximum, the job will be cancelled. A value of -1 means no limit."),
+    SPARK_CLIENT_TYPE("hive.spark.client.type", HIVE_SPARK_SUBMIT_CLIENT,
+        "Controls how the Spark application is launched. If " + HIVE_SPARK_SUBMIT_CLIENT  + " is " +
+        "specified (default) then the spark-submit shell script is used to launch the Spark " +
+        "app. If " + HIVE_SPARK_LAUNCHER_CLIENT + " is specified then Spark's " +
+        "InProcessLauncher is used to programmatically launch the app."),
     NWAYJOINREORDER("hive.reorder.nway.joins", true,
       "Runs reordering of tables within single n-way join (i.e.: picks streamtable)"),
     HIVE_MERGE_NWAY_JOINS("hive.merge.nway.joins", true,
@@ -4335,7 +4343,8 @@ public class HiveConf extends Configuration {
         "Comma separated list of variables which are used internally and should not be configurable."),
     HIVE_SPARK_RSC_CONF_LIST("hive.spark.rsc.conf.list",
         SPARK_OPTIMIZE_SHUFFLE_SERDE.varname + "," +
-            SPARK_CLIENT_FUTURE_TIMEOUT.varname,
+        SPARK_CLIENT_FUTURE_TIMEOUT.varname + "," +
+        SPARK_CLIENT_TYPE.varname,
         "Comma separated list of variables which are related to remote spark context.\n" +
             "Changing these variables will result in re-creating the spark session."),
     HIVE_QUERY_TIMEOUT_SECONDS("hive.query.timeout.seconds", "0s",
@@ -5802,5 +5811,4 @@ public class HiveConf extends Configuration {
     }
     return ret;
   }
-
 }
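
The new SPARK_CLIENT_TYPE property is consulted when the Spark session is created, and because it is added to hive.spark.rsc.conf.list, changing it causes the session to be re-created. Below is a small illustrative Java snippet (not part of the commit) for selecting the in-process launcher programmatically; the HiveQL equivalent, used by the new qfile later in this message, is "set hive.spark.client.type=spark-launcher;".

import org.apache.hadoop.hive.conf.HiveConf;

public class SparkClientTypeExample {
  public static void main(String[] args) {
    // Illustrative only: pick the InProcessLauncher-based client for this
    // configuration; spark-submit remains the default.
    HiveConf hiveConf = new HiveConf();
    hiveConf.setVar(HiveConf.ConfVars.SPARK_CLIENT_TYPE, HiveConf.HIVE_SPARK_LAUNCHER_CLIENT);

    // Prints "spark-launcher".
    System.out.println(hiveConf.getVar(HiveConf.ConfVars.SPARK_CLIENT_TYPE));
  }
}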

http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/itests/qtest-spark/pom.xml
----------------------------------------------------------------------
diff --git a/itests/qtest-spark/pom.xml b/itests/qtest-spark/pom.xml
index d0e7eb8..8ed3171 100644
--- a/itests/qtest-spark/pom.xml
+++ b/itests/qtest-spark/pom.xml
@@ -64,6 +64,12 @@
       </exclusions>
     </dependency>
     <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-yarn_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-util</artifactId>
       <version>${jetty.version}</version>

http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index f3cb9de..b6a1c9b 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1586,7 +1586,8 @@ miniSparkOnYarn.only.query.files=spark_combine_equivalent_work.q,\
   spark_use_ts_stats_for_mapjoin.q,\
   spark_use_op_stats.q,\
   spark_explain_groupbyshuffle.q,\
-  spark_opt_shuffle_serde.q
+  spark_opt_shuffle_serde.q,\
+  spark_in_process_launcher.q
 
 miniSparkOnYarn.query.files=auto_sortmerge_join_16.q,\
   bucket4.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/ql/src/test/queries/clientpositive/spark_in_process_launcher.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/spark_in_process_launcher.q b/ql/src/test/queries/clientpositive/spark_in_process_launcher.q
new file mode 100644
index 0000000..368e135
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/spark_in_process_launcher.q
@@ -0,0 +1,6 @@
+--! qt:dataset:src
+
+set hive.spark.client.type=spark-launcher;
+
+explain select key, count(*) from src group by key order by key limit 10;
+select key, count(*) from src group by key order by key limit 10;

http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/ql/src/test/results/clientpositive/spark/spark_in_process_launcher.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/spark_in_process_launcher.q.out b/ql/src/test/results/clientpositive/spark/spark_in_process_launcher.q.out
new file mode 100644
index 0000000..5ae2f26
--- /dev/null
+++ b/ql/src/test/results/clientpositive/spark/spark_in_process_launcher.q.out
@@ -0,0 +1,96 @@
+PREHOOK: query: explain select key, count(*) from src group by key order by key limit 10
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select key, count(*) from src group by key order by key limit 10
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Spark
+      Edges:
+        Reducer 2 <- Map 1 (GROUP, 4)
+        Reducer 3 <- Reducer 2 (SORT, 1)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: src
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: key (type: string)
+                    outputColumnNames: key
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Group By Operator
+                      aggregations: count()
+                      keys: key (type: string)
+                      mode: hash
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                        TopN Hash Memory Usage: 0.1
+                        value expressions: _col1 (type: bigint)
+            Execution mode: vectorized
+        Reducer 2 
+            Execution mode: vectorized
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: count(VALUE._col0)
+                keys: KEY._col0 (type: string)
+                mode: mergepartial
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col0 (type: string)
+                  sort order: +
+                  Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                  TopN Hash Memory Usage: 0.1
+                  value expressions: _col1 (type: bigint)
+        Reducer 3 
+            Execution mode: vectorized
+            Reduce Operator Tree:
+              Select Operator
+                expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 (type: bigint)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                Limit
+                  Number of rows: 10
+                  Statistics: Num rows: 10 Data size: 100 Basic stats: COMPLETE Column stats: NONE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 10 Data size: 100 Basic stats: COMPLETE Column stats: NONE
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: 10
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select key, count(*) from src group by key order by key limit 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select key, count(*) from src group by key order by key limit 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+0	3
+10	1
+100	2
+103	2
+104	2
+105	1
+11	1
+111	1
+113	2
+114	1

http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/spark-client/pom.xml
----------------------------------------------------------------------
diff --git a/spark-client/pom.xml b/spark-client/pom.xml
index c4d8178..9a1e1c2 100644
--- a/spark-client/pom.xml
+++ b/spark-client/pom.xml
@@ -100,6 +100,12 @@
       </exclusions>
    </dependency>
     <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-yarn_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java b/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java
new file mode 100644
index 0000000..ed9222c
--- /dev/null
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java
@@ -0,0 +1,600 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+import static org.apache.hive.spark.client.SparkClientUtilities.HIVE_KRYO_REG_NAME;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
+import com.google.common.io.Resources;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.util.concurrent.GenericFutureListener;
+import io.netty.util.concurrent.Promise;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.io.Writer;
+import java.net.URI;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hive.spark.client.rpc.Rpc;
+import org.apache.hive.spark.client.rpc.RpcConfiguration;
+import org.apache.hive.spark.client.rpc.RpcServer;
+import org.apache.spark.SparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An abstract implementation of {@link SparkClient} that allows sub-classes to override how the
+ * spark application is launched. It provides the following functionality: (1) creating the client
+ * connection to the {@link RemoteDriver} and managing its lifecycle, (2) monitoring the thread
+ * used to submit the Spark application, (3) safe shutdown of the {@link RemoteDriver}, and (4)
+ * configuration handling for submitting the Spark application.
+ *
+ * <p>
+ *   This class contains the client protocol used to communicate with the {@link RemoteDriver}.
+ *   It uses this protocol to submit {@link Job}s to the {@link RemoteDriver}.
+ * </p>
+ */
+abstract class AbstractSparkClient implements SparkClient {
+
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractSparkClient.class);
+
+  private static final long DEFAULT_SHUTDOWN_TIMEOUT = 10000; // In milliseconds
+
+  private static final String OSX_TEST_OPTS = "SPARK_OSX_TEST_OPTS";
+  private static final String DRIVER_OPTS_KEY = "spark.driver.extraJavaOptions";
+  private static final String EXECUTOR_OPTS_KEY = "spark.executor.extraJavaOptions";
+  private static final String DRIVER_EXTRA_CLASSPATH = "spark.driver.extraClassPath";
+  private static final String EXECUTOR_EXTRA_CLASSPATH = "spark.executor.extraClassPath";
+  private static final String SPARK_DEPLOY_MODE = "spark.submit.deployMode";
+
+  protected final Map<String, String> conf;
+  private final HiveConf hiveConf;
+  private final Future<Void> driverFuture;
+  private final Map<String, JobHandleImpl<?>> jobs;
+  private final Rpc driverRpc;
+  private final ClientProtocol protocol;
+  protected volatile boolean isAlive;
+
+  protected AbstractSparkClient(RpcServer rpcServer, Map<String, String> conf, HiveConf hiveConf,
+                  String sessionid) throws IOException {
+    this.conf = conf;
+    this.hiveConf = hiveConf;
+    this.jobs = Maps.newConcurrentMap();
+
+    String secret = rpcServer.createSecret();
+    this.driverFuture = startDriver(rpcServer, sessionid, secret);
+    this.protocol = new ClientProtocol();
+
+    try {
+      // The RPC server will take care of timeouts here.
+      this.driverRpc = rpcServer.registerClient(sessionid, secret, protocol).get();
+    } catch (Throwable e) {
+      String errorMsg;
+      if (e.getCause() instanceof TimeoutException) {
+        errorMsg = "Timed out waiting for Remote Spark Driver to connect to HiveServer2.\nPossible reasons " +
+            "include network issues, errors in remote driver, cluster has no available resources, etc." +
+            "\nPlease check YARN or Spark driver's logs for further information.";
+      } else if (e.getCause() instanceof InterruptedException) {
+        errorMsg = "Interrupted while waiting for Remote Spark Driver to connect to HiveServer2.\nIt is possible " +
+            "that the query was cancelled which would cause the Spark Session to close.";
+      } else {
+        errorMsg = "Error while waiting for Remote Spark Driver to connect back to HiveServer2.";
+      }
+      LOG.error(errorMsg, e);
+      driverFuture.cancel(true);
+      try {
+        driverFuture.get();
+      } catch (InterruptedException ie) {
+        // Give up.
+        LOG.warn("Interrupted before driver thread was finished.", ie);
+      } catch (ExecutionException ee) {
+        LOG.error("Driver thread failed", ee);
+      }
+      throw Throwables.propagate(e);
+    }
+
+    LOG.info("Successfully connected to Remote Spark Driver at: " + this.driverRpc.getRemoteAddress());
+
+    driverRpc.addListener(new Rpc.Listener() {
+        @Override
+        public void rpcClosed(Rpc rpc) {
+          if (isAlive) {
+            LOG.warn("Connection to Remote Spark Driver {} closed unexpectedly", driverRpc.getRemoteAddress());
+            isAlive = false;
+          }
+        }
+
+        @Override
+        public String toString() {
+          return "Connection to Remote Spark Driver Closed Unexpectedly";
+        }
+    });
+    isAlive = true;
+  }
+
+  @Override
+  public <T extends Serializable> JobHandle<T> submit(Job<T> job) {
+    return protocol.submit(job, Collections.<JobHandle.Listener<T>>emptyList());
+  }
+
+  @Override
+  public <T extends Serializable> JobHandle<T> submit(Job<T> job, List<JobHandle.Listener<T>> listeners) {
+    return protocol.submit(job, listeners);
+  }
+
+  @Override
+  public <T extends Serializable> Future<T> run(Job<T> job) {
+    return protocol.run(job);
+  }
+
+  @Override
+  public void stop() {
+    if (isAlive) {
+      isAlive = false;
+      try {
+        protocol.endSession();
+      } catch (Exception e) {
+        LOG.warn("Exception while waiting for end session reply.", e);
+      } finally {
+        driverRpc.close();
+      }
+    }
+
+    try {
+      driverFuture.get(DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+    } catch (ExecutionException e) {
+      LOG.error("Exception while waiting for driver future to complete", e);
+    } catch (TimeoutException e) {
+      LOG.warn("Timed out shutting down remote driver, cancelling...");
+      driverFuture.cancel(true);
+    } catch (InterruptedException ie) {
+      LOG.debug("Interrupted before driver thread was finished.");
+      driverFuture.cancel(true);
+    }
+  }
+
+  @Override
+  public Future<?> addJar(URI uri) {
+    return run(new AddJarJob(uri.toString()));
+  }
+
+  @Override
+  public Future<?> addFile(URI uri) {
+    return run(new AddFileJob(uri.toString()));
+  }
+
+  @Override
+  public Future<Integer> getExecutorCount() {
+    return run(new GetExecutorCountJob());
+  }
+
+  @Override
+  public Future<Integer> getDefaultParallelism() {
+    return run(new GetDefaultParallelismJob());
+  }
+
+  @Override
+  public boolean isActive() {
+    return isAlive && driverRpc.isActive();
+  }
+
+  @Override
+  public void cancel(String jobId) {
+    protocol.cancel(jobId);
+  }
+
+  private Future<Void> startDriver(final RpcServer rpcServer, final String clientId,
+                                   final String secret) throws IOException {
+    final String serverAddress = rpcServer.getAddress();
+    final String serverPort = String.valueOf(rpcServer.getPort());
+
+    String sparkHome = getSparkHome();
+
+    String sparkLogDir = conf.get("hive.spark.log.dir");
+    if (sparkLogDir == null) {
+      if (sparkHome == null) {
+        sparkLogDir = "./target/";
+      } else {
+        sparkLogDir = sparkHome + "/logs/";
+      }
+    }
+
+    String osxTestOpts = "";
+    if (Strings.nullToEmpty(System.getProperty("os.name")).toLowerCase().contains("mac")) {
+      osxTestOpts = Strings.nullToEmpty(System.getenv(OSX_TEST_OPTS));
+    }
+
+    String driverJavaOpts = Joiner.on(" ").skipNulls().join(
+        "-Dhive.spark.log.dir=" + sparkLogDir, osxTestOpts, conf.get(DRIVER_OPTS_KEY));
+    String executorJavaOpts = Joiner.on(" ").skipNulls().join(
+        "-Dhive.spark.log.dir=" + sparkLogDir, osxTestOpts, conf.get(EXECUTOR_OPTS_KEY));
+
+    // Create a file with all the job properties to be read by spark-submit. Change the
+    // file's permissions so that only the owner can read it. This avoids having the
+    // connection secret show up in the child process's command line.
+    File properties = File.createTempFile("spark-submit.", ".properties");
+    if (!properties.setReadable(false) || !properties.setReadable(true, true)) {
+      throw new IOException("Cannot change permissions of job properties file.");
+    }
+    properties.deleteOnExit();
+
+    Properties allProps = new Properties();
+    // first load the defaults from spark-defaults.conf if available
+    try {
+      URL sparkDefaultsUrl = Thread.currentThread().getContextClassLoader().getResource("spark-defaults.conf");
+      if (sparkDefaultsUrl != null) {
+        LOG.info("Loading spark defaults configs from: " + sparkDefaultsUrl);
+        allProps.load(new ByteArrayInputStream(Resources.toByteArray(sparkDefaultsUrl)));
+      }
+    } catch (Exception e) {
+      String msg = "Exception trying to load spark-defaults.conf: " + e;
+      throw new IOException(msg, e);
+    }
+    // then load the SparkClientImpl config
+    for (Map.Entry<String, String> e : conf.entrySet()) {
+      allProps.put(e.getKey(), conf.get(e.getKey()));
+    }
+    allProps.put(SparkClientFactory.CONF_CLIENT_ID, clientId);
+    allProps.put(SparkClientFactory.CONF_KEY_SECRET, secret);
+    allProps.put(DRIVER_OPTS_KEY, driverJavaOpts);
+    allProps.put(EXECUTOR_OPTS_KEY, executorJavaOpts);
+
+    String isTesting = conf.get("spark.testing");
+    if (isTesting != null && isTesting.equalsIgnoreCase("true")) {
+      String hiveHadoopTestClasspath = Strings.nullToEmpty(System.getenv("HIVE_HADOOP_TEST_CLASSPATH"));
+      if (!hiveHadoopTestClasspath.isEmpty()) {
+        String extraDriverClasspath = Strings.nullToEmpty((String)allProps.get(DRIVER_EXTRA_CLASSPATH));
+        if (extraDriverClasspath.isEmpty()) {
+          allProps.put(DRIVER_EXTRA_CLASSPATH, hiveHadoopTestClasspath);
+        } else {
+          extraDriverClasspath = extraDriverClasspath.endsWith(File.pathSeparator) ? extraDriverClasspath : extraDriverClasspath + File.pathSeparator;
+          allProps.put(DRIVER_EXTRA_CLASSPATH, extraDriverClasspath + hiveHadoopTestClasspath);
+        }
+
+        String extraExecutorClasspath = Strings.nullToEmpty((String)allProps.get(EXECUTOR_EXTRA_CLASSPATH));
+        if (extraExecutorClasspath.isEmpty()) {
+          allProps.put(EXECUTOR_EXTRA_CLASSPATH, hiveHadoopTestClasspath);
+        } else {
+          extraExecutorClasspath = extraExecutorClasspath.endsWith(File.pathSeparator) ? extraExecutorClasspath : extraExecutorClasspath + File.pathSeparator;
+          allProps.put(EXECUTOR_EXTRA_CLASSPATH, extraExecutorClasspath + hiveHadoopTestClasspath);
+        }
+      }
+    }
+
+    Writer writer = new OutputStreamWriter(new FileOutputStream(properties), Charsets.UTF_8);
+    try {
+      allProps.store(writer, "Spark Context configuration");
+    } finally {
+      writer.close();
+    }
+
+    // Define how to pass options to the child process. If launching in client (or local)
+    // mode, the driver options need to be passed directly on the command line. Otherwise,
+    // SparkSubmit will take care of that for us.
+    String master = conf.get("spark.master");
+    Preconditions.checkArgument(master != null, "spark.master is not defined.");
+    String deployMode = conf.get(SPARK_DEPLOY_MODE);
+
+    if (SparkClientUtilities.isYarnClusterMode(master, deployMode)) {
+      String executorCores = conf.get("spark.executor.cores");
+      if (executorCores != null) {
+        addExecutorCores(executorCores);
+      }
+
+      String executorMemory = conf.get("spark.executor.memory");
+      if (executorMemory != null) {
+        addExecutorMemory(executorMemory);
+      }
+
+      String numOfExecutors = conf.get("spark.executor.instances");
+      if (numOfExecutors != null) {
+        addNumExecutors(numOfExecutors);
+      }
+    }
+    // The options --principal/--keytab do not work with --proxy-user in spark-submit.sh
+    // (see HIVE-15485, SPARK-5493, SPARK-19143), so Hive can only support doAs or
+    // delegation token renewal, but not both. Since doAs is the more common case, if both
+    // are needed, we choose to favor doAs. So when doAs is enabled, we use the kinit command;
+    // otherwise, we pass the principal/keytab to Spark to support token renewal for
+    // long-running applications.
+    if ("kerberos".equals(hiveConf.get(HADOOP_SECURITY_AUTHENTICATION))) {
+      String principal = SecurityUtil.getServerPrincipal(hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL),
+          "0.0.0.0");
+      String keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
+      boolean isDoAsEnabled = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS);
+      if (StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(keyTabFile)) {
+        addKeytabAndPrincipal(isDoAsEnabled, keyTabFile, principal);
+      }
+    }
+    if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) {
+      try {
+        String currentUser = Utils.getUGI().getShortUserName();
+        // do not do impersonation in CLI mode
+        if (!currentUser.equals(System.getProperty("user.name"))) {
+          LOG.info("Attempting impersonation of " + currentUser);
+          addProxyUser(currentUser);
+        }
+      } catch (Exception e) {
+        String msg = "Cannot obtain username: " + e;
+        throw new IllegalStateException(msg, e);
+      }
+    }
+
+    String regStr = conf.get("spark.kryo.registrator");
+    if (HIVE_KRYO_REG_NAME.equals(regStr)) {
+      addJars(SparkClientUtilities.findKryoRegistratorJar(hiveConf));
+    }
+
+    addPropertiesFile(properties.getAbsolutePath());
+    addClass(RemoteDriver.class.getName());
+
+    String jar = "spark-internal";
+    if (SparkContext.jarOfClass(this.getClass()).isDefined()) {
+      jar = SparkContext.jarOfClass(this.getClass()).get();
+    }
+    addExecutableJar(jar);
+
+
+    addAppArg(RemoteDriver.REMOTE_DRIVER_HOST_CONF);
+    addAppArg(serverAddress);
+    addAppArg(RemoteDriver.REMOTE_DRIVER_PORT_CONF);
+    addAppArg(serverPort);
+
+    // hive.spark.* keys are passed down to the RemoteDriver via REMOTE_DRIVER_CONF
+    // so that they are used only in the remote driver and not in the SparkContext,
+    // as --properties-file contains the spark.* keys that are meant for the SparkConf object.
+    for (String hiveSparkConfKey : RpcConfiguration.HIVE_SPARK_RSC_CONFIGS) {
+      String value = RpcConfiguration.getValue(hiveConf, hiveSparkConfKey);
+      addAppArg(RemoteDriver.REMOTE_DRIVER_CONF);
+      addAppArg(String.format("%s=%s", hiveSparkConfKey, value));
+    }
+
+    return launchDriver(isTesting, rpcServer, clientId);
+  }
+
+  protected abstract Future<Void> launchDriver(String isTesting, RpcServer rpcServer, String
+          clientId) throws IOException;
+
+  protected abstract String getSparkHome();
+
+  protected abstract void addAppArg(String arg);
+
+  protected abstract void addExecutableJar(String jar);
+
+  protected abstract void addPropertiesFile(String absolutePath);
+
+  protected abstract void addClass(String name);
+
+  protected abstract void addJars(String jars);
+
+  protected abstract void addProxyUser(String proxyUser);
+
+  protected abstract void addKeytabAndPrincipal(boolean isDoAsEnabled, String keyTabFile,
+                                                String principal);
+
+  protected abstract void addNumExecutors(String numOfExecutors);
+
+  protected abstract void addExecutorMemory(String executorMemory);
+
+  protected abstract void addExecutorCores(String executorCores);
+
+  private class ClientProtocol extends BaseProtocol {
+
+    <T extends Serializable> JobHandleImpl<T> submit(Job<T> job, List<JobHandle.Listener<T>> listeners) {
+      final String jobId = UUID.randomUUID().toString();
+      final Promise<T> promise = driverRpc.createPromise();
+      final JobHandleImpl<T> handle =
+          new JobHandleImpl<T>(AbstractSparkClient.this, promise, jobId, listeners);
+      jobs.put(jobId, handle);
+
+      final io.netty.util.concurrent.Future<Void> rpc = driverRpc.call(new JobRequest(jobId, job));
+      LOG.debug("Send JobRequest[{}].", jobId);
+
+      // Link the RPC and the promise so that events from one are propagated to the other as
+      // needed.
+      rpc.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<Void>>() {
+        @Override
+        public void operationComplete(io.netty.util.concurrent.Future<Void> f) {
+          if (f.isSuccess()) {
+            // If the spark job finishes before this listener is called, the QUEUED status will not be set
+            handle.changeState(JobHandle.State.QUEUED);
+          } else if (!promise.isDone()) {
+            promise.setFailure(f.cause());
+          }
+        }
+      });
+      promise.addListener(new GenericFutureListener<Promise<T>>() {
+        @Override
+        public void operationComplete(Promise<T> p) {
+          if (jobId != null) {
+            jobs.remove(jobId);
+          }
+          if (p.isCancelled() && !rpc.isDone()) {
+            rpc.cancel(true);
+          }
+        }
+      });
+      return handle;
+    }
+
+    <T extends Serializable> Future<T> run(Job<T> job) {
+      @SuppressWarnings("unchecked")
+      final io.netty.util.concurrent.Future<T> rpc = (io.netty.util.concurrent.Future<T>)
+        driverRpc.call(new SyncJobRequest(job), Serializable.class);
+      return rpc;
+    }
+
+    void cancel(String jobId) {
+      driverRpc.call(new CancelJob(jobId));
+    }
+
+    Future<?> endSession() {
+      return driverRpc.call(new EndSession());
+    }
+
+    private void handle(ChannelHandlerContext ctx, Error msg) {
+      LOG.warn("Error reported from Remote Spark Driver: {}", msg.cause);
+    }
+
+    private void handle(ChannelHandlerContext ctx, JobMetrics msg) {
+      JobHandleImpl<?> handle = jobs.get(msg.jobId);
+      if (handle != null) {
+        handle.getMetrics().addMetrics(msg.sparkJobId, msg.stageId, msg.taskId, msg.metrics);
+      } else {
+        LOG.warn("Received metrics for unknown Spark job {}", msg.sparkJobId);
+      }
+    }
+
+    private void handle(ChannelHandlerContext ctx, JobResult msg) {
+      JobHandleImpl<?> handle = jobs.remove(msg.id);
+      if (handle != null) {
+        LOG.debug("Received result for client job {}", msg.id);
+        handle.setSparkCounters(msg.sparkCounters);
+        Throwable error = msg.error;
+        if (error == null) {
+          handle.setSuccess(msg.result);
+        } else {
+          handle.setFailure(error);
+        }
+      } else {
+        LOG.warn("Received result for unknown client job {}", msg.id);
+      }
+    }
+
+    private void handle(ChannelHandlerContext ctx, JobStarted msg) {
+      JobHandleImpl<?> handle = jobs.get(msg.id);
+      if (handle != null) {
+        handle.changeState(JobHandle.State.STARTED);
+      } else {
+        LOG.warn("Received event for unknown client job {}", msg.id);
+      }
+    }
+
+    private void handle(ChannelHandlerContext ctx, JobSubmitted msg) {
+      JobHandleImpl<?> handle = jobs.get(msg.clientJobId);
+      if (handle != null) {
+        LOG.info("Received Spark job ID: {} for client job {}", msg.sparkJobId, msg.clientJobId);
+        handle.addSparkJobId(msg.sparkJobId);
+      } else {
+        LOG.warn("Received Spark job ID: {} for unknown client job {}", msg.sparkJobId, msg.clientJobId);
+      }
+    }
+
+    @Override
+    protected String name() {
+      return "HiveServer2 to Remote Spark Driver Connection";
+    }
+  }
+
+  private static class AddJarJob implements Job<Serializable> {
+    private static final long serialVersionUID = 1L;
+
+    private final String path;
+
+    AddJarJob() {
+      this(null);
+    }
+
+    AddJarJob(String path) {
+      this.path = path;
+    }
+
+    @Override
+    public Serializable call(JobContext jc) throws Exception {
+      jc.sc().addJar(path);
+      // A subsequent remote job may refer to classes in this jar, and that job may be executed
+      // in a different thread, so we add this jar path to the JobContext for later use.
+      jc.getAddedJars().put(path, System.currentTimeMillis());
+      return null;
+    }
+
+  }
+
+  private static class AddFileJob implements Job<Serializable> {
+    private static final long serialVersionUID = 1L;
+
+    private final String path;
+
+    AddFileJob() {
+      this(null);
+    }
+
+    AddFileJob(String path) {
+      this.path = path;
+    }
+
+    @Override
+    public Serializable call(JobContext jc) throws Exception {
+      jc.sc().addFile(path);
+      return null;
+    }
+
+  }
+
+  private static class GetExecutorCountJob implements Job<Integer> {
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Integer call(JobContext jc) throws Exception {
+        // minus 1 here otherwise driver is also counted as an executor
+        int count = jc.sc().sc().getExecutorMemoryStatus().size() - 1;
+        return Integer.valueOf(count);
+      }
+
+  }
+
+  private static class GetDefaultParallelismJob implements Job<Integer> {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public Integer call(JobContext jc) throws Exception {
+      return jc.sc().sc().defaultParallelism();
+    }
+  }
+
+}
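
AbstractSparkClient leaves the actual launch mechanism to its subclasses through the launchDriver/add* hooks above. As a rough idea of how a launcher-based subclass can satisfy those hooks, here is a hedged sketch that maps them onto Spark's InProcessLauncher; it is only an approximation under assumed API usage, not the SparkLauncherSparkClient implementation from this commit (which is not reproduced in this message).

import java.io.IOException;

import org.apache.spark.launcher.InProcessLauncher;
import org.apache.spark.launcher.SparkAppHandle;

// Hedged sketch (hypothetical class): map AbstractSparkClient's abstract
// add* hooks onto Spark's InProcessLauncher builder methods.
class LauncherBackedSubmitter {
  private final InProcessLauncher launcher = new InProcessLauncher();

  void addAppArg(String arg)          { launcher.addAppArgs(arg); }
  void addExecutableJar(String jar)   { launcher.setAppResource(jar); }
  void addPropertiesFile(String path) { launcher.setPropertiesFile(path); }
  void addClass(String name)          { launcher.setMainClass(name); }
  void addJars(String jars)           { launcher.addJar(jars); }
  void addProxyUser(String user)      { launcher.addSparkArg("--proxy-user", user); }
  void addNumExecutors(String num)    { launcher.setConf("spark.executor.instances", num); }
  void addExecutorMemory(String mem)  { launcher.setConf("spark.executor.memory", mem); }
  void addExecutorCores(String cores) { launcher.setConf("spark.executor.cores", cores); }

  SparkAppHandle launch(SparkAppHandle.Listener listener) throws IOException {
    // Runs the Spark submission in-process instead of forking spark-submit.
    return launcher.startApplication(listener);
  }
}

A spark-submit based subclass such as SparkSubmitSparkClient would presumably translate the same hooks into command-line arguments (--executor-cores, --num-executors, and so on), much like the deleted SparkClientImpl shown further below.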

http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
index 2881252..61489a3 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
@@ -34,7 +34,7 @@ import org.apache.hive.spark.counter.SparkCounters;
  */
 class JobHandleImpl<T extends Serializable> implements JobHandle<T> {
 
-  private final SparkClientImpl client;
+  private final SparkClient client;
   private final String jobId;
   private final MetricsCollection metrics;
   private final Promise<T> promise;
@@ -43,8 +43,8 @@ class JobHandleImpl<T extends Serializable> implements JobHandle<T> {
   private volatile State state;
   private volatile SparkCounters sparkCounters;
 
-  JobHandleImpl(SparkClientImpl client, Promise<T> promise, String jobId,
-                    List<Listener<T>> listeners) {
+  JobHandleImpl(SparkClient client, Promise<T> promise, String jobId,
+                List<Listener<T>> listeners) {
     this.client = client;
     this.jobId = jobId;
     this.promise = promise;
@@ -233,7 +233,7 @@ class JobHandleImpl<T extends Serializable> implements JobHandle<T> {
     }
   }
 
-  /** Last attempt at preventing stray jobs from accumulating in SparkClientImpl. */
+  /** Last attempt at preventing stray jobs from accumulating in SparkClient. */
   @Override
   protected void finalize() {
     if (!isDone()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
index 1922e41..9138899 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
@@ -110,4 +110,11 @@ public interface SparkClient extends Serializable {
    * Check if remote context is still active.
    */
   boolean isActive();
+
+  /**
+   * Cancel the specified jobId
+   *
+   * @param jobId the jobId to cancel
+   */
+  void cancel(String jobId);
 }
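
The new cancel(String jobId) method complements the existing submit()/JobHandle API. The following is a hedged usage sketch; it assumes JobHandle exposes the client job id via getClientJobId() and that the helper lives in the same org.apache.hive.spark.client package as SparkClient and Job.

import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: submit a job and cancel it via the new
// SparkClient.cancel(jobId) if it does not finish within the timeout.
class CancelOnTimeout {
  static <T extends Serializable> T runWithTimeout(SparkClient client, Job<T> job,
      long timeout, TimeUnit unit) throws Exception {
    JobHandle<T> handle = client.submit(job);
    try {
      return handle.get(timeout, unit);
    } catch (TimeoutException e) {
      client.cancel(handle.getClientJobId());  // assumed accessor for the client job id
      throw e;
    }
  }
}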

http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
index 88b5c95..1974e88 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
@@ -18,13 +18,11 @@
 package org.apache.hive.spark.client;
 
 import java.io.IOException;
-import java.io.PrintStream;
 import java.util.Map;
 
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hive.spark.client.rpc.RpcServer;
-import org.apache.spark.SparkException;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
@@ -82,10 +80,18 @@ public final class SparkClientFactory {
    * @param hiveConf Configuration for Hive, contains hive.* properties.
    */
   public static SparkClient createClient(Map<String, String> sparkConf, HiveConf hiveConf,
-                                         String sessionId)
-          throws IOException, SparkException {
+                                         String sessionId) throws IOException {
     Preconditions.checkState(server != null,
             "Invalid state: Hive on Spark RPC Server has not been initialized");
-    return new SparkClientImpl(server, sparkConf, hiveConf, sessionId);
+    switch (hiveConf.getVar(HiveConf.ConfVars.SPARK_CLIENT_TYPE)) {
+    case HiveConf.HIVE_SPARK_SUBMIT_CLIENT:
+      return new SparkSubmitSparkClient(server, sparkConf, hiveConf, sessionId);
+    case HiveConf.HIVE_SPARK_LAUNCHER_CLIENT:
+      return new SparkLauncherSparkClient(server, sparkConf, hiveConf, sessionId);
+    default:
+      throw new IllegalArgumentException("Unknown Hive on Spark launcher type " + hiveConf.getVar(
+              HiveConf.ConfVars.SPARK_CLIENT_TYPE) + " valid options are " +
+              HiveConf.HIVE_SPARK_SUBMIT_CLIENT + " or " + HiveConf.HIVE_SPARK_LAUNCHER_CLIENT);
+    }
   }
 }
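
With this change the factory chooses the client implementation from hive.spark.client.type. Below is a hedged end-to-end sketch; it assumes SparkClientFactory.initialize(Map) is called beforehand (as the Preconditions check above requires) and that the helper sits in the same package as SparkClient.

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.hive.conf.HiveConf;

// Hypothetical bootstrap: with hive.spark.client.type=spark-launcher the
// factory returns a SparkLauncherSparkClient; with the default it returns
// the spark-submit based SparkSubmitSparkClient.
class ClientBootstrap {
  static SparkClient createLauncherClient(Map<String, String> sparkConf, String sessionId)
      throws IOException {
    HiveConf hiveConf = new HiveConf();
    hiveConf.setVar(HiveConf.ConfVars.SPARK_CLIENT_TYPE, HiveConf.HIVE_SPARK_LAUNCHER_CLIENT);

    SparkClientFactory.initialize(sparkConf);  // starts the Hive-side RPC server
    return SparkClientFactory.createClient(sparkConf, hiveConf, sessionId);
  }
}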

http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
deleted file mode 100644
index 847c82b..0000000
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ /dev/null
@@ -1,703 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hive.spark.client;
-
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
-import static org.apache.hive.spark.client.SparkClientUtilities.HIVE_KRYO_REG_NAME;
-
-import com.google.common.base.Charsets;
-import com.google.common.base.Joiner;
-import com.google.common.base.Strings;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.io.Resources;
-
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.util.concurrent.GenericFutureListener;
-import io.netty.util.concurrent.Promise;
-
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Serializable;
-import java.io.Writer;
-import java.net.URI;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.UUID;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.hive.common.log.LogRedirector;
-import org.apache.hadoop.hive.conf.Constants;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.shims.Utils;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hive.spark.client.rpc.Rpc;
-import org.apache.hive.spark.client.rpc.RpcConfiguration;
-import org.apache.hive.spark.client.rpc.RpcServer;
-import org.apache.spark.SparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-class SparkClientImpl implements SparkClient {
-  private static final long serialVersionUID = 1L;
-
-  private static final Logger LOG = LoggerFactory.getLogger(SparkClientImpl.class);
-
-  private static final long DEFAULT_SHUTDOWN_TIMEOUT = 10000; // In milliseconds
-
-  private static final String OSX_TEST_OPTS = "SPARK_OSX_TEST_OPTS";
-  private static final String SPARK_HOME_ENV = "SPARK_HOME";
-  private static final String SPARK_HOME_KEY = "spark.home";
-  private static final String DRIVER_OPTS_KEY = "spark.driver.extraJavaOptions";
-  private static final String EXECUTOR_OPTS_KEY = "spark.executor.extraJavaOptions";
-  private static final String DRIVER_EXTRA_CLASSPATH = "spark.driver.extraClassPath";
-  private static final String EXECUTOR_EXTRA_CLASSPATH = "spark.executor.extraClassPath";
-
-  private final Map<String, String> conf;
-  private final HiveConf hiveConf;
-  private final Thread driverThread;
-  private final Map<String, JobHandleImpl<?>> jobs;
-  private final Rpc driverRpc;
-  private final ClientProtocol protocol;
-  private volatile boolean isAlive;
-
-  SparkClientImpl(RpcServer rpcServer, Map<String, String> conf, HiveConf hiveConf,
-                  String sessionid) throws IOException {
-    this.conf = conf;
-    this.hiveConf = hiveConf;
-    this.jobs = Maps.newConcurrentMap();
-
-    String secret = rpcServer.createSecret();
-    this.driverThread = startDriver(rpcServer, sessionid, secret);
-    this.protocol = new ClientProtocol();
-
-    try {
-      // The RPC server will take care of timeouts here.
-      this.driverRpc = rpcServer.registerClient(sessionid, secret, protocol).get();
-    } catch (Throwable e) {
-      String errorMsg;
-      if (e.getCause() instanceof TimeoutException) {
-        errorMsg = "Timed out waiting for Remote Spark Driver to connect to HiveServer2.\nPossible reasons " +
-            "include network issues, errors in remote driver, cluster has no available resources, etc." +
-            "\nPlease check YARN or Spark driver's logs for further information.";
-      } else if (e.getCause() instanceof InterruptedException) {
-        errorMsg = "Interrupted while waiting for Remote Spark Driver to connect to HiveServer2.\nIt is possible " +
-            "that the query was cancelled which would cause the Spark Session to close.";
-      } else {
-        errorMsg = "Error while waiting for Remote Spark Driver to connect back to HiveServer2.";
-      }
-      LOG.error(errorMsg, e);
-      driverThread.interrupt();
-      try {
-        driverThread.join();
-      } catch (InterruptedException ie) {
-        // Give up.
-        LOG.warn("Interrupted before driver thread was finished.", ie);
-      }
-      throw Throwables.propagate(e);
-    }
-
-    LOG.info("Successfully connected to Remote Spark Driver at: " + this.driverRpc.getRemoteAddress());
-
-    driverRpc.addListener(new Rpc.Listener() {
-        @Override
-        public void rpcClosed(Rpc rpc) {
-          if (isAlive) {
-            LOG.warn("Connection to Remote Spark Driver {} closed unexpectedly", driverRpc.getRemoteAddress());
-            isAlive = false;
-          }
-        }
-
-        @Override
-        public String toString() {
-          return "Connection to Remote Spark Driver Closed Unexpectedly";
-        }
-    });
-    isAlive = true;
-  }
-
-  @Override
-  public <T extends Serializable> JobHandle<T> submit(Job<T> job) {
-    return protocol.submit(job, Collections.<JobHandle.Listener<T>>emptyList());
-  }
-
-  @Override
-  public <T extends Serializable> JobHandle<T> submit(Job<T> job, List<JobHandle.Listener<T>> listeners) {
-    return protocol.submit(job, listeners);
-  }
-
-  @Override
-  public <T extends Serializable> Future<T> run(Job<T> job) {
-    return protocol.run(job);
-  }
-
-  @Override
-  public void stop() {
-    if (isAlive) {
-      isAlive = false;
-      try {
-        protocol.endSession();
-      } catch (Exception e) {
-        LOG.warn("Exception while waiting for end session reply.", e);
-      } finally {
-        driverRpc.close();
-      }
-    }
-
-    long endTime = System.currentTimeMillis() + DEFAULT_SHUTDOWN_TIMEOUT;
-    try {
-      driverThread.join(DEFAULT_SHUTDOWN_TIMEOUT);
-    } catch (InterruptedException ie) {
-      LOG.debug("Interrupted before driver thread was finished.");
-    }
-    if (endTime - System.currentTimeMillis() <= 0) {
-      LOG.warn("Timed out shutting down remote driver, interrupting...");
-      driverThread.interrupt();
-    }
-  }
-
-  @Override
-  public Future<?> addJar(URI uri) {
-    return run(new AddJarJob(uri.toString()));
-  }
-
-  @Override
-  public Future<?> addFile(URI uri) {
-    return run(new AddFileJob(uri.toString()));
-  }
-
-  @Override
-  public Future<Integer> getExecutorCount() {
-    return run(new GetExecutorCountJob());
-  }
-
-  @Override
-  public Future<Integer> getDefaultParallelism() {
-    return run(new GetDefaultParallelismJob());
-  }
-
-  @Override
-  public boolean isActive() {
-    return isAlive && driverRpc.isActive();
-  }
-
-  void cancel(String jobId) {
-    protocol.cancel(jobId);
-  }
-
-  private Thread startDriver(final RpcServer rpcServer, final String clientId, final String secret)
-      throws IOException {
-    Runnable runnable;
-    final String serverAddress = rpcServer.getAddress();
-    final String serverPort = String.valueOf(rpcServer.getPort());
-
-    // If a Spark installation is provided, use the spark-submit script. Otherwise, call the
-    // SparkSubmit class directly, which has some caveats (like having to provide a proper
-    // version of Guava on the classpath depending on the deploy mode).
-    String sparkHome = Strings.emptyToNull(conf.get(SPARK_HOME_KEY));
-    if (sparkHome == null) {
-      sparkHome = Strings.emptyToNull(System.getenv(SPARK_HOME_ENV));
-    }
-    if (sparkHome == null) {
-      sparkHome = Strings.emptyToNull(System.getProperty(SPARK_HOME_KEY));
-    }
-    String sparkLogDir = conf.get("hive.spark.log.dir");
-    if (sparkLogDir == null) {
-      if (sparkHome == null) {
-        sparkLogDir = "./target/";
-      } else {
-        sparkLogDir = sparkHome + "/logs/";
-      }
-    }
-
-    String osxTestOpts = "";
-    if (Strings.nullToEmpty(System.getProperty("os.name")).toLowerCase().contains("mac")) {
-      osxTestOpts = Strings.nullToEmpty(System.getenv(OSX_TEST_OPTS));
-    }
-
-    String driverJavaOpts = Joiner.on(" ").skipNulls().join(
-        "-Dhive.spark.log.dir=" + sparkLogDir, osxTestOpts, conf.get(DRIVER_OPTS_KEY));
-    String executorJavaOpts = Joiner.on(" ").skipNulls().join(
-        "-Dhive.spark.log.dir=" + sparkLogDir, osxTestOpts, conf.get(EXECUTOR_OPTS_KEY));
-
-    // Create a file with all the job properties to be read by spark-submit. Change the
-    // file's permissions so that only the owner can read it. This avoid having the
-    // connection secret show up in the child process's command line.
-    File properties = File.createTempFile("spark-submit.", ".properties");
-    if (!properties.setReadable(false) || !properties.setReadable(true, true)) {
-      throw new IOException("Cannot change permissions of job properties file.");
-    }
-    properties.deleteOnExit();
-
-    Properties allProps = new Properties();
-    // first load the defaults from spark-defaults.conf if available
-    try {
-      URL sparkDefaultsUrl = Thread.currentThread().getContextClassLoader().getResource("spark-defaults.conf");
-      if (sparkDefaultsUrl != null) {
-        LOG.info("Loading spark defaults configs from: " + sparkDefaultsUrl);
-        allProps.load(new ByteArrayInputStream(Resources.toByteArray(sparkDefaultsUrl)));
-      }
-    } catch (Exception e) {
-      String msg = "Exception trying to load spark-defaults.conf: " + e;
-      throw new IOException(msg, e);
-    }
-    // then load the SparkClientImpl config
-    for (Map.Entry<String, String> e : conf.entrySet()) {
-      allProps.put(e.getKey(), conf.get(e.getKey()));
-    }
-    allProps.put(SparkClientFactory.CONF_CLIENT_ID, clientId);
-    allProps.put(SparkClientFactory.CONF_KEY_SECRET, secret);
-    allProps.put(DRIVER_OPTS_KEY, driverJavaOpts);
-    allProps.put(EXECUTOR_OPTS_KEY, executorJavaOpts);
-
-    String isTesting = conf.get("spark.testing");
-    if (isTesting != null && isTesting.equalsIgnoreCase("true")) {
-      String hiveHadoopTestClasspath = Strings.nullToEmpty(System.getenv("HIVE_HADOOP_TEST_CLASSPATH"));
-      if (!hiveHadoopTestClasspath.isEmpty()) {
-        String extraDriverClasspath = Strings.nullToEmpty((String)allProps.get(DRIVER_EXTRA_CLASSPATH));
-        if (extraDriverClasspath.isEmpty()) {
-          allProps.put(DRIVER_EXTRA_CLASSPATH, hiveHadoopTestClasspath);
-        } else {
-          extraDriverClasspath = extraDriverClasspath.endsWith(File.pathSeparator) ? extraDriverClasspath : extraDriverClasspath + File.pathSeparator;
-          allProps.put(DRIVER_EXTRA_CLASSPATH, extraDriverClasspath + hiveHadoopTestClasspath);
-        }
-
-        String extraExecutorClasspath = Strings.nullToEmpty((String)allProps.get(EXECUTOR_EXTRA_CLASSPATH));
-        if (extraExecutorClasspath.isEmpty()) {
-          allProps.put(EXECUTOR_EXTRA_CLASSPATH, hiveHadoopTestClasspath);
-        } else {
-          extraExecutorClasspath = extraExecutorClasspath.endsWith(File.pathSeparator) ? extraExecutorClasspath : extraExecutorClasspath + File.pathSeparator;
-          allProps.put(EXECUTOR_EXTRA_CLASSPATH, extraExecutorClasspath + hiveHadoopTestClasspath);
-        }
-      }
-    }
-
-    Writer writer = new OutputStreamWriter(new FileOutputStream(properties), Charsets.UTF_8);
-    try {
-      allProps.store(writer, "Spark Context configuration");
-    } finally {
-      writer.close();
-    }
-
-    // Define how to pass options to the child process. If launching in client (or local)
-    // mode, the driver options need to be passed directly on the command line. Otherwise,
-    // SparkSubmit will take care of that for us.
-    String master = conf.get("spark.master");
-    Preconditions.checkArgument(master != null, "spark.master is not defined.");
-    String deployMode = conf.get("spark.submit.deployMode");
-
-    List<String> argv = Lists.newLinkedList();
-
-    if (sparkHome != null) {
-      argv.add(new File(sparkHome, "bin/spark-submit").getAbsolutePath());
-    } else {
-      LOG.info("No spark.home provided, calling SparkSubmit directly.");
-      argv.add(new File(System.getProperty("java.home"), "bin/java").getAbsolutePath());
-
-      if (master.startsWith("local") || master.startsWith("mesos") ||
-          SparkClientUtilities.isYarnClientMode(master, deployMode) ||
-          master.startsWith("spark")) {
-        String mem = conf.get("spark.driver.memory");
-        if (mem != null) {
-          argv.add("-Xms" + mem);
-          argv.add("-Xmx" + mem);
-        }
-
-        String cp = conf.get("spark.driver.extraClassPath");
-        if (cp != null) {
-          argv.add("-classpath");
-          argv.add(cp);
-        }
-
-        String libPath = conf.get("spark.driver.extraLibPath");
-        if (libPath != null) {
-          argv.add("-Djava.library.path=" + libPath);
-        }
-
-        String extra = conf.get(DRIVER_OPTS_KEY);
-        if (extra != null) {
-          for (String opt : extra.split("[ ]")) {
-            if (!opt.trim().isEmpty()) {
-              argv.add(opt.trim());
-            }
-          }
-        }
-      }
-
-      argv.add("org.apache.spark.deploy.SparkSubmit");
-    }
-
-    if (SparkClientUtilities.isYarnClusterMode(master, deployMode)) {
-      String executorCores = conf.get("spark.executor.cores");
-      if (executorCores != null) {
-        argv.add("--executor-cores");
-        argv.add(executorCores);
-      }
-
-      String executorMemory = conf.get("spark.executor.memory");
-      if (executorMemory != null) {
-        argv.add("--executor-memory");
-        argv.add(executorMemory);
-      }
-
-      String numOfExecutors = conf.get("spark.executor.instances");
-      if (numOfExecutors != null) {
-        argv.add("--num-executors");
-        argv.add(numOfExecutors);
-      }
-    }
-    // The options --principal/--keytab do not work with --proxy-user in spark-submit.sh
-    // (see HIVE-15485, SPARK-5493, SPARK-19143), so Hive can only support doAs or
-    // delegation token renewal, but not both. Since doAs is the more common case, if both
-    // are needed, we choose to favor doAs. So when doAs is enabled, we use the kinit command;
-    // otherwise, we pass the principal/keytab to Spark to support token renewal for
-    // long-running applications.
-    if ("kerberos".equals(hiveConf.get(HADOOP_SECURITY_AUTHENTICATION))) {
-      String principal = SecurityUtil.getServerPrincipal(hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL),
-          "0.0.0.0");
-      String keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
-      if (StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(keyTabFile)) {
-        if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) {
-          List<String> kinitArgv = Lists.newLinkedList();
-          kinitArgv.add("kinit");
-          kinitArgv.add(principal);
-          kinitArgv.add("-k");
-          kinitArgv.add("-t");
-          kinitArgv.add(keyTabFile + ";");
-          kinitArgv.addAll(argv);
-          argv = kinitArgv;
-        } else {
-          // if doAs is not enabled, we pass the principal/keytab to spark-submit in order to
-          // support the possible delegation token renewal in Spark
-          argv.add("--principal");
-          argv.add(principal);
-          argv.add("--keytab");
-          argv.add(keyTabFile);
-        }
-      }
-    }
-    if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) {
-      try {
-        String currentUser = Utils.getUGI().getShortUserName();
-        // do not do impersonation in CLI mode
-        if (!currentUser.equals(System.getProperty("user.name"))) {
-          LOG.info("Attempting impersonation of " + currentUser);
-          argv.add("--proxy-user");
-          argv.add(currentUser);
-        }
-      } catch (Exception e) {
-        String msg = "Cannot obtain username: " + e;
-        throw new IllegalStateException(msg, e);
-      }
-    }
-
-    String regStr = conf.get("spark.kryo.registrator");
-    if (HIVE_KRYO_REG_NAME.equals(regStr)) {
-      argv.add("--jars");
-      argv.add(SparkClientUtilities.findKryoRegistratorJar(hiveConf));
-    }
-
-    argv.add("--properties-file");
-    argv.add(properties.getAbsolutePath());
-    argv.add("--class");
-    argv.add(RemoteDriver.class.getName());
-
-    String jar = "spark-internal";
-    if (SparkContext.jarOfClass(this.getClass()).isDefined()) {
-      jar = SparkContext.jarOfClass(this.getClass()).get();
-    }
-    argv.add(jar);
-
-    argv.add(RemoteDriver.REMOTE_DRIVER_HOST_CONF);
-    argv.add(serverAddress);
-    argv.add(RemoteDriver.REMOTE_DRIVER_PORT_CONF);
-    argv.add(serverPort);
-
-    //hive.spark.* keys are passed down to the RemoteDriver via REMOTE_DRIVER_CONF
-    // so that they are not used in sparkContext but only in remote driver,
-    //as --properties-file contains the spark.* keys that are meant for SparkConf object.
-    for (String hiveSparkConfKey : RpcConfiguration.HIVE_SPARK_RSC_CONFIGS) {
-      String value = RpcConfiguration.getValue(hiveConf, hiveSparkConfKey);
-      argv.add(RemoteDriver.REMOTE_DRIVER_CONF);
-      argv.add(String.format("%s=%s", hiveSparkConfKey, value));
-    }
-
-    String cmd = Joiner.on(" ").join(argv);
-    LOG.info("Running client driver with argv: {}", cmd);
-    ProcessBuilder pb = new ProcessBuilder("sh", "-c", cmd);
-
-    // Prevent hive configurations from being visible in Spark.
-    pb.environment().remove("HIVE_HOME");
-    pb.environment().remove("HIVE_CONF_DIR");
-    // Add credential provider password to the child process's environment
-    // In case of Spark the credential provider location is provided in the jobConf when the job is submitted
-    String password = getSparkJobCredentialProviderPassword();
-    if(password != null) {
-      pb.environment().put(Constants.HADOOP_CREDENTIAL_PASSWORD_ENVVAR, password);
-    }
-    if (isTesting != null) {
-      pb.environment().put("SPARK_TESTING", isTesting);
-    }
-
-    final Process child = pb.start();
-    String threadName = Thread.currentThread().getName();
-    final List<String> childErrorLog = Collections.synchronizedList(new ArrayList<String>());
-    final LogRedirector.LogSourceCallback callback = () -> {return isAlive;};
-
-    LogRedirector.redirect("spark-submit-stdout-redir-" + threadName,
-        new LogRedirector(child.getInputStream(), LOG, callback));
-    LogRedirector.redirect("spark-submit-stderr-redir-" + threadName,
-        new LogRedirector(child.getErrorStream(), LOG, childErrorLog, callback));
-
-    runnable = new Runnable() {
-      @Override
-      public void run() {
-        try {
-          int exitCode = child.waitFor();
-          if (exitCode != 0) {
-            StringBuilder errStr = new StringBuilder();
-            synchronized(childErrorLog) {
-              Iterator iter = childErrorLog.iterator();
-              while(iter.hasNext()){
-                errStr.append(iter.next());
-                errStr.append('\n');
-              }
-            }
-
-            LOG.warn("Child process exited with code {}", exitCode);
-            rpcServer.cancelClient(clientId,
-                "Child process (spark-submit) exited before connecting back with error log " + errStr.toString());
-          }
-        } catch (InterruptedException ie) {
-          LOG.warn("Thread waiting on the child process (spark-submit) is interrupted, killing the child process.");
-          rpcServer.cancelClient(clientId, "Thread waiting on the child process (spark-submit) is interrupted");
-          Thread.interrupted();
-          child.destroy();
-        } catch (Exception e) {
-          String errMsg = "Exception while waiting for child process (spark-submit)";
-          LOG.warn(errMsg, e);
-          rpcServer.cancelClient(clientId, errMsg);
-        }
-      }
-    };
-
-    Thread thread = new Thread(runnable);
-    thread.setDaemon(true);
-    thread.setName("Driver");
-    thread.start();
-    return thread;
-  }
-
-  private String getSparkJobCredentialProviderPassword() {
-    if (conf.containsKey("spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD")) {
-      return conf.get("spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD");
-    } else if (conf.containsKey("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD")) {
-      return conf.get("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD");
-    }
-    return null;
-  }
-
-  private class ClientProtocol extends BaseProtocol {
-
-    <T extends Serializable> JobHandleImpl<T> submit(Job<T> job, List<JobHandle.Listener<T>> listeners) {
-      final String jobId = UUID.randomUUID().toString();
-      final Promise<T> promise = driverRpc.createPromise();
-      final JobHandleImpl<T> handle =
-          new JobHandleImpl<T>(SparkClientImpl.this, promise, jobId, listeners);
-      jobs.put(jobId, handle);
-
-      final io.netty.util.concurrent.Future<Void> rpc = driverRpc.call(new JobRequest(jobId, job));
-      LOG.debug("Send JobRequest[{}].", jobId);
-
-      // Link the RPC and the promise so that events from one are propagated to the other as
-      // needed.
-      rpc.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<Void>>() {
-        @Override
-        public void operationComplete(io.netty.util.concurrent.Future<Void> f) {
-          if (f.isSuccess()) {
-            // If the spark job finishes before this listener is called, the QUEUED status will not be set
-            handle.changeState(JobHandle.State.QUEUED);
-          } else if (!promise.isDone()) {
-            promise.setFailure(f.cause());
-          }
-        }
-      });
-      promise.addListener(new GenericFutureListener<Promise<T>>() {
-        @Override
-        public void operationComplete(Promise<T> p) {
-          if (jobId != null) {
-            jobs.remove(jobId);
-          }
-          if (p.isCancelled() && !rpc.isDone()) {
-            rpc.cancel(true);
-          }
-        }
-      });
-      return handle;
-    }
-
-    <T extends Serializable> Future<T> run(Job<T> job) {
-      @SuppressWarnings("unchecked")
-      final io.netty.util.concurrent.Future<T> rpc = (io.netty.util.concurrent.Future<T>)
-        driverRpc.call(new SyncJobRequest(job), Serializable.class);
-      return rpc;
-    }
-
-    void cancel(String jobId) {
-      driverRpc.call(new CancelJob(jobId));
-    }
-
-    Future<?> endSession() {
-      return driverRpc.call(new EndSession());
-    }
-
-    private void handle(ChannelHandlerContext ctx, Error msg) {
-      LOG.warn("Error reported from Remote Spark Driver: {}", msg.cause);
-    }
-
-    private void handle(ChannelHandlerContext ctx, JobMetrics msg) {
-      JobHandleImpl<?> handle = jobs.get(msg.jobId);
-      if (handle != null) {
-        handle.getMetrics().addMetrics(msg.sparkJobId, msg.stageId, msg.taskId, msg.metrics);
-      } else {
-        LOG.warn("Received metrics for unknown Spark job {}", msg.sparkJobId);
-      }
-    }
-
-    private void handle(ChannelHandlerContext ctx, JobResult msg) {
-      JobHandleImpl<?> handle = jobs.remove(msg.id);
-      if (handle != null) {
-        LOG.debug("Received result for client job {}", msg.id);
-        handle.setSparkCounters(msg.sparkCounters);
-        Throwable error = msg.error;
-        if (error == null) {
-          handle.setSuccess(msg.result);
-        } else {
-          handle.setFailure(error);
-        }
-      } else {
-        LOG.warn("Received result for unknown client job {}", msg.id);
-      }
-    }
-
-    private void handle(ChannelHandlerContext ctx, JobStarted msg) {
-      JobHandleImpl<?> handle = jobs.get(msg.id);
-      if (handle != null) {
-        handle.changeState(JobHandle.State.STARTED);
-      } else {
-        LOG.warn("Received event for unknown client job {}", msg.id);
-      }
-    }
-
-    private void handle(ChannelHandlerContext ctx, JobSubmitted msg) {
-      JobHandleImpl<?> handle = jobs.get(msg.clientJobId);
-      if (handle != null) {
-        LOG.info("Received Spark job ID: {} for client job {}", msg.sparkJobId, msg.clientJobId);
-        handle.addSparkJobId(msg.sparkJobId);
-      } else {
-        LOG.warn("Received Spark job ID: {} for unknown client job {}", msg.sparkJobId, msg.clientJobId);
-      }
-    }
-
-    @Override
-    protected String name() {
-      return "HiveServer2 to Remote Spark Driver Connection";
-    }
-  }
-
-  private static class AddJarJob implements Job<Serializable> {
-    private static final long serialVersionUID = 1L;
-
-    private final String path;
-
-    AddJarJob() {
-      this(null);
-    }
-
-    AddJarJob(String path) {
-      this.path = path;
-    }
-
-    @Override
-    public Serializable call(JobContext jc) throws Exception {
-      jc.sc().addJar(path);
-      // Following remote job may refer to classes in this jar, and the remote job would be executed
-      // in a different thread, so we add this jar path to JobContext for further usage.
-      jc.getAddedJars().put(path, System.currentTimeMillis());
-      return null;
-    }
-
-  }
-
-  private static class AddFileJob implements Job<Serializable> {
-    private static final long serialVersionUID = 1L;
-
-    private final String path;
-
-    AddFileJob() {
-      this(null);
-    }
-
-    AddFileJob(String path) {
-      this.path = path;
-    }
-
-    @Override
-    public Serializable call(JobContext jc) throws Exception {
-      jc.sc().addFile(path);
-      return null;
-    }
-
-  }
-
-  private static class GetExecutorCountJob implements Job<Integer> {
-      private static final long serialVersionUID = 1L;
-
-      @Override
-      public Integer call(JobContext jc) throws Exception {
-        // minus 1 here otherwise driver is also counted as an executor
-        int count = jc.sc().sc().getExecutorMemoryStatus().size() - 1;
-        return Integer.valueOf(count);
-      }
-
-  }
-
-  private static class GetDefaultParallelismJob implements Job<Integer> {
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public Integer call(JobContext jc) throws Exception {
-      return jc.sc().sc().defaultParallelism();
-    }
-  }
-
-}
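
Taken together, the startDriver() logic removed above is split between a shared base class and the two new clients added below: the base class assembles the submit request through a set of hooks, and each subclass decides how the driver is actually launched. The real AbstractSparkClient is not part of this hunk, so the simplified sketch below (class name, method ordering, and the exact template-method signature) is only an assumption inferred from the hooks the subclasses override.

package org.apache.hive.spark.client;

import java.io.IOException;
import java.util.concurrent.Future;

import org.apache.hive.spark.client.rpc.RpcServer;

// Simplified illustration of the template-method split; not the real AbstractSparkClient.
abstract class SparkClientTemplateSketch {

  // Hooks implemented by SparkSubmitSparkClient (spark-submit argv) and
  // SparkLauncherSparkClient (InProcessLauncher calls).
  protected abstract String getSparkHome();
  protected abstract void addPropertiesFile(String absolutePath);
  protected abstract void addClass(String name);
  protected abstract void addExecutableJar(String jar);
  protected abstract void addAppArg(String arg);
  protected abstract Future<Void> launchDriver(String isTesting, RpcServer rpcServer,
      String clientId) throws IOException;

  // Template method: assemble the submit request, then hand off to the subclass.
  Future<Void> startDriverSketch(RpcServer rpcServer, String clientId, String propertiesFile,
      String serverAddress, String serverPort, String isTesting) throws IOException {
    getSparkHome();                                  // locate/validate the Spark installation
    addPropertiesFile(propertiesFile);               // properties file carrying the connection secret
    addClass(RemoteDriver.class.getName());          // main class to run as the remote driver
    addExecutableJar("spark-internal");              // same app-resource marker used above
    addAppArg(RemoteDriver.REMOTE_DRIVER_HOST_CONF); // tell the driver where the RPC server listens
    addAppArg(serverAddress);
    addAppArg(RemoteDriver.REMOTE_DRIVER_PORT_CONF);
    addAppArg(serverPort);
    return launchDriver(isTesting, rpcServer, clientId);
  }
}
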

http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/spark-client/src/main/java/org/apache/hive/spark/client/SparkLauncherSparkClient.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkLauncherSparkClient.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkLauncherSparkClient.java
new file mode 100644
index 0000000..cf52c4f
--- /dev/null
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkLauncherSparkClient.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.spark.client.rpc.RpcServer;
+
+import org.apache.spark.launcher.AbstractLauncher;
+import org.apache.spark.launcher.InProcessLauncher;
+import org.apache.spark.launcher.SparkAppHandle;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+
+
+/**
+ * Extends the {@link AbstractSparkClient} and uses Spark's
+ * {@link org.apache.spark.launcher.SparkLauncher} to submit the HoS application. Specifically,
+ * it uses the {@link InProcessLauncher} to avoid spawning a sub-process to submit the Spark app.
+ * It uses a {@link Thread} to monitor when the Spark app has been successfully submitted. The
+ * thread can be interrupted, in which case the {@link RpcServer} client will be cancelled and
+ * the Spark app will be stopped.
+ */
+public class SparkLauncherSparkClient extends AbstractSparkClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+          SparkLauncherSparkClient.class.getName());
+
+  private static final long serialVersionUID = 2153000661341457380L;
+
+  private static final Set<SparkAppHandle.State> FAILED_SPARK_STATES = Sets.newHashSet(
+          SparkAppHandle.State.FAILED,
+          SparkAppHandle.State.KILLED,
+          SparkAppHandle.State.LOST);
+
+  private AbstractLauncher<InProcessLauncher> sparkLauncher;
+
+  SparkLauncherSparkClient(RpcServer rpcServer,
+                                   Map<String, String> conf,
+                                   HiveConf hiveConf,
+                                   String sessionid) throws IOException {
+    super(rpcServer, conf, hiveConf, sessionid);
+  }
+
+  @Override
+  protected Future<Void> launchDriver(String isTesting, RpcServer rpcServer,
+                                      String clientId) throws IOException {
+    if (isTesting != null) {
+      System.setProperty("spark.testing", "true");
+    }
+
+    // Outside of unit tests, only yarn-cluster mode is supported for this client
+    if (isTesting == null) {
+      Preconditions.checkArgument(SparkClientUtilities.isYarnClusterMode(
+              this.conf.get("spark.master"), this.conf.get("spark.submit.deployMode")),
+              getClass().getName() + " is only supported in yarn-cluster mode");
+    }
+
+    // Monitors when the Spark app has been successfully started
+    CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+    // Submit the app
+    SparkAppHandle sparkAppHandle = getSparkLauncher().startApplication(
+            new SparkAppListener(shutdownLatch, rpcServer, clientId));
+
+    return createSparkLauncherFuture(shutdownLatch, sparkAppHandle, rpcServer, clientId);
+  }
+
+  @VisibleForTesting
+  static Future<Void> createSparkLauncherFuture(CountDownLatch shutdownLatch,
+                                                SparkAppHandle sparkAppHandle, RpcServer rpcServer,
+                                                String clientId) {
+    // Monitor the countdown latch
+    Callable<Void> runnable = () -> {
+      try {
+        shutdownLatch.await();
+      } catch (InterruptedException e) {
+        rpcServer.cancelClient(clientId, "Spark app launcher interrupted");
+        sparkAppHandle.stop();
+      }
+      return null;
+    };
+
+    FutureTask<Void> futureTask = new FutureTask<>(runnable);
+
+    Thread driverThread = new Thread(futureTask);
+    driverThread.setDaemon(true);
+    driverThread.setName("SparkLauncherMonitor");
+    driverThread.start();
+
+    return futureTask;
+  }
+
+  @Override
+  protected String getSparkHome() {
+    return null;
+  }
+
+  @Override
+  protected void addAppArg(String arg) {
+    getSparkLauncher().addAppArgs(arg);
+  }
+
+  @Override
+  protected void addExecutableJar(String jar) {
+    getSparkLauncher().setAppResource(jar);
+  }
+
+  @Override
+  protected void addPropertiesFile(String absolutePath) {
+    getSparkLauncher().setPropertiesFile(absolutePath);
+  }
+
+  @Override
+  protected void addClass(String name) {
+    getSparkLauncher().setMainClass(name);
+  }
+
+  @Override
+  protected void addJars(String jars) {
+    getSparkLauncher().addJar(jars);
+  }
+
+  @Override
+  protected void addProxyUser(String proxyUser) {
+    throw new UnsupportedOperationException();
+//    getSparkLauncher().addSparkArg("--proxy-user", proxyUser);
+  }
+
+  @Override
+  protected void addKeytabAndPrincipal(boolean isDoAsEnabled, String keyTabFile, String principal) {
+    throw new UnsupportedOperationException();
+//    getSparkLauncher().addSparkArg("--principal", principal);
+//    getSparkLauncher().addSparkArg("--keytab", keyTabFile);
+  }
+
+  @Override
+  protected void addNumExecutors(String numOfExecutors) {
+    getSparkLauncher().addSparkArg("--num-executors", numOfExecutors);
+  }
+
+  @Override
+  protected void addExecutorMemory(String executorMemory) {
+    getSparkLauncher().addSparkArg("--executor-memory", executorMemory);
+  }
+
+  @Override
+  protected void addExecutorCores(String executorCores) {
+    getSparkLauncher().addSparkArg("--executor-cores", executorCores);
+  }
+
+  private AbstractLauncher<InProcessLauncher> getSparkLauncher() {
+    if (this.sparkLauncher == null) {
+      this.sparkLauncher = new InProcessLauncher();
+    }
+    return this.sparkLauncher;
+  }
+
+  @VisibleForTesting
+  static final class SparkAppListener implements SparkAppHandle.Listener {
+
+    private final CountDownLatch shutdownLatch;
+    private final RpcServer rpcServer;
+    private final String clientId;
+
+    SparkAppListener(CountDownLatch shutdownLatch, RpcServer rpcServer, String clientId) {
+      this.shutdownLatch = shutdownLatch;
+      this.rpcServer = rpcServer;
+      this.clientId = clientId;
+    }
+
+    @Override
+    public void stateChanged(SparkAppHandle sparkAppHandle) {
+      LOG.info("Spark app transitioned to state = " + sparkAppHandle.getState());
+      if (sparkAppHandle.getState().isFinal() || sparkAppHandle.getState().equals(
+              SparkAppHandle.State.RUNNING)) {
+        this.shutdownLatch.countDown();
+        sparkAppHandle.disconnect();
+        LOG.info("Successfully disconnected from Spark app handle");
+      }
+      if (FAILED_SPARK_STATES.contains(sparkAppHandle.getState())) {
+        this.rpcServer.cancelClient(this.clientId, "Spark app launcher failed," +
+                " transitioned to state " + sparkAppHandle.getState());
+      }
+    }
+
+    @Override
+    public void infoChanged(SparkAppHandle sparkAppHandle) {
+      // Do nothing
+    }
+  }
+}
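
As a rough illustration of the mechanism the class above relies on, the snippet below wires Spark's InProcessLauncher to a listener and a CountDownLatch in the same way SparkLauncherSparkClient does. It is a sketch, not part of the patch: the app resource and executor count are placeholder values, and the real client drives these launcher calls through the addXxx() overrides shown above rather than a single method.

package org.apache.hive.spark.client;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.spark.launcher.InProcessLauncher;
import org.apache.spark.launcher.SparkAppHandle;

// Rough sketch of the InProcessLauncher + listener + latch pattern; not part of the patch.
final class InProcessLaunchSketch {

  static void launch() throws Exception {
    CountDownLatch submitted = new CountDownLatch(1);

    SparkAppHandle handle = new InProcessLauncher()
        .setMainClass(RemoteDriver.class.getName())   // main class, as in addClass() above
        .setAppResource("spark-internal")             // placeholder app resource
        .addSparkArg("--num-executors", "2")          // placeholder executor count
        .startApplication(new SparkAppHandle.Listener() {
          @Override
          public void stateChanged(SparkAppHandle h) {
            // Mirror SparkAppListener: release the latch once the app is RUNNING or final.
            if (h.getState() == SparkAppHandle.State.RUNNING || h.getState().isFinal()) {
              submitted.countDown();
            }
          }

          @Override
          public void infoChanged(SparkAppHandle h) {
            // No-op, as in SparkAppListener.
          }
        });

    // The real client awaits the latch on a daemon "SparkLauncherMonitor" thread; here we
    // simply time out and stop the app if submission never completes.
    if (!submitted.await(60, TimeUnit.SECONDS)) {
      handle.stop();
    }
  }
}
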

http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java
new file mode 100644
index 0000000..1a524b9
--- /dev/null
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+
+import org.apache.hadoop.hive.common.log.LogRedirector;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.spark.client.rpc.RpcServer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Extends the {@link AbstractSparkClient} and launches a child process to run Spark's {@code
+ * bin/spark-submit} script. Output from the child process is redirected to this class's logger.
+ */
+class SparkSubmitSparkClient extends AbstractSparkClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkSubmitSparkClient.class);
+
+  private static final String SPARK_HOME_ENV = "SPARK_HOME";
+  private static final String SPARK_HOME_KEY = "spark.home";
+
+  private static final long serialVersionUID = -4272763023516238171L;
+
+  private List<String> argv;
+
+  SparkSubmitSparkClient(RpcServer rpcServer, Map<String, String> conf, HiveConf hiveConf,
+                         String sessionid) throws IOException {
+    super(rpcServer, conf, hiveConf, sessionid);
+  }
+
+  @Override
+  protected String getSparkHome() {
+    String sparkHome = Strings.emptyToNull(conf.get(SPARK_HOME_KEY));
+    if (sparkHome == null) {
+      sparkHome = Strings.emptyToNull(System.getenv(SPARK_HOME_ENV));
+    }
+    if (sparkHome == null) {
+      sparkHome = Strings.emptyToNull(System.getProperty(SPARK_HOME_KEY));
+    }
+
+    Preconditions.checkNotNull(sparkHome, "Cannot use " + HiveConf.HIVE_SPARK_SUBMIT_CLIENT +
+            " without setting Spark Home");
+    String master = conf.get("spark.master");
+    Preconditions.checkArgument(master != null, "spark.master is not defined.");
+
+    argv = Lists.newLinkedList();
+    argv.add(new File(sparkHome, "bin/spark-submit").getAbsolutePath());
+
+    return sparkHome;
+  }
+
+  @Override
+  protected void addAppArg(String arg) {
+    argv.add(arg);
+  }
+
+  @Override
+  protected void addExecutableJar(String jar) {
+    argv.add(jar);
+  }
+
+  @Override
+  protected void addPropertiesFile(String absolutePath) {
+    argv.add("--properties-file");
+    argv.add(absolutePath);
+  }
+
+  @Override
+  protected void addClass(String name) {
+    argv.add("--class");
+    argv.add(RemoteDriver.class.getName());
+  }
+
+  @Override
+  protected void addJars(String jars) {
+    argv.add("--jars");
+    argv.add(jars);
+  }
+
+  @Override
+  protected void addProxyUser(String proxyUser) {
+    argv.add("--proxy-user");
+    argv.add(proxyUser);
+  }
+
+  @Override
+  protected void addKeytabAndPrincipal(boolean isDoAsEnabled, String keyTabFile, String principal) {
+    if (isDoAsEnabled) {
+      List<String> kinitArgv = Lists.newLinkedList();
+      kinitArgv.add("kinit");
+      kinitArgv.add(principal);
+      kinitArgv.add("-k");
+      kinitArgv.add("-t");
+      kinitArgv.add(keyTabFile + ";");
+      kinitArgv.addAll(argv);
+      argv = kinitArgv;
+    } else {
+      // if doAs is not enabled, we pass the principal/keytab to spark-submit in order to
+      // support the possible delegation token renewal in Spark
+      argv.add("--principal");
+      argv.add(principal);
+      argv.add("--keytab");
+      argv.add(keyTabFile);
+    }
+  }
+
+  @Override
+  protected void addNumExecutors(String numOfExecutors) {
+    argv.add("--num-executors");
+    argv.add(numOfExecutors);
+  }
+
+  @Override
+  protected void addExecutorMemory(String executorMemory) {
+    argv.add("--executor-memory");
+    argv.add(executorMemory);
+  }
+
+  @Override
+  protected void addExecutorCores(String executorCores) {
+    argv.add("--executor-cores");
+    argv.add(executorCores);
+  }
+
+  private String getSparkJobCredentialProviderPassword() {
+    if (conf.containsKey("spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD")) {
+      return conf.get("spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD");
+    } else if (conf.containsKey("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD")) {
+      return conf.get("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD");
+    }
+    return null;
+  }
+
+  @Override
+  protected Future<Void> launchDriver(String isTesting, RpcServer rpcServer, String clientId) throws
+          IOException {
+    Callable<Void> runnable;
+
+    String cmd = Joiner.on(" ").join(argv);
+    LOG.info("Running client driver with argv: {}", cmd);
+    ProcessBuilder pb = new ProcessBuilder("sh", "-c", cmd);
+
+    // Prevent hive configurations from being visible in Spark.
+    pb.environment().remove("HIVE_HOME");
+    pb.environment().remove("HIVE_CONF_DIR");
+    // Add credential provider password to the child process's environment
+    // In case of Spark the credential provider location is provided in the jobConf when the job is submitted
+    String password = getSparkJobCredentialProviderPassword();
+    if(password != null) {
+      pb.environment().put(Constants.HADOOP_CREDENTIAL_PASSWORD_ENVVAR, password);
+    }
+    if (isTesting != null) {
+      pb.environment().put("SPARK_TESTING", isTesting);
+    }
+
+    final Process child = pb.start();
+    String threadName = Thread.currentThread().getName();
+    final List<String> childErrorLog = Collections.synchronizedList(new ArrayList<String>());
+    final LogRedirector.LogSourceCallback callback = () -> isAlive;
+
+    LogRedirector.redirect("spark-submit-stdout-redir-" + threadName,
+        new LogRedirector(child.getInputStream(), LOG, callback));
+    LogRedirector.redirect("spark-submit-stderr-redir-" + threadName,
+        new LogRedirector(child.getErrorStream(), LOG, childErrorLog, callback));
+
+    runnable = () -> {
+      try {
+        int exitCode = child.waitFor();
+        if (exitCode != 0) {
+          StringBuilder errStr = new StringBuilder();
+          synchronized(childErrorLog) {
+            for (Object aChildErrorLog : childErrorLog) {
+              errStr.append(aChildErrorLog);
+              errStr.append('\n');
+            }
+          }
+
+          LOG.warn("Child process exited with code {}", exitCode);
+          rpcServer.cancelClient(clientId,
+              "Child process (spark-submit) exited before connecting back with error log " + errStr.toString());
+        }
+      } catch (InterruptedException ie) {
+        LOG.warn("Thread waiting on the child process (spark-submit) is interrupted, killing the child process.");
+        rpcServer.cancelClient(clientId, "Thread waiting on the child process (spark-submit) is interrupted");
+        Thread.interrupted();
+        child.destroy();
+      } catch (Exception e) {
+        String errMsg = "Exception while waiting for child process (spark-submit)";
+        LOG.warn(errMsg, e);
+        rpcServer.cancelClient(clientId, errMsg);
+      }
+      return null;
+    };
+
+    FutureTask<Void> futureTask = new FutureTask<>(runnable);
+
+    Thread driverThread = new Thread(futureTask);
+    driverThread.setDaemon(true);
+    driverThread.setName("SparkSubmitMonitor");
+    driverThread.start();
+
+    return futureTask;
+  }
+}
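
For context on how the two implementations fit together: the test change below flips HiveConf.ConfVars.SPARK_CLIENT_TYPE to HiveConf.HIVE_SPARK_LAUNCHER_CLIENT, and SparkSubmitSparkClient's getSparkHome() references HiveConf.HIVE_SPARK_SUBMIT_CLIENT. The selection itself happens in code outside this hunk, so the conf-driven factory below is only an assumed sketch of that wiring, not the actual factory.

package org.apache.hive.spark.client;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.spark.client.rpc.RpcServer;

// Hypothetical sketch of conf-driven client selection; the real factory is not in this hunk.
final class SparkClientSelectionSketch {

  static AbstractSparkClient createClient(RpcServer rpcServer, Map<String, String> conf,
      HiveConf hiveConf, String sessionId) throws IOException {
    String clientType = hiveConf.getVar(HiveConf.ConfVars.SPARK_CLIENT_TYPE);
    if (HiveConf.HIVE_SPARK_LAUNCHER_CLIENT.equals(clientType)) {
      // In-process submission via Spark's InProcessLauncher.
      return new SparkLauncherSparkClient(rpcServer, conf, hiveConf, sessionId);
    }
    // Otherwise spawn bin/spark-submit in a child process.
    return new SparkSubmitSparkClient(rpcServer, conf, hiveConf, sessionId);
  }
}
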

http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/spark-client/src/test/java/org/apache/hive/spark/client/TestJobHandle.java
----------------------------------------------------------------------
diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestJobHandle.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestJobHandle.java
index d6b627b..b81a34b 100644
--- a/spark-client/src/test/java/org/apache/hive/spark/client/TestJobHandle.java
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestJobHandle.java
@@ -32,7 +32,7 @@ import static org.mockito.Mockito.*;
 @RunWith(MockitoJUnitRunner.class)
 public class TestJobHandle {
 
-  @Mock private SparkClientImpl client;
+  @Mock private SparkClient client;
   @Mock private Promise<Serializable> promise;
   @Mock private JobHandle.Listener<Serializable> listener;
   @Mock private JobHandle.Listener<Serializable> listener2;

http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
----------------------------------------------------------------------
diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
index c134625..681463e 100644
--- a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
@@ -34,7 +34,6 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.InputStream;
-import java.io.PrintStream;
 import java.io.Serializable;
 import java.net.URI;
 import java.nio.file.Paths;
@@ -54,7 +53,7 @@ import com.google.common.base.Strings;
 import com.google.common.io.ByteStreams;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hive.spark.counter.SparkCounters;
-import org.apache.spark.SparkException;
+import org.apache.spark.SparkContext$;
 import org.apache.spark.SparkFiles;
 import org.apache.spark.api.java.JavaFutureAction;
 import org.apache.spark.api.java.JavaRDD;
@@ -72,6 +71,7 @@ public class TestSparkClient {
 
   static {
     HIVECONF.set("hive.spark.client.connect.timeout", "30000ms");
+    HIVECONF.setVar(HiveConf.ConfVars.SPARK_CLIENT_TYPE, HiveConf.HIVE_SPARK_LAUNCHER_CLIENT);
   }
 
   private Map<String, String> createConf() {
@@ -82,6 +82,7 @@ public class TestSparkClient {
     conf.put("spark.app.name", "SparkClientSuite Remote App");
     conf.put("spark.driver.extraClassPath", classpath);
     conf.put("spark.executor.extraClassPath", classpath);
+    conf.put("spark.testing", "true");
 
     if (!Strings.isNullOrEmpty(System.getProperty("spark.home"))) {
       conf.put("spark.home", System.getProperty("spark.home"));
@@ -342,6 +343,26 @@ public class TestSparkClient {
         client.stop();
       }
       SparkClientFactory.stop();
+      waitForSparkContextShutdown();
+    }
+  }
+
+  /**
+   * This was added to avoid a race condition where we try to create multiple SparkContexts in
+   * the same process. Since spark.master = local, everything runs in the same JVM, and because we
+   * don't wait for the RemoteDriver to shut down its SparkContext, it's possible that a test
+   * finishes before the SparkContext has been shut down. To avoid the "multiple SparkContexts
+   * in a single JVM" exception, we wait for the SparkContext to shut down after each test.
+   */
+  private void waitForSparkContextShutdown() throws InterruptedException {
+    for (int i = 0; i < 100; i++) {
+      if (SparkContext$.MODULE$.getActive().isEmpty()) {
+        break;
+      }
+      Thread.sleep(100);
+    }
+    if (!SparkContext$.MODULE$.getActive().isEmpty()) {
+      throw new IllegalStateException("SparkContext did not shutdown in time");
     }
   }