You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2014/10/23 20:47:53 UTC

svn commit: r1633917 [1/2] - in /hive/branches/spark: ./ spark-client/ spark-client/src/ spark-client/src/main/ spark-client/src/main/java/ spark-client/src/main/java/org/ spark-client/src/main/java/org/apache/ spark-client/src/main/java/org/apache/hiv...

Author: xuefu
Date: Thu Oct 23 18:47:52 2014
New Revision: 1633917

URL: http://svn.apache.org/r1633917
Log:
HIVE-8528: Add remote Spark client to Hive [Spark Branch] (Marcelo via Xuefu)

Added:
    hive/branches/spark/spark-client/
    hive/branches/spark/spark-client/pom.xml
    hive/branches/spark/spark-client/src/
    hive/branches/spark/spark-client/src/main/
    hive/branches/spark/spark-client/src/main/java/
    hive/branches/spark/spark-client/src/main/java/org/
    hive/branches/spark/spark-client/src/main/java/org/apache/
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/ClientUtils.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/Job.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MonitorCallback.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/DataReadMethod.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java
    hive/branches/spark/spark-client/src/test/
    hive/branches/spark/spark-client/src/test/java/
    hive/branches/spark/spark-client/src/test/java/org/
    hive/branches/spark/spark-client/src/test/java/org/apache/
    hive/branches/spark/spark-client/src/test/java/org/apache/hive/
    hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/
    hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/
    hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
    hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
    hive/branches/spark/spark-client/src/test/resources/
    hive/branches/spark/spark-client/src/test/resources/log4j.properties
Modified:
    hive/branches/spark/pom.xml

Modified: hive/branches/spark/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/pom.xml?rev=1633917&r1=1633916&r2=1633917&view=diff
==============================================================================
--- hive/branches/spark/pom.xml (original)
+++ hive/branches/spark/pom.xml Thu Oct 23 18:47:52 2014
@@ -47,6 +47,7 @@
     <module>serde</module>
     <module>service</module>
     <module>shims</module>
+    <module>spark-client</module>
     <module>testutils</module>
     <module>packaging</module>
   </modules>

Added: hive/branches/spark/spark-client/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/pom.xml?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/pom.xml (added)
+++ hive/branches/spark/spark-client/pom.xml Thu Oct 23 18:47:52 2014
@@ -0,0 +1,118 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.hive</groupId>
+    <artifactId>hive</artifactId>
+    <version>0.14.0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.hive</groupId>
+  <artifactId>spark-client</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Remote Client</name>
+  <version>0.14.0-SNAPSHOT</version>
+
+  <properties>
+    <hive.path.to.root>..</hive.path.to.root>
+    <scala.binary.version>2.10</scala.binary.version>
+
+    <!-- Should match Spark. -->
+    <akka.group>org.spark-project.akka</akka.group>
+    <akka.version>2.3.4-spark</akka.version>
+    <test.redirectToFile>true</test.redirectToFile>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>${akka.group}</groupId>
+      <artifactId>akka-actor_${scala.binary.version}</artifactId>
+      <version>${akka.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <!--
+              Spark depends on Guava 14, while Hive depends on Guava 11. The APIs used by
+              spark-client do not depend on Guava 14, but when running unit tests that
+              trigger Spark jobs, that will trigger the dependency. So, when running tests,
+              make sure Guava 14, and not Guava 11, is on the classpath.
+            -->
+            <id>copy-guava-14</id>
+            <phase>test-compile</phase>
+            <goals>
+              <goal>copy</goal>
+            </goals>
+            <configuration>
+              <artifactItems>
+                <artifactItem>
+                  <groupId>com.google.guava</groupId>
+                  <artifactId>guava</artifactId>
+                  <version>14.0.1</version>
+                </artifactItem>
+              </artifactItems>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <additionalClasspathElements>
+            <!-- Note: wildcards don't work. Thankfully there's just one jar we care about. -->
+            <additionalClasspathElement>${project.build.directory}/dependency/guava-14.0.1.jar</additionalClasspathElement>
+          </additionalClasspathElements>
+          <classpathDependencyExcludes>
+            <classpathDependencyExclude>com.google.guava:guava</classpathDependencyExclude>
+          </classpathDependencyExcludes>
+          <systemPropertyVariables>
+            <java.awt.headless>true</java.awt.headless>
+            <spark.home>${spark.home}</spark.home>
+          </systemPropertyVariables>
+          <redirectTestOutputToFile>${test.redirectToFile}</redirectTestOutputToFile>
+          <useFile>${test.redirectToFile}</useFile>
+          <argLine>-Xmx4096m -XX:MaxPermSize=512m</argLine>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/ClientUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/ClientUtils.java?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/ClientUtils.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/ClientUtils.java Thu Oct 23 18:47:52 2014
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.util.Enumeration;
+import java.util.Map;
+import java.util.UUID;
+
+import akka.actor.ActorSystem;
+import akka.actor.ExtendedActorSystem;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class ClientUtils {
+
+  private final static Logger LOG = LoggerFactory.getLogger(ClientUtils.class);
+
+  static final String CONF_KEY_SECRET = "spark.client.authentication.secret";
+  static final String CONF_KEY_IN_PROCESS = "spark.client.do_not_use_this.run_driver_in_process";
+  static final String CONF_KEY_SERIALIZER = "spark-remote.akka.serializer";
+
+  /**
+   * Create a new ActorSystem based on the given configuration.
+   *
+   * The akka configs are the same used to configure Akka in Spark.
+   *
+   * @param conf Configuration.
+   * @return 2-tuple (actor system, akka root url)
+   */
+  static ActorSystemInfo createActorSystem(Map<String, String> conf) throws IOException {
+    int akkaThreads = toInt(conf.get("spark.akka.threads"), 4);
+    int akkaBatchSize = toInt(conf.get("spark.akka.batchSize"), 15);
+    int akkaTimeout = toInt(conf.get("spark.akka.timeout"), 100);
+    int akkaFrameSize = toInt(conf.get("spark.akka.frameSize"), 10) * 1024 * 1024;
+    String lifecycleEvents = toBoolean(conf.get("spark.akka.logLifecycleEvents")) ? "on" : "off";
+    String logAkkaConfig = toBoolean(conf.get("spark.akka.logAkkaConfig")) ? "on" : "off";
+
+    int akkaHeartBeatPauses = toInt(conf.get("spark.akka.heartbeat.pauses"), 600);
+    double akkaFailureDetector =
+      toDouble(conf.get("spark.akka.failure-detector.threshold"), 300.0);
+    int akkaHeartBeatInterval = toInt(conf.get("spark.akka.heartbeat.interval"), 1000);
+
+     // Disabled due to chill-akka depending on kryo 2.21, which is incompatible with 2.22
+     // due to packaging changes (relocated org.objenesis classes).
+     // String akkaSerializer = Optional.fromNullable(conf.get(CONF_KEY_SERIALIZER)).or("java");
+     String akkaSerializer = "java";
+
+    String host = findLocalIpAddress();
+    String secret = conf.get(CONF_KEY_SECRET);
+    Preconditions.checkArgument(secret != null, "%s not set.", CONF_KEY_SECRET);
+
+    Map<String, String> sparkConf = Maps.newHashMap();
+    for (Map.Entry<String, String> e : sparkConf.entrySet()) {
+      if (e.getKey().startsWith("akka.")) {
+        sparkConf.put(e.getKey(), e.getValue());
+      }
+    }
+
+    Config fallback = ConfigFactory.parseString(""
+        + "akka.daemonic = on\n"
+        + "akka.loggers = [ \"akka.event.slf4j.Slf4jLogger\" ]\n"
+        + "akka.stdout-loglevel = \"ERROR\"\n"
+        + "akka.jvm-exit-on-fatal-error = off\n"
+        + "akka.actor.default-dispatcher.throughput = " + akkaBatchSize + "\n"
+        + "akka.actor.serializers.java = \"akka.serialization.JavaSerializer\"\n"
+        /* Disabled due to chill-akka depending on kryo 2.21, which is incompatible with 2.22
+           due to packaging changes (relocated org.objenesis classes).
+        + "akka.actor.serializers.kryo = \"com.twitter.chill.akka.AkkaSerializer\"\n"
+        */
+        + String.format(
+              "akka.actor.serialization-bindings = { \"java.io.Serializable\" = \"%s\" }\n",
+              akkaSerializer)
+        + "akka.log-config-on-start = " + logAkkaConfig + "\n"
+        + "akka.log-dead-letters = " + lifecycleEvents + "\n"
+        + "akka.log-dead-letters-during-shutdown = " + lifecycleEvents + "\n"
+        + "akka.actor.provider = \"akka.remote.RemoteActorRefProvider\"\n"
+        + "akka.remote.log-remote-lifecycle-events = " + lifecycleEvents + "\n"
+        + String.format("akka.remote.netty.tcp.connection-timeout = %d s\n", akkaTimeout)
+        + "akka.remote.netty.tcp.execution-pool-size = " + akkaThreads + "\n"
+        + "akka.remote.netty.tcp.hostname = \"" + host + "\"\n"
+        + String.format("akka.remote.netty.tcp.maximum-frame-size = %d B\n", akkaFrameSize)
+        + "akka.remote.netty.tcp.port = 0\n"
+        + "akka.remote.netty.tcp.tcp-nodelay = on\n"
+        + "akka.remote.netty.tcp.transport-class = \"akka.remote.transport.netty.NettyTransport\"\n"
+        + "akka.remote.require-cookie = on\n"
+        + "akka.remote.secure-cookie = \"" + secret + "\"\n"
+        + String.format(
+              "akka.remote.transport-failure-detector.acceptable-heartbeat-pause = %d s\n",
+              akkaHeartBeatPauses)
+        + String.format(
+              "akka.remote.transport-failure-detector.heartbeat-interval = %d s\n",
+              akkaHeartBeatInterval)
+        + "akka.remote.transport-failure-detector.threshold = " +
+            String.valueOf(akkaFailureDetector) + "\n");
+
+    String name = randomName();
+    Config akkaConf = ConfigFactory.parseMap(sparkConf).withFallback(fallback);
+    ActorSystem actorSystem = ActorSystem.create(name, akkaConf);
+    ExtendedActorSystem extActorSystem = (ExtendedActorSystem) actorSystem;
+    int boundPort =
+      ((Integer)extActorSystem.provider().getDefaultAddress().port().get()).intValue();
+    return new ActorSystemInfo(actorSystem,
+        String.format("akka.tcp://%s@%s:%d/user", name, host, boundPort));
+  }
+
+  static String randomName() {
+    return  UUID.randomUUID().toString();
+  }
+
+  private static boolean toBoolean(String value) {
+    return Boolean.parseBoolean(Optional.fromNullable(value).or("false"));
+  }
+
+  private static double toDouble(String value, double defaultValue) {
+    return Double.parseDouble(Optional.fromNullable(value).or(String.valueOf(defaultValue)));
+  }
+
+  private static int toInt(String value, int defaultValue) {
+    return Integer.parseInt(Optional.fromNullable(value).or(String.valueOf(defaultValue)));
+  }
+
+  // Copied from Utils.scala.
+  private static String findLocalIpAddress() throws IOException {
+    String ip = System.getenv("SPARK_LOCAL_IP");
+    if (ip == null) {
+      InetAddress address = InetAddress.getLocalHost();
+      if (address.isLoopbackAddress()) {
+        // Address resolves to something like 127.0.1.1, which happens on Debian; try to find
+        // a better address using the local network interfaces
+        Enumeration<NetworkInterface> ifaces = NetworkInterface.getNetworkInterfaces();
+        while (ifaces.hasMoreElements()) {
+          NetworkInterface ni = ifaces.nextElement();
+          Enumeration<InetAddress> addrs = ni.getInetAddresses();
+          while (addrs.hasMoreElements()) {
+            InetAddress addr = addrs.nextElement();
+            if (!addr.isLinkLocalAddress() &&
+                !addr.isLoopbackAddress() &&
+                addr instanceof Inet4Address) {
+              // We've found an address that looks reasonable!
+              LOG.warn("Your hostname, {}, resolves to a loopback address; using {} " +
+                " instead (on interface {})",
+                address.getHostName(),
+                addr.getHostAddress(),
+                ni.getName());
+              LOG.warn("Set SPARK_LOCAL_IP if you need to bind to another address");
+              return addr.getHostAddress();
+            }
+          }
+        }
+        LOG.warn("Your hostname, {}, resolves to, but we couldn't find any external IP address!",
+          address.getHostName());
+        LOG.warn("Set SPARK_LOCAL_IP if you need to bind to another address");
+      }
+      return address.getHostAddress();
+    }
+    return ip;
+  }
+
+  static class ActorSystemInfo {
+    final ActorSystem system;
+    final String url;
+
+    private ActorSystemInfo(ActorSystem system, String url) {
+      this.system = system;
+      this.url = url;
+    }
+
+  }
+
+}

Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/Job.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/Job.java?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/Job.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/Job.java Thu Oct 23 18:47:52 2014
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import java.io.Serializable;
+
+/**
+ * Interface for a Spark remote job.
+ */
+interface Job<T extends Serializable> extends Serializable {
+
+  T call(JobContext jc) throws Exception;
+
+}

Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java Thu Oct 23 18:47:52 2014
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import org.apache.spark.api.java.JavaFutureAction;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ * Holds runtime information about the job execution context.
+ *
+ * An instance of this class is kept on the node hosting a remote Spark context and is made
+ * available to jobs being executed via RemoteSparkContext#submit().
+ */
+public interface JobContext {
+
+  /** The shared SparkContext instance. */
+  JavaSparkContext sc();
+
+  /**
+   * Monitor a job. This allows job-related information (such as metrics) to be communicated
+   * back to the client.
+   *
+   * @return The job (unmodified).
+   */
+  <T> JavaFutureAction<T> monitor(JavaFutureAction<T> job);
+
+}

Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java Thu Oct 23 18:47:52 2014
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import org.apache.spark.api.java.JavaFutureAction;
+import org.apache.spark.api.java.JavaSparkContext;
+
+class JobContextImpl implements JobContext {
+
+  private final JavaSparkContext sc;
+  private final ThreadLocal<MonitorCallback> monitorCb;
+
+  public JobContextImpl(JavaSparkContext sc) {
+    this.sc = sc;
+    this.monitorCb = new ThreadLocal<MonitorCallback>();
+  }
+
+
+  @Override
+  public JavaSparkContext sc() {
+    return sc;
+  }
+
+  @Override
+  public <T> JavaFutureAction<T> monitor(JavaFutureAction<T> job) {
+    monitorCb.get().call(job);
+    return job;
+  }
+
+  void setMonitorCb(MonitorCallback cb) {
+    monitorCb.set(cb);
+  }
+
+  void stop() {
+    sc.stop();
+  }
+
+}

Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java Thu Oct 23 18:47:52 2014
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import java.io.Serializable;
+import java.util.concurrent.Future;
+
+/**
+ * A handle to a submitted job. Allows for monitoring and controlling of the running remote job.
+ */
+interface JobHandle<T extends Serializable> extends Future<T> {
+
+  /**
+   * The client job ID. This is unrelated to any Spark jobs that might be triggered by the
+   * submitted job.
+   */
+  String getClientJobId();
+
+  /**
+   * A collection of metrics collected from the Spark jobs triggered by this job.
+   *
+   * To collect job metrics on the client, Spark jobs must be registered with JobContext::monitor()
+   * on the remote end.
+   */
+  MetricsCollection getMetrics();
+
+  // TODO: expose job status?
+
+}

Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java Thu Oct 23 18:47:52 2014
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import java.io.Serializable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A handle to a submitted job. Allows for monitoring and controlling of the running remote job.
+ */
+class JobHandleImpl<T extends Serializable> implements JobHandle<T> {
+
+  private final SparkClientImpl client;
+  private final String jobId;
+  private final MetricsCollection metrics;
+  private final Object monitor;
+
+  private AtomicBoolean cancelled;
+  private boolean completed;
+  private T result;
+  private Throwable error;
+
+  JobHandleImpl(SparkClientImpl client, String jobId) {
+    this.client = client;
+    this.jobId = jobId;
+    this.monitor = new Object();
+    this.metrics = new MetricsCollection();
+    this.cancelled = new AtomicBoolean();
+    this.completed = false;
+  }
+
+  /** Requests a running job to be cancelled. */
+  @Override
+  public boolean cancel(boolean unused) {
+    if (cancelled.compareAndSet(false, true)) {
+      client.cancel(jobId);
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public T get() throws ExecutionException, InterruptedException {
+    try {
+      return get(-1);
+    } catch (TimeoutException te) {
+      // Shouldn't really happen.
+      throw new ExecutionException(te);
+    }
+  }
+
+  @Override
+  public T get(long timeout, TimeUnit unit)
+      throws ExecutionException, InterruptedException, TimeoutException {
+    return get(unit.toMillis(timeout));
+  }
+
+  @Override
+  public boolean isCancelled() {
+    return cancelled.get();
+  }
+
+  @Override
+  public boolean isDone() {
+    return completed;
+  }
+
+  /**
+   * The client job ID. This is unrelated to any Spark jobs that might be triggered by the
+   * submitted job.
+   */
+  @Override
+  public String getClientJobId() {
+    return jobId;
+  }
+
+  /**
+   * A collection of metrics collected from the Spark jobs triggered by this job.
+   *
+   * To collect job metrics on the client, Spark jobs must be registered with JobContext::monitor()
+   * on the remote end.
+   */
+  @Override
+  public MetricsCollection getMetrics() {
+    return metrics;
+  }
+
+  private T get(long timeout) throws ExecutionException, InterruptedException, TimeoutException {
+    long deadline = System.currentTimeMillis() + timeout;
+    synchronized (monitor) {
+      while (!completed && !cancelled.get()) {
+        if (timeout >= 0) {
+          monitor.wait(timeout);
+        } else {
+          monitor.wait();
+        }
+        if (timeout >= 0 && System.currentTimeMillis() >= deadline) {
+          throw new TimeoutException();
+        }
+      }
+    }
+
+    if (error != null) {
+      throw new ExecutionException(error);
+    }
+
+    return result;
+  }
+
+  // TODO: expose job status?
+
+  @SuppressWarnings("unchecked")
+  void complete(Object result, Throwable error) {
+    if (result != null && error != null) {
+      throw new IllegalArgumentException("Either result or error should be set.");
+    }
+    synchronized (monitor) {
+      this.result = (T) result;
+      this.error = error;
+      this.completed = true;
+      monitor.notifyAll();
+    }
+  }
+
+}

Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java Thu Oct 23 18:47:52 2014
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import org.apache.hive.spark.client.metrics.DataReadMethod;
+import org.apache.hive.spark.client.metrics.InputMetrics;
+import org.apache.hive.spark.client.metrics.Metrics;
+import org.apache.hive.spark.client.metrics.ShuffleReadMetrics;
+import org.apache.hive.spark.client.metrics.ShuffleWriteMetrics;
+
+/**
+ * Provides metrics collected for a submitted job.
+ *
+ * The collected metrics can be analysed at different levels of granularity:
+ * - Global (all Spark jobs triggered by client job)
+ * - Spark job
+ * - Stage
+ * - Task
+ *
+ * Only successful, non-speculative tasks are considered. Metrics are updated as tasks finish,
+ * so snapshots can be retrieved before the whole job completes.
+ */
+public class MetricsCollection {
+
+  private final List<TaskInfo> taskMetrics = Lists.newArrayList();
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+  public Metrics getAllMetrics() {
+    return aggregate(Predicates.<TaskInfo>alwaysTrue());
+  }
+
+  public Set<Integer> getJobIds() {
+    Function<TaskInfo, Integer> fun = new Function<TaskInfo, Integer>() {
+      @Override
+      public Integer apply(TaskInfo input) {
+        return input.jobId;
+      }
+    };
+    return transform(Predicates.<TaskInfo>alwaysTrue(), fun);
+  }
+
+  public Metrics getJobMetrics(int jobId) {
+    return aggregate(new JobFilter(jobId));
+  }
+
+  public Set<Integer> getStageIds(int jobId) {
+    Function<TaskInfo, Integer> fun = new Function<TaskInfo, Integer>() {
+      @Override
+      public Integer apply(TaskInfo input) {
+        return input.stageId;
+      }
+    };
+    return transform(new JobFilter(jobId), fun);
+  }
+
+  public Metrics getStageMetrics(final int jobId, final int stageId) {
+    return aggregate(new StageFilter(jobId, stageId));
+  }
+
+  public Set<Long> getTaskIds(int jobId, int stageId) {
+    Function<TaskInfo, Long> fun = new Function<TaskInfo, Long>() {
+      @Override
+      public Long apply(TaskInfo input) {
+        return input.taskId;
+      }
+    };
+    return transform(new StageFilter(jobId, stageId), fun);
+  }
+
+  public Metrics getTaskMetrics(final int jobId, final int stageId, final long taskId) {
+    Predicate<TaskInfo> filter = new Predicate<TaskInfo>() {
+      @Override
+      public boolean apply(TaskInfo input) {
+        return jobId == input.jobId && stageId == input.stageId && taskId == input.taskId;
+      }
+    };
+    lock.readLock().lock();
+    try {
+      Iterator<TaskInfo> it = Collections2.filter(taskMetrics, filter).iterator();
+      if (it.hasNext()) {
+        return it.next().metrics;
+      } else {
+        throw new NoSuchElementException("Task not found.");
+      }
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  void addMetrics(int jobId, int stageId, long taskId, Metrics metrics) {
+    lock.writeLock().lock();
+    try {
+      taskMetrics.add(new TaskInfo(jobId, stageId, taskId, metrics));
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  private <T> Set<T> transform(Predicate<TaskInfo> filter, Function<TaskInfo, T> fun) {
+    lock.readLock().lock();
+    try {
+      Collection<TaskInfo> filtered = Collections2.filter(taskMetrics, filter);
+      return Sets.newHashSet(Collections2.transform(filtered, fun));
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  private Metrics aggregate(Predicate<TaskInfo> filter) {
+    lock.readLock().lock();
+    try {
+      // Task metrics.
+      long executorDeserializeTime = 0L;
+      long executorRunTime = 0L;
+      long resultSize = 0L;
+      long jvmGCTime = 0L;
+      long resultSerializationTime = 0L;
+      long memoryBytesSpilled = 0L;
+      long diskBytesSpilled = 0L;
+
+      // Input metrics.
+      boolean hasInputMetrics = false;
+      DataReadMethod readMethod = null;
+      long bytesRead = 0L;
+
+      // Shuffle read metrics.
+      boolean hasShuffleReadMetrics = false;
+      int remoteBlocksFetched = 0;
+      int localBlocksFetched = 0;
+      long fetchWaitTime = 0L;
+      long remoteBytesRead = 0L;
+
+      // Shuffle write metrics.
+      boolean hasShuffleWriteMetrics = false;
+      long shuffleBytesWritten = 0L;
+      long shuffleWriteTime = 0L;
+
+      for (TaskInfo info : Collections2.filter(taskMetrics, filter)) {
+        Metrics m = info.metrics;
+        executorDeserializeTime += m.executorDeserializeTime;
+        executorRunTime += m.executorRunTime;
+        resultSize += m.resultSize;
+        jvmGCTime += m.jvmGCTime;
+        resultSerializationTime += m.resultSerializationTime;
+        memoryBytesSpilled += m.memoryBytesSpilled;
+        diskBytesSpilled += m.diskBytesSpilled;
+
+        if (m.inputMetrics.isPresent()) {
+          hasInputMetrics = true;
+          InputMetrics im = m.inputMetrics.get();
+          if (readMethod == null) {
+            readMethod = im.readMethod;
+          } else if (readMethod != im.readMethod) {
+            readMethod = DataReadMethod.Multiple;
+          }
+          bytesRead += im.bytesRead;
+        }
+
+        if (m.shuffleReadMetrics.isPresent()) {
+          ShuffleReadMetrics srm = m.shuffleReadMetrics.get();
+          hasShuffleReadMetrics = true;
+          remoteBlocksFetched += srm.remoteBlocksFetched;
+          localBlocksFetched += srm.localBlocksFetched;
+          fetchWaitTime += srm.fetchWaitTime;
+          remoteBytesRead += srm.remoteBytesRead;
+        }
+
+        if (m.shuffleWriteMetrics.isPresent()) {
+          ShuffleWriteMetrics swm = m.shuffleWriteMetrics.get();
+          hasShuffleWriteMetrics = true;
+          shuffleBytesWritten += swm.shuffleBytesWritten;
+          shuffleWriteTime += swm.shuffleWriteTime;
+        }
+      }
+
+      Optional<InputMetrics> inputMetrics = Optional.absent();
+      if (hasInputMetrics) {
+        inputMetrics = Optional.of(new InputMetrics(readMethod, bytesRead));
+      }
+
+      Optional<ShuffleReadMetrics> shuffleReadMetrics = Optional.absent();
+      if (hasShuffleReadMetrics) {
+        shuffleReadMetrics = Optional.of(new ShuffleReadMetrics(
+          remoteBlocksFetched,
+          localBlocksFetched,
+          fetchWaitTime,
+          remoteBytesRead));
+      }
+
+      Optional<ShuffleWriteMetrics> shuffleWriteMetrics = Optional.absent();
+      if (hasShuffleReadMetrics) {
+        shuffleWriteMetrics = Optional.of(new ShuffleWriteMetrics(
+          shuffleBytesWritten,
+          shuffleWriteTime));
+      }
+
+      return new Metrics(
+        executorDeserializeTime,
+        executorRunTime,
+        resultSize,
+        jvmGCTime,
+        resultSerializationTime,
+        memoryBytesSpilled,
+        diskBytesSpilled,
+        inputMetrics,
+        shuffleReadMetrics,
+        shuffleWriteMetrics);
+    } finally {
+        lock.readLock().unlock();
+    }
+  }
+
+  private static class TaskInfo {
+    final int jobId;
+    final int stageId;
+    final long taskId;
+    final Metrics metrics;
+
+    TaskInfo(int jobId, int stageId, long taskId, Metrics metrics) {
+      this.jobId = jobId;
+      this.stageId = stageId;
+      this.taskId = taskId;
+      this.metrics = metrics;
+    }
+
+  }
+
+  private static class JobFilter implements Predicate<TaskInfo> {
+
+    private final int jobId;
+
+    JobFilter(int jobId) {
+      this.jobId = jobId;
+    }
+
+    @Override
+    public boolean apply(TaskInfo input) {
+      return jobId == input.jobId;
+    }
+
+  }
+
+  private static class StageFilter implements Predicate<TaskInfo> {
+
+    private final int jobId;
+    private final int stageId;
+
+    StageFilter(int jobId, int stageId) {
+      this.jobId = jobId;
+      this.stageId = stageId;
+    }
+
+    @Override
+    public boolean apply(TaskInfo input) {
+      return jobId == input.jobId && stageId == input.stageId;
+    }
+
+  }
+
+}

Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MonitorCallback.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MonitorCallback.java?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MonitorCallback.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MonitorCallback.java Thu Oct 23 18:47:52 2014
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import org.apache.spark.api.java.JavaFutureAction;
+
+interface MonitorCallback {
+
+  void call(JavaFutureAction<?> future);
+
+}

Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java Thu Oct 23 18:47:52 2014
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import java.io.Serializable;
+
+import org.apache.hive.spark.client.metrics.Metrics;
+
+final class Protocol {
+
+  static class CancelJob implements Serializable {
+
+    final String id;
+
+    CancelJob(String id) {
+      this.id = id;
+    }
+
+    CancelJob() {
+      this(null);
+    }
+
+  }
+
+  static class EndSession implements Serializable {
+
+  }
+
+  static class Error implements Serializable {
+
+    final Exception cause;
+
+    Error(Exception cause) {
+      this.cause = cause;
+    }
+
+    Error() {
+      this(null);
+    }
+
+  }
+
+  static class Hello implements Serializable {
+
+    final String remoteUrl;
+
+    Hello(String remoteUrl) {
+      this.remoteUrl = remoteUrl;
+    }
+
+    Hello() {
+      this(null);
+    }
+
+  }
+
+  static class JobMetrics implements Serializable {
+
+    final String jobId;
+    final int sparkJobId;
+    final int stageId;
+    final long taskId;
+    final Metrics metrics;
+
+    JobMetrics(String jobId, int sparkJobId, int stageId, long taskId, Metrics metrics) {
+      this.jobId = jobId;
+      this.sparkJobId = sparkJobId;
+      this.stageId = stageId;
+      this.taskId = taskId;
+      this.metrics = metrics;
+    }
+
+    JobMetrics() {
+      this(null, -1, -1, -1, null);
+    }
+
+  }
+
+  static class JobRequest<T extends Serializable> implements Serializable {
+
+    final String id;
+    final Job<T> job;
+
+    JobRequest(String id, Job<T> job) {
+      this.id = id;
+      this.job = job;
+    }
+
+    JobRequest() {
+      this(null, null);
+    }
+
+  }
+
+  static class JobResult<T extends Serializable> implements Serializable {
+
+    final String id;
+    final T result;
+    final Throwable error;
+
+    JobResult(String id, T result, Throwable error) {
+      this.id = id;
+      this.result = result;
+      this.error = error;
+    }
+
+    JobResult() {
+      this(null, null, null);
+    }
+
+  }
+
+}

Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java Thu Oct 23 18:47:52 2014
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import scala.Tuple2;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.remote.DisassociatedEvent;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.spark.SparkConf;
+import org.apache.spark.scheduler.*;
+import org.apache.spark.api.java.JavaFutureAction;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hive.spark.client.metrics.Metrics;
+
+/**
+ * Driver code for the Spark client library.
+ */
+public class RemoteDriver {
+
+  private final static Logger LOG = LoggerFactory.getLogger(RemoteDriver.class);
+
+  private final Map<String, JobWrapper<?>> activeJobs;
+  private final Object shutdownLock;
+  private final ActorSystem system;
+  private final ActorRef actor;
+  private final ActorSelection client;
+  private final ExecutorService executor;
+  private final JobContextImpl jc;
+
+  private boolean running;
+
+  private RemoteDriver(String[] args) throws Exception {
+    this.activeJobs = Maps.newConcurrentMap();
+    this.shutdownLock = new Object();
+
+    SparkConf conf = new SparkConf();
+    String remote = null;
+    for (int idx = 0; idx < args.length; idx += 2) {
+      String key = args[idx];
+      if (key.equals("--remote")) {
+        remote = getArg(args, idx);
+      } else if (key.equals("--secret")) {
+        conf.set(ClientUtils.CONF_KEY_SECRET, getArg(args, idx));
+      } else if (key.equals("--conf")) {
+        String[] val = getArg(args, idx).split("[=]", 2);
+        conf.set(val[0], val[1]);
+      } else {
+        throw new IllegalArgumentException("Invalid command line: " +
+            Joiner.on(" ").join(args));
+      }
+    }
+
+    executor = Executors.newCachedThreadPool();
+
+    LOG.info("Connecting to: {}", remote);
+
+    Map<String, String> mapConf = Maps.newHashMap();
+    for (Tuple2<String, String> e : conf.getAll()) {
+      mapConf.put(e._1(), e._2());
+    }
+
+    ClientUtils.ActorSystemInfo info = ClientUtils.createActorSystem(mapConf);
+    this.system = info.system;
+    this.actor = system.actorOf(Props.create(ServerActor.class, this), "RemoteDriver");
+    this.client = system.actorSelection(remote);
+
+    try {
+      JavaSparkContext sc = new JavaSparkContext(conf);
+      sc.sc().addSparkListener(new ClientListener());
+      jc = new JobContextImpl(sc);
+    } catch (Exception e) {
+      LOG.error("Failed to start SparkContext.", e);
+      shutdown(new Protocol.Error(e));
+      throw e;
+    }
+
+    client.tell(new Protocol.Hello(info.url + "/RemoteDriver"), actor);
+    running = true;
+  }
+
+  private void run() throws InterruptedException {
+    synchronized (shutdownLock) {
+      while (running) {
+        shutdownLock.wait();
+      }
+    }
+    executor.shutdownNow();
+  }
+
+  private synchronized void shutdown(Object msg) {
+    if (running) {
+      LOG.info("Shutting down remote driver.");
+      running = false;
+      for (JobWrapper<?> job : activeJobs.values()) {
+        cancelJob(job);
+      }
+
+      if (msg != null) {
+        client.tell(msg, actor);
+      }
+      if (jc != null) {
+        jc.stop();
+      }
+      system.shutdown();
+      synchronized (shutdownLock) {
+        shutdownLock.notifyAll();
+      }
+    }
+  }
+
+  private boolean cancelJob(JobWrapper<?> job) {
+    boolean cancelled = false;
+    for (JavaFutureAction<?> action : job.jobs) {
+      cancelled |= action.cancel(true);
+    }
+    return cancelled | job.future.cancel(true);
+  }
+
+  private String getArg(String[] args, int keyIdx) {
+    int valIdx = keyIdx + 1;
+    if (args.length <= valIdx) {
+      throw new IllegalArgumentException("Invalid command line: " +
+          Joiner.on(" ").join(args));
+    }
+    return args[valIdx];
+  }
+
+  private class ServerActor extends UntypedActor {
+
+    @Override
+    public void onReceive(Object message) throws Exception {
+      if (message instanceof Protocol.CancelJob) {
+        Protocol.CancelJob cj = (Protocol.CancelJob) message;
+        JobWrapper<?> job = activeJobs.get(cj.id);
+        if (job == null || !cancelJob(job)) {
+          LOG.info("Requested to cancel an already finished job.");
+        }
+      } else if (message instanceof DisassociatedEvent) {
+        LOG.debug("Shutting down due to DisassociatedEvent.");
+        shutdown(null);
+      } else if (message instanceof Protocol.EndSession) {
+        LOG.debug("Shutting down due to EndSession request.");
+        shutdown(null);
+      } else if (message instanceof Protocol.JobRequest) {
+        Protocol.JobRequest req = (Protocol.JobRequest) message;
+        LOG.info("Received job request {}", req.id);
+        JobWrapper<?> wrapper = new JobWrapper<Serializable>(req);
+        activeJobs.put(req.id, wrapper);
+        wrapper.submit();
+      }
+    }
+
+  }
+
+  private class JobWrapper<T extends Serializable> implements Callable<Void> {
+
+    private final Protocol.JobRequest<T> req;
+    private final List<JavaFutureAction<?>> jobs;
+    private final AtomicInteger completed;
+
+    private Future<?> future;
+
+    JobWrapper(Protocol.JobRequest<T> req) {
+      this.req = req;
+      this.jobs = Lists.newArrayList();
+      this.completed = new AtomicInteger();
+    }
+
+    @Override
+    public Void call() throws Exception {
+      try {
+        jc.setMonitorCb(new MonitorCallback() {
+          @Override
+          public void call(JavaFutureAction<?> future) {
+            monitorJob(future);
+          }
+        });
+
+        T result = req.job.call(jc);
+        synchronized (completed) {
+          while (completed.get() != jobs.size()) {
+            LOG.debug("Client job {} finished, {} of {} Spark jobs finished.",
+                req.id, completed.get(), jobs.size());
+            completed.wait();
+          }
+        }
+        client.tell(new Protocol.JobResult(req.id, result, null), actor);
+      } catch (Throwable t) {
+          // Catch throwables in a best-effort to report job status back to the client. It's
+          // re-thrown so that the executor can destroy the affected thread (or the JVM can
+          // die or whatever would happen if the throwable bubbled up).
+          client.tell(new Protocol.JobResult(req.id, null, t), actor);
+          throw new ExecutionException(t);
+      } finally {
+        jc.setMonitorCb(null);
+        activeJobs.remove(req.id);
+      }
+      return null;
+    }
+
+    void submit() {
+      this.future = executor.submit(this);
+    }
+
+    void jobDone() {
+      synchronized (completed) {
+        completed.incrementAndGet();
+        completed.notifyAll();
+      }
+    }
+
+    private void monitorJob(JavaFutureAction<?> job) {
+      jobs.add(job);
+    }
+
+  }
+
+  private class ClientListener implements SparkListener {
+
+    private final Map<Integer, Integer> stageToJobId = Maps.newHashMap();
+
+    @Override
+    public void onJobStart(SparkListenerJobStart jobStart) {
+      // TODO: are stage IDs unique? Otherwise this won't work.
+      synchronized (stageToJobId) {
+        for (int i = 0; i < jobStart.stageIds().length(); i++) {
+          stageToJobId.put((Integer) jobStart.stageIds().apply(i), jobStart.jobId());
+        }
+      }
+    }
+
+    @Override
+    public void onJobEnd(SparkListenerJobEnd jobEnd) {
+      synchronized (stageToJobId) {
+        for (Iterator<Map.Entry<Integer, Integer>> it = stageToJobId.entrySet().iterator();
+            it.hasNext(); ) {
+          Map.Entry<Integer, Integer> e = it.next();
+          if (e.getValue() == jobEnd.jobId()) {
+            it.remove();
+          }
+        }
+      }
+
+      String clientId = getClientId(jobEnd.jobId());
+      if (clientId != null) {
+        activeJobs.get(clientId).jobDone();
+      }
+    }
+
+    @Override
+    public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
+      if (taskEnd.reason() instanceof org.apache.spark.Success$ &&
+          !taskEnd.taskInfo().speculative()) {
+        Metrics metrics = new Metrics(taskEnd.taskMetrics());
+        Integer jobId;
+        synchronized (stageToJobId) {
+          jobId = stageToJobId.get(taskEnd.stageId());
+        }
+
+        // TODO: implement implicit AsyncRDDActions conversion instead of jc.monitor()?
+        // TODO: how to handle stage failures?
+
+        String clientId = getClientId(jobId);
+        if (clientId != null) {
+          client.tell(new Protocol.JobMetrics(clientId, jobId, taskEnd.stageId(),
+              taskEnd.taskInfo().taskId(), metrics), actor);
+        }
+      }
+    }
+
+    @Override
+    public void onStageCompleted(SparkListenerStageCompleted stageCompleted) { }
+
+    @Override
+    public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) { }
+
+    @Override
+    public void onTaskStart(SparkListenerTaskStart taskStart) { }
+
+    @Override
+    public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) { }
+
+    @Override
+    public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) { }
+
+    @Override
+    public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) { }
+
+    @Override
+    public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) { }
+
+    @Override
+    public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) { }
+
+    @Override
+    public void onApplicationStart(SparkListenerApplicationStart applicationStart) { }
+
+    @Override
+    public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { }
+
+    @Override
+    public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) { }
+
+    /**
+     * Returns the client job ID for the given Spark job ID.
+     *
+     * This will only work for jobs monitored via JobContext#monitor(). Other jobs won't be
+     * matched, and this method will return `None`.
+     */
+    private String getClientId(Integer jobId) {
+      for (Map.Entry<String, JobWrapper<?>> e : activeJobs.entrySet()) {
+        for (JavaFutureAction<?> future : e.getValue().jobs) {
+          if (future.jobIds().contains(jobId)) {
+            return e.getKey();
+          }
+        }
+      }
+      return null;
+    }
+
+  }
+
+  public static void main(String[] args) throws Exception {
+    new RemoteDriver(args).run();
+  }
+
+}
+

Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java Thu Oct 23 18:47:52 2014
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import java.net.URL;
+import java.io.Serializable;
+import java.util.concurrent.Future;
+
+/**
+ * Defines the API for the Spark remote client.
+ */
+public interface SparkClient {
+
+  /**
+   * Submits a job for asynchronous execution.
+   *
+   * @param job The job to execute.
+   * @return A handle that be used to monitor the job.
+   */
+  <T extends Serializable> JobHandle<T> submit(Job<T> job);
+
+  /**
+   * Stops the remote context.
+   *
+   * Any pending jobs will be cancelled, and the remote context will be torn down.
+   */
+  void stop();
+
+  /**
+   * Adds a jar file to the running remote context.
+   *
+   * Note that the URL should be reachable by the Spark driver process. If running the driver
+   * in cluster mode, it may reside on a different host, meaning "file:" URLs have to exist
+   * on that node (and not on the client machine).
+   *
+   * @param url The location of the jar file.
+   * @return A future that can be used to monitor the operation.
+   */
+  Future<?> addJar(URL url);
+
+  /**
+   * Adds a file to the running remote context.
+   *
+   * Note that the URL should be reachable by the Spark driver process. If running the driver
+   * in cluster mode, it may reside on a different host, meaning "file:" URLs have to exist
+   * on that node (and not on the client machine).
+   *
+   * @param url The location of the file.
+   * @return A future that can be used to monitor the operation.
+   */
+  Future<?> addFile(URL url);
+
+}

Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java Thu Oct 23 18:47:52 2014
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import java.io.IOException;
+import java.util.Map;
+
+import akka.actor.ActorSystem;
+import com.google.common.collect.Maps;
+import org.apache.spark.SparkException;
+
+/**
+ * Factory for SparkClient instances.
+ */
+public final class SparkClientFactory {
+
+  static ActorSystem actorSystem = null;
+  static String akkaUrl = null;
+  static String secret = null;
+
+  private static boolean initialized = false;
+
+  /**
+   * Initializes the SparkClient library. Must be called before creating client instances.
+   *
+   * @param conf Map containing configuration parameters for the client.
+   */
+  public static synchronized void initialize(Map<String, String> conf) throws IOException {
+    secret = akka.util.Crypt.generateSecureCookie();
+
+    Map<String, String> akkaConf = Maps.newHashMap(conf);
+    akkaConf.put(ClientUtils.CONF_KEY_SECRET, secret);
+
+    ClientUtils.ActorSystemInfo info = ClientUtils.createActorSystem(akkaConf);
+    actorSystem = info.system;
+    akkaUrl = info.url;
+    initialized = true;
+  }
+
+  /** Stops the SparkClient library. */
+  public static synchronized void stop() {
+    if (initialized) {
+      actorSystem.shutdown();
+      actorSystem = null;
+      akkaUrl = null;
+      secret = null;
+      initialized = false;
+    }
+  }
+
+  /**
+   * Instantiates a new Spark client.
+   *
+   * @param conf Configuration for the remote Spark application.
+   */
+  public static synchronized SparkClient createClient(Map<String, String> conf)
+      throws IOException, SparkException {
+    if (!initialized) {
+      throw new IllegalStateException("Library is not initialized. Call initialize() first.");
+    }
+    return new SparkClientImpl(conf);
+  }
+
+}

Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java Thu Oct 23 18:47:52 2014
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.io.Writer;
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.spark.SparkContext;
+import org.apache.spark.SparkException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class SparkClientImpl implements SparkClient {
+
+  private final static Logger LOG = LoggerFactory.getLogger(SparkClientImpl.class);
+
+  private final Map<String, String> conf;
+  private final AtomicInteger childIdGenerator;
+  private final String name;
+  private final ActorRef clientRef;
+  private final Thread driverThread;
+  private final Map<String, JobHandleImpl<?>> jobs;
+
+  private volatile ActorSelection remoteRef;
+
+  SparkClientImpl(Map<String, String> conf) throws IOException, SparkException {
+    this.conf = conf;
+    this.childIdGenerator = new AtomicInteger();
+    this.name = "SparkClient-" + ClientUtils.randomName();
+    this.clientRef = bind(Props.create(ClientActor.class, this), name);
+    this.jobs = Maps.newConcurrentMap();
+    this.driverThread = startDriver();
+
+    long connectTimeout = Integer.parseInt(
+        Optional.fromNullable(conf.get("spark.client.connectTimeout")).or("10")) * 1000;
+    long endTime = System.currentTimeMillis() + connectTimeout;
+
+    synchronized (this) {
+      while (remoteRef == null) {
+        try {
+          wait(connectTimeout);
+        } catch (InterruptedException ie) {
+          throw new SparkException("Interrupted.", ie);
+        }
+
+        connectTimeout = endTime - System.currentTimeMillis();
+        if (remoteRef == null && connectTimeout <= 0) {
+          throw new SparkException("Timed out waiting for remote driver to connect.");
+        }
+      }
+    }
+  }
+
+  @Override
+  public <T extends Serializable> JobHandle<T> submit(Job<T> job) {
+    String jobId = ClientUtils.randomName();
+    remoteRef.tell(new Protocol.JobRequest(jobId, job), clientRef);
+
+    JobHandleImpl<T> handle = new JobHandleImpl<T>(this, jobId);
+    jobs.put(jobId, handle);
+    return handle;
+  }
+
+  @Override
+  public void stop() {
+    if (remoteRef != null) {
+      LOG.info("Sending EndSession to remote actor.");
+      remoteRef.tell(new Protocol.EndSession(), clientRef);
+    }
+    unbind(clientRef);
+    try {
+      driverThread.join(); // TODO: timeout?
+    } catch (InterruptedException ie) {
+      LOG.debug("Interrupted before driver thread was finished.");
+    }
+  }
+
+  @Override
+  public Future<?> addJar(URL url) {
+    return submit(new AddJarJob(url.toString()));
+  }
+
+  @Override
+  public Future<?> addFile(URL url) {
+    return submit(new AddFileJob(url.toString()));
+  }
+
+  void cancel(String jobId) {
+    remoteRef.tell(new Protocol.CancelJob(jobId), clientRef);
+  }
+
+  private Thread startDriver() throws IOException {
+    Runnable runnable;
+    if (conf.containsKey(ClientUtils.CONF_KEY_IN_PROCESS)) {
+      // Mostly for testing things quickly. Do not do this in production.
+      LOG.warn("!!!! Running remote driver in-process. !!!!");
+      runnable = new Runnable() {
+        @Override
+        public void run() {
+          List<String> args = Lists.newArrayList();
+          args.add("--remote");
+          args.add(String.format("%s/%s", SparkClientFactory.akkaUrl, name));
+          args.add("--secret");
+          args.add(SparkClientFactory.secret);
+
+          for (Map.Entry<String, String> e : conf.entrySet()) {
+            args.add("--conf");
+            args.add(String.format("%s=%s", e.getKey(), e.getValue()));
+          }
+          try {
+            RemoteDriver.main(args.toArray(new String[args.size()]));
+          } catch (Exception e) {
+            LOG.error("Error running driver.", e);
+          }
+        }
+      };
+    } else {
+      // Create a file with all the job properties to be read by spark-submit. Change the
+      // file's permissions so that only the owner can read it. This avoid having the
+      // connection secret show up in the child process's command line.
+      File properties = File.createTempFile("spark-submit.", ".properties");
+      if (!properties.setReadable(false) || !properties.setReadable(true, true)) {
+        throw new IOException("Cannot change permissions of job properties file.");
+      }
+
+      Properties allProps = new Properties();
+      for (Map.Entry<String, String> e : conf.entrySet()) {
+        allProps.put(e.getKey(), e.getValue());
+      }
+      allProps.put(ClientUtils.CONF_KEY_SECRET, SparkClientFactory.secret);
+
+      Writer writer = new OutputStreamWriter(new FileOutputStream(properties), Charsets.UTF_8);
+      try {
+        allProps.store(writer, "Spark Context configuration");
+      } finally {
+        writer.close();
+      }
+
+      // Define how to pass options to the child process. If launching in client (or local)
+      // mode, the driver options need to be passed directly on the command line. Otherwise,
+      // SparkSubmit will take care of that for us.
+      String master = conf.get("spark.master");
+      Preconditions.checkArgument(master != null, "spark.master is not defined.");
+
+      List<String> argv = Lists.newArrayList();
+
+      // If a Spark installation is provided, use the spark-submit script. Otherwise, call the
+      // SparkSubmit class directly, which has some caveats (like having to provide a proper
+      // version of Guava on the classpath depending on the deploy mode).
+      if (conf.get("spark.home") != null) {
+        argv.add(new File(conf.get("spark.home"), "bin/spark-submit").getAbsolutePath());
+      } else {
+        LOG.info("No spark.home provided, calling SparkSubmit directly.");
+        argv.add(new File(System.getProperty("java.home"), "bin/java").getAbsolutePath());
+
+        if (master.startsWith("local") || master.startsWith("mesos") || master.endsWith("-client")) {
+          String mem = conf.get("spark.driver.memory");
+          if (mem != null) {
+            argv.add("-Xms" + mem);
+            argv.add("-Xmx" + mem);
+          }
+
+          String cp = conf.get("spark.driver.extraClassPath");
+          if (cp != null) {
+            argv.add("-classpath");
+            argv.add(cp);
+          }
+
+          String libPath = conf.get("spark.driver.extraLibPath");
+          if (libPath != null) {
+            argv.add("-Djava.library.path=" + libPath);
+          }
+
+          String extra = conf.get("spark.driver.extraJavaOptions");
+          if (extra != null) {
+            for (String opt : extra.split("[ ]")) {
+              if (!opt.trim().isEmpty()) {
+                argv.add(opt.trim());
+              }
+            }
+          }
+        }
+
+        argv.add("org.apache.spark.deploy.SparkSubmit");
+      }
+
+
+      argv.add("--properties-file");
+      argv.add(properties.getAbsolutePath());
+      argv.add("--class");
+      argv.add(RemoteDriver.class.getName());
+
+      String jar = "spark-internal";
+      if (SparkContext.jarOfClass(this.getClass()).isDefined()) {
+        jar = SparkContext.jarOfClass(this.getClass()).get();
+      }
+      argv.add(jar);
+
+
+      argv.add("--remote");
+      argv.add(String.format("%s/%s", SparkClientFactory.akkaUrl, name));
+
+      LOG.debug("Running client driver with argv: {}", Joiner.on(" ").join(argv));
+
+      ProcessBuilder pb = new ProcessBuilder(argv.toArray(new String[argv.size()]));
+      pb.environment().clear();
+      final Process child = pb.start();
+
+      int childId = childIdGenerator.incrementAndGet();
+      redirect("stdout-redir-" + childId, child.getInputStream(), System.out);
+      redirect("stderr-redir-" + childId, child.getErrorStream(), System.err);
+
+      runnable = new Runnable() {
+        @Override
+        public void run() {
+          try {
+            int exitCode = child.waitFor();
+            if (exitCode != 0) {
+              LOG.warn("Child process exited with code {}.", exitCode);
+            }
+          } catch (Exception e) {
+            LOG.warn("Exception while waiting for child process.", e);
+          }
+        }
+      };
+    }
+
+    Thread thread = new Thread(runnable);
+    thread.setDaemon(true);
+    thread.setName("Driver");
+    thread.start();
+    return thread;
+  }
+
+  private void redirect(String name, InputStream in, OutputStream out) {
+    Thread thread = new Thread(new Redirector(in, out));
+    thread.setName(name);
+    thread.setDaemon(true);
+    thread.start();
+  }
+
+  private ActorRef bind(Props props, String name) {
+    return SparkClientFactory.actorSystem.actorOf(props, name);
+  }
+
+  private void unbind(ActorRef actor) {
+    SparkClientFactory.actorSystem.stop(actor);
+  }
+
+  private ActorSelection select(String url) {
+    return SparkClientFactory.actorSystem.actorSelection(url);
+  }
+
+  private class ClientActor extends UntypedActor {
+
+    @Override
+    public void onReceive(Object message) throws Exception {
+      if (message instanceof Protocol.Error) {
+        Protocol.Error e = (Protocol.Error) message;
+        LOG.error("Error report from remote driver.", e.cause);
+      } else if (message instanceof Protocol.Hello) {
+        Protocol.Hello hello = (Protocol.Hello) message;
+        LOG.info("Received hello from {}", hello.remoteUrl);
+        remoteRef = select(hello.remoteUrl);
+        synchronized (SparkClientImpl.this) {
+          SparkClientImpl.this.notifyAll();
+        }
+      } else if (message instanceof Protocol.JobMetrics) {
+        Protocol.JobMetrics jm = (Protocol.JobMetrics) message;
+        JobHandleImpl<?> handle = jobs.get(jm.jobId);
+        if (handle != null) {
+          handle.getMetrics().addMetrics(jm.sparkJobId, jm.stageId, jm.taskId, jm.metrics);
+        } else {
+          LOG.warn("Received metrics for unknown job {}", jm.jobId);
+        }
+      } else if (message instanceof Protocol.JobResult) {
+        Protocol.JobResult jr = (Protocol.JobResult) message;
+        JobHandleImpl<?> handle = jobs.remove(jr.id);
+        if (handle != null) {
+          LOG.info("Received result for {}", jr.id);
+          handle.complete(jr.result, jr.error);
+        } else {
+          LOG.warn("Received result for unknown job {}", jr.id);
+        }
+      }
+    }
+
+  }
+
+  private class Redirector implements Runnable {
+
+    private final InputStream in;
+    private final OutputStream out;
+
+    Redirector(InputStream in, OutputStream out) {
+      this.in = in;
+      this.out = out;
+    }
+
+    @Override
+    public void run() {
+      try {
+        byte[] buf = new byte[1024];
+        int len = in.read(buf);
+        while (len != -1) {
+          out.write(buf, 0, len);
+          out.flush();
+          len = in.read(buf);
+        }
+      } catch (Exception e) {
+        LOG.warn("Error in redirector thread.", e);
+      }
+    }
+
+  }
+
+  private static class AddJarJob implements Job<Serializable> {
+
+    private final String path;
+
+    AddJarJob(String path) {
+      this.path = path;
+    }
+
+    @Override
+    public Serializable call(JobContext jc) throws Exception {
+      jc.sc().addJar(path);
+      return null;
+    }
+
+  }
+
+  private static class AddFileJob implements Job<Serializable> {
+
+    private final String path;
+
+    AddFileJob(String path) {
+      this.path = path;
+    }
+
+    @Override
+    public Serializable call(JobContext jc) throws Exception {
+      jc.sc().addFile(path);
+      return null;
+    }
+
+  }
+
+}

Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/DataReadMethod.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/DataReadMethod.java?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/DataReadMethod.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/DataReadMethod.java Thu Oct 23 18:47:52 2014
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client.metrics;
+
+/**
+ * Method by which input data was read. Network means that the data was read over the network
+ * from a remote block manager (which may have stored the data on-disk or in-memory).
+ */
+public enum DataReadMethod {
+  Memory, Disk, Hadoop, Network, Multiple
+}

Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java Thu Oct 23 18:47:52 2014
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client.metrics;
+
+import java.io.Serializable;
+
+import org.apache.spark.executor.TaskMetrics;
+
+/**
+ * Metrics pertaining to reading input data.
+ */
+public class InputMetrics implements Serializable {
+
+  public final DataReadMethod readMethod;
+  public final long bytesRead;
+
+  public InputMetrics(
+      DataReadMethod readMethod,
+      long bytesRead) {
+    this.readMethod = readMethod;
+    this.bytesRead = bytesRead;
+  }
+
+  public InputMetrics(TaskMetrics metrics) {
+    this(DataReadMethod.valueOf(metrics.inputMetrics().get().readMethod().toString()),
+      metrics.inputMetrics().get().bytesRead());
+  }
+
+}

Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java Thu Oct 23 18:47:52 2014
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client.metrics;
+
+import java.io.Serializable;
+
+import com.google.common.base.Optional;
+import org.apache.spark.executor.TaskMetrics;
+
+/**
+ * Metrics tracked during the execution of a job.
+ *
+ * Depending on how the metrics object is obtained (by calling methods in the `MetricsCollection`
+ * class), metrics will refer to one or more tasks.
+ */
+public class Metrics implements Serializable {
+
+  /** Time taken on the executor to deserialize tasks. */
+  public final long executorDeserializeTime;
+  /** Time the executor spends actually running the task (including fetching shuffle data). */
+  public final long executorRunTime;
+  /** The number of bytes sent back to the driver by tasks. */
+  public final long resultSize;
+  /** Amount of time the JVM spent in garbage collection while executing tasks. */
+  public final long jvmGCTime;
+  /** Amount of time spent serializing task results. */
+  public final long resultSerializationTime;
+  /** The number of in-memory bytes spilled by tasks. */
+  public final long memoryBytesSpilled;
+  /** The number of on-disk bytes spilled by tasks. */
+  public final long diskBytesSpilled;
+  /** If tasks read from a HadoopRDD or from persisted data, metrics on how much data was read. */
+  public final Optional<InputMetrics> inputMetrics;
+  /**
+   * If tasks read from shuffle output, metrics on getting shuffle data. This includes read metrics
+   * aggregated over all the tasks' shuffle dependencies.
+   */
+  public final Optional<ShuffleReadMetrics> shuffleReadMetrics;
+  /** If tasks wrote to shuffle output, metrics on the written shuffle data. */
+  public final Optional<ShuffleWriteMetrics> shuffleWriteMetrics;
+
+  public Metrics(
+      long executorDeserializeTime,
+      long executorRunTime,
+      long resultSize,
+      long jvmGCTime,
+      long resultSerializationTime,
+      long memoryBytesSpilled,
+      long diskBytesSpilled,
+      Optional<InputMetrics> inputMetrics,
+      Optional<ShuffleReadMetrics> shuffleReadMetrics,
+      Optional<ShuffleWriteMetrics> shuffleWriteMetrics) {
+    this.executorDeserializeTime = executorDeserializeTime;
+    this.executorRunTime = executorRunTime;
+    this.resultSize = resultSize;
+    this.jvmGCTime = jvmGCTime;
+    this.resultSerializationTime = resultSerializationTime;
+    this.memoryBytesSpilled = memoryBytesSpilled;
+    this.diskBytesSpilled = diskBytesSpilled;
+    this.inputMetrics = inputMetrics;
+    this.shuffleReadMetrics = shuffleReadMetrics;
+    this.shuffleWriteMetrics = shuffleWriteMetrics;
+  }
+
+  public Metrics(TaskMetrics metrics) {
+    this(
+      metrics.executorDeserializeTime(),
+      metrics.executorRunTime(),
+      metrics.resultSize(),
+      metrics.jvmGCTime(),
+      metrics.resultSerializationTime(),
+      metrics.memoryBytesSpilled(),
+      metrics.diskBytesSpilled(),
+      optionalInputMetric(metrics),
+      optionalShuffleReadMetric(metrics),
+      optionalShuffleWriteMetrics(metrics));
+  }
+
+  private static final Optional<InputMetrics> optionalInputMetric(TaskMetrics metrics) {
+    return metrics.inputMetrics().isDefined()
+      ? Optional.of(new InputMetrics(metrics))
+      : Optional.<InputMetrics>absent();
+  }
+
+  private static final Optional<ShuffleReadMetrics>
+      optionalShuffleReadMetric(TaskMetrics metrics) {
+    return metrics.shuffleReadMetrics().isDefined()
+      ? Optional.of(new ShuffleReadMetrics(metrics))
+      : Optional.<ShuffleReadMetrics>absent();
+  }
+
+  private static final Optional<ShuffleWriteMetrics>
+      optionalShuffleWriteMetrics(TaskMetrics metrics) {
+    return metrics.shuffleWriteMetrics().isDefined()
+      ? Optional.of(new ShuffleWriteMetrics(metrics))
+      : Optional.<ShuffleWriteMetrics>absent();
+  }
+
+}