Posted to commits@hive.apache.org by xu...@apache.org on 2014/10/23 20:47:53 UTC
svn commit: r1633917 [1/2] - in /hive/branches/spark: ./ spark-client/
spark-client/src/ spark-client/src/main/ spark-client/src/main/java/
spark-client/src/main/java/org/ spark-client/src/main/java/org/apache/
spark-client/src/main/java/org/apache/hiv...
Author: xuefu
Date: Thu Oct 23 18:47:52 2014
New Revision: 1633917
URL: http://svn.apache.org/r1633917
Log:
HIVE-8528: Add remote Spark client to Hive [Spark Branch] (Marcelo via Xuefu)
Added:
hive/branches/spark/spark-client/
hive/branches/spark/spark-client/pom.xml
hive/branches/spark/spark-client/src/
hive/branches/spark/spark-client/src/main/
hive/branches/spark/spark-client/src/main/java/
hive/branches/spark/spark-client/src/main/java/org/
hive/branches/spark/spark-client/src/main/java/org/apache/
hive/branches/spark/spark-client/src/main/java/org/apache/hive/
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/ClientUtils.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/Job.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MonitorCallback.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/DataReadMethod.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java
hive/branches/spark/spark-client/src/test/
hive/branches/spark/spark-client/src/test/java/
hive/branches/spark/spark-client/src/test/java/org/
hive/branches/spark/spark-client/src/test/java/org/apache/
hive/branches/spark/spark-client/src/test/java/org/apache/hive/
hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/
hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/
hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
hive/branches/spark/spark-client/src/test/resources/
hive/branches/spark/spark-client/src/test/resources/log4j.properties
Modified:
hive/branches/spark/pom.xml
Modified: hive/branches/spark/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/pom.xml?rev=1633917&r1=1633916&r2=1633917&view=diff
==============================================================================
--- hive/branches/spark/pom.xml (original)
+++ hive/branches/spark/pom.xml Thu Oct 23 18:47:52 2014
@@ -47,6 +47,7 @@
<module>serde</module>
<module>service</module>
<module>shims</module>
+ <module>spark-client</module>
<module>testutils</module>
<module>packaging</module>
</modules>
Added: hive/branches/spark/spark-client/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/pom.xml?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/pom.xml (added)
+++ hive/branches/spark/spark-client/pom.xml Thu Oct 23 18:47:52 2014
@@ -0,0 +1,118 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive</artifactId>
+ <version>0.14.0-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.apache.hive</groupId>
+ <artifactId>spark-client</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Remote Client</name>
+ <version>0.14.0-SNAPSHOT</version>
+
+ <properties>
+ <hive.path.to.root>..</hive.path.to.root>
+ <scala.binary.version>2.10</scala.binary.version>
+
+ <!-- Should match Spark. -->
+ <akka.group>org.spark-project.akka</akka.group>
+ <akka.version>2.3.4-spark</akka.version>
+ <test.redirectToFile>true</test.redirectToFile>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>${akka.group}</groupId>
+ <artifactId>akka-actor_${scala.binary.version}</artifactId>
+ <version>${akka.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <!--
+ Spark depends on Guava 14, while Hive depends on Guava 11. The APIs used by
+ spark-client do not depend on Guava 14, but when running unit tests that
+ trigger Spark jobs, that will trigger the dependency. So, when running tests,
+ make sure Guava 14, and not Guava 11, is on the classpath.
+ -->
+ <id>copy-guava-14</id>
+ <phase>test-compile</phase>
+ <goals>
+ <goal>copy</goal>
+ </goals>
+ <configuration>
+ <artifactItems>
+ <artifactItem>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>14.0.1</version>
+ </artifactItem>
+ </artifactItems>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <additionalClasspathElements>
+ <!-- Note: wildcards don't work. Thankfully there's just one jar we care about. -->
+ <additionalClasspathElement>${project.build.directory}/dependency/guava-14.0.1.jar</additionalClasspathElement>
+ </additionalClasspathElements>
+ <classpathDependencyExcludes>
+ <classpathDependencyExclude>com.google.guava:guava</classpathDependencyExclude>
+ </classpathDependencyExcludes>
+ <systemPropertyVariables>
+ <java.awt.headless>true</java.awt.headless>
+ <spark.home>${spark.home}</spark.home>
+ </systemPropertyVariables>
+ <redirectTestOutputToFile>${test.redirectToFile}</redirectTestOutputToFile>
+ <useFile>${test.redirectToFile}</useFile>
+ <argLine>-Xmx4096m -XX:MaxPermSize=512m</argLine>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/ClientUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/ClientUtils.java?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/ClientUtils.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/ClientUtils.java Thu Oct 23 18:47:52 2014
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.util.Enumeration;
+import java.util.Map;
+import java.util.UUID;
+
+import akka.actor.ActorSystem;
+import akka.actor.ExtendedActorSystem;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class ClientUtils {
+
+ private final static Logger LOG = LoggerFactory.getLogger(ClientUtils.class);
+
+ static final String CONF_KEY_SECRET = "spark.client.authentication.secret";
+ static final String CONF_KEY_IN_PROCESS = "spark.client.do_not_use_this.run_driver_in_process";
+ static final String CONF_KEY_SERIALIZER = "spark-remote.akka.serializer";
+
+ /**
+ * Create a new ActorSystem based on the given configuration.
+ *
+ * The akka configs are the same used to configure Akka in Spark.
+ *
+ * @param conf Configuration.
+ * @return An ActorSystemInfo holding the actor system and its akka root URL.
+ */
+ static ActorSystemInfo createActorSystem(Map<String, String> conf) throws IOException {
+ int akkaThreads = toInt(conf.get("spark.akka.threads"), 4);
+ int akkaBatchSize = toInt(conf.get("spark.akka.batchSize"), 15);
+ int akkaTimeout = toInt(conf.get("spark.akka.timeout"), 100);
+ int akkaFrameSize = toInt(conf.get("spark.akka.frameSize"), 10) * 1024 * 1024;
+ String lifecycleEvents = toBoolean(conf.get("spark.akka.logLifecycleEvents")) ? "on" : "off";
+ String logAkkaConfig = toBoolean(conf.get("spark.akka.logAkkaConfig")) ? "on" : "off";
+
+ int akkaHeartBeatPauses = toInt(conf.get("spark.akka.heartbeat.pauses"), 600);
+ double akkaFailureDetector =
+ toDouble(conf.get("spark.akka.failure-detector.threshold"), 300.0);
+ int akkaHeartBeatInterval = toInt(conf.get("spark.akka.heartbeat.interval"), 1000);
+
+ // Disabled due to chill-akka depending on kryo 2.21, which is incompatible with 2.22
+ // due to packaging changes (relocated org.objenesis classes).
+ // String akkaSerializer = Optional.fromNullable(conf.get(CONF_KEY_SERIALIZER)).or("java");
+ String akkaSerializer = "java";
+
+ String host = findLocalIpAddress();
+ String secret = conf.get(CONF_KEY_SECRET);
+ Preconditions.checkArgument(secret != null, "%s not set.", CONF_KEY_SECRET);
+
+ Map<String, String> sparkConf = Maps.newHashMap();
+ for (Map.Entry<String, String> e : conf.entrySet()) {
+ if (e.getKey().startsWith("akka.")) {
+ sparkConf.put(e.getKey(), e.getValue());
+ }
+ }
+
+ Config fallback = ConfigFactory.parseString(""
+ + "akka.daemonic = on\n"
+ + "akka.loggers = [ \"akka.event.slf4j.Slf4jLogger\" ]\n"
+ + "akka.stdout-loglevel = \"ERROR\"\n"
+ + "akka.jvm-exit-on-fatal-error = off\n"
+ + "akka.actor.default-dispatcher.throughput = " + akkaBatchSize + "\n"
+ + "akka.actor.serializers.java = \"akka.serialization.JavaSerializer\"\n"
+ /* Disabled due to chill-akka depending on kryo 2.21, which is incompatible with 2.22
+ due to packaging changes (relocated org.objenesis classes).
+ + "akka.actor.serializers.kryo = \"com.twitter.chill.akka.AkkaSerializer\"\n"
+ */
+ + String.format(
+ "akka.actor.serialization-bindings = { \"java.io.Serializable\" = \"%s\" }\n",
+ akkaSerializer)
+ + "akka.log-config-on-start = " + logAkkaConfig + "\n"
+ + "akka.log-dead-letters = " + lifecycleEvents + "\n"
+ + "akka.log-dead-letters-during-shutdown = " + lifecycleEvents + "\n"
+ + "akka.actor.provider = \"akka.remote.RemoteActorRefProvider\"\n"
+ + "akka.remote.log-remote-lifecycle-events = " + lifecycleEvents + "\n"
+ + String.format("akka.remote.netty.tcp.connection-timeout = %d s\n", akkaTimeout)
+ + "akka.remote.netty.tcp.execution-pool-size = " + akkaThreads + "\n"
+ + "akka.remote.netty.tcp.hostname = \"" + host + "\"\n"
+ + String.format("akka.remote.netty.tcp.maximum-frame-size = %d B\n", akkaFrameSize)
+ + "akka.remote.netty.tcp.port = 0\n"
+ + "akka.remote.netty.tcp.tcp-nodelay = on\n"
+ + "akka.remote.netty.tcp.transport-class = \"akka.remote.transport.netty.NettyTransport\"\n"
+ + "akka.remote.require-cookie = on\n"
+ + "akka.remote.secure-cookie = \"" + secret + "\"\n"
+ + String.format(
+ "akka.remote.transport-failure-detector.acceptable-heartbeat-pause = %d s\n",
+ akkaHeartBeatPauses)
+ + String.format(
+ "akka.remote.transport-failure-detector.heartbeat-interval = %d s\n",
+ akkaHeartBeatInterval)
+ + "akka.remote.transport-failure-detector.threshold = " +
+ String.valueOf(akkaFailureDetector) + "\n");
+
+ String name = randomName();
+ Config akkaConf = ConfigFactory.parseMap(sparkConf).withFallback(fallback);
+ ActorSystem actorSystem = ActorSystem.create(name, akkaConf);
+ ExtendedActorSystem extActorSystem = (ExtendedActorSystem) actorSystem;
+ int boundPort =
+ ((Integer)extActorSystem.provider().getDefaultAddress().port().get()).intValue();
+ return new ActorSystemInfo(actorSystem,
+ String.format("akka.tcp://%s@%s:%d/user", name, host, boundPort));
+ }
+
+ static String randomName() {
+ return UUID.randomUUID().toString();
+ }
+
+ private static boolean toBoolean(String value) {
+ return Boolean.parseBoolean(Optional.fromNullable(value).or("false"));
+ }
+
+ private static double toDouble(String value, double defaultValue) {
+ return Double.parseDouble(Optional.fromNullable(value).or(String.valueOf(defaultValue)));
+ }
+
+ private static int toInt(String value, int defaultValue) {
+ return Integer.parseInt(Optional.fromNullable(value).or(String.valueOf(defaultValue)));
+ }
+
+ // Copied from Utils.scala.
+ private static String findLocalIpAddress() throws IOException {
+ String ip = System.getenv("SPARK_LOCAL_IP");
+ if (ip == null) {
+ InetAddress address = InetAddress.getLocalHost();
+ if (address.isLoopbackAddress()) {
+ // Address resolves to something like 127.0.1.1, which happens on Debian; try to find
+ // a better address using the local network interfaces
+ Enumeration<NetworkInterface> ifaces = NetworkInterface.getNetworkInterfaces();
+ while (ifaces.hasMoreElements()) {
+ NetworkInterface ni = ifaces.nextElement();
+ Enumeration<InetAddress> addrs = ni.getInetAddresses();
+ while (addrs.hasMoreElements()) {
+ InetAddress addr = addrs.nextElement();
+ if (!addr.isLinkLocalAddress() &&
+ !addr.isLoopbackAddress() &&
+ addr instanceof Inet4Address) {
+ // We've found an address that looks reasonable!
+ LOG.warn("Your hostname, {}, resolves to a loopback address; using {} " +
+ " instead (on interface {})",
+ address.getHostName(),
+ addr.getHostAddress(),
+ ni.getName());
+ LOG.warn("Set SPARK_LOCAL_IP if you need to bind to another address");
+ return addr.getHostAddress();
+ }
+ }
+ }
+ LOG.warn("Your hostname, {}, resolves to, but we couldn't find any external IP address!",
+ address.getHostName());
+ LOG.warn("Set SPARK_LOCAL_IP if you need to bind to another address");
+ }
+ return address.getHostAddress();
+ }
+ return ip;
+ }
+
+ static class ActorSystemInfo {
+ final ActorSystem system;
+ final String url;
+
+ private ActorSystemInfo(ActorSystem system, String url) {
+ this.system = system;
+ this.url = url;
+ }
+
+ }
+
+}
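
For orientation, a minimal sketch of how this utility is driven (hypothetical caller code; within this commit the actual callers are SparkClientFactory and RemoteDriver, and both the class and createActorSystem() are package-private):

    // Hypothetical usage from inside the org.apache.hive.spark.client package.
    Map<String, String> conf = Maps.newHashMap();
    conf.put(ClientUtils.CONF_KEY_SECRET, "shared-secret"); // required; checked by Preconditions
    ClientUtils.ActorSystemInfo info = ClientUtils.createActorSystem(conf);
    // info.url has the form akka.tcp://<name>@<host>:<port>/user
    info.system.shutdown();
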
Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/Job.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/Job.java?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/Job.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/Job.java Thu Oct 23 18:47:52 2014
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import java.io.Serializable;
+
+/**
+ * Interface for a Spark remote job.
+ */
+interface Job<T extends Serializable> extends Serializable {
+
+ T call(JobContext jc) throws Exception;
+
+}
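
A minimal Job implementation might look like the following sketch (hypothetical example, not part of this commit; it must live in the org.apache.hive.spark.client package, since the interface is package-private):

    class CountJob implements Job<Long> {
      @Override
      public Long call(JobContext jc) throws Exception {
        // Run a trivial Spark action through the shared JavaSparkContext.
        return jc.sc().parallelize(java.util.Arrays.asList(1, 2, 3, 4, 5)).count();
      }
    }
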
Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java Thu Oct 23 18:47:52 2014
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import org.apache.spark.api.java.JavaFutureAction;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ * Holds runtime information about the job execution context.
+ *
+ * An instance of this class is kept on the node hosting a remote Spark context and is made
+ * available to jobs being executed via SparkClient#submit().
+ */
+public interface JobContext {
+
+ /** The shared SparkContext instance. */
+ JavaSparkContext sc();
+
+ /**
+ * Monitor a job. This allows job-related information (such as metrics) to be communicated
+ * back to the client.
+ *
+ * @return The job (unmodified).
+ */
+ <T> JavaFutureAction<T> monitor(JavaFutureAction<T> job);
+
+}
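
Inside a job, monitor() is meant to wrap Spark's async actions so their task metrics flow back to the client; a sketch (hypothetical input path, and countAsync() is assumed from Spark's JavaRDDLike async API):

    // Within a Job's call(JobContext jc) method:
    JavaRDD<String> rdd = jc.sc().textFile("hdfs:///tmp/input"); // hypothetical path
    JavaFutureAction<Long> action = jc.monitor(rdd.countAsync());
    Long count = action.get(); // metrics for this action are now reported to the client
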
Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java Thu Oct 23 18:47:52 2014
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import org.apache.spark.api.java.JavaFutureAction;
+import org.apache.spark.api.java.JavaSparkContext;
+
+class JobContextImpl implements JobContext {
+
+ private final JavaSparkContext sc;
+ private final ThreadLocal<MonitorCallback> monitorCb;
+
+ public JobContextImpl(JavaSparkContext sc) {
+ this.sc = sc;
+ this.monitorCb = new ThreadLocal<MonitorCallback>();
+ }
+
+
+ @Override
+ public JavaSparkContext sc() {
+ return sc;
+ }
+
+ @Override
+ public <T> JavaFutureAction<T> monitor(JavaFutureAction<T> job) {
+ monitorCb.get().call(job);
+ return job;
+ }
+
+ void setMonitorCb(MonitorCallback cb) {
+ monitorCb.set(cb);
+ }
+
+ void stop() {
+ sc.stop();
+ }
+
+}
Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java Thu Oct 23 18:47:52 2014
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import java.io.Serializable;
+import java.util.concurrent.Future;
+
+/**
+ * A handle to a submitted job. Allows for monitoring and controlling of the running remote job.
+ */
+interface JobHandle<T extends Serializable> extends Future<T> {
+
+ /**
+ * The client job ID. This is unrelated to any Spark jobs that might be triggered by the
+ * submitted job.
+ */
+ String getClientJobId();
+
+ /**
+ * A collection of metrics collected from the Spark jobs triggered by this job.
+ *
+ * To collect job metrics on the client, Spark jobs must be registered with JobContext::monitor()
+ * on the remote end.
+ */
+ MetricsCollection getMetrics();
+
+ // TODO: expose job status?
+
+}
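
Typical client-side use of the handle might be (hypothetical sketch; client is a SparkClient, defined further down in this commit, and CountJob is the Job sketch above):

    JobHandle<Long> handle = client.submit(new CountJob());
    Long result = handle.get(60, TimeUnit.SECONDS);    // standard Future semantics
    Metrics all = handle.getMetrics().getAllMetrics(); // aggregated task metrics so far
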
Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java Thu Oct 23 18:47:52 2014
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import java.io.Serializable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A handle to a submitted job. Allows for monitoring and controlling of the running remote job.
+ */
+class JobHandleImpl<T extends Serializable> implements JobHandle<T> {
+
+ private final SparkClientImpl client;
+ private final String jobId;
+ private final MetricsCollection metrics;
+ private final Object monitor;
+
+ private AtomicBoolean cancelled;
+ private boolean completed;
+ private T result;
+ private Throwable error;
+
+ JobHandleImpl(SparkClientImpl client, String jobId) {
+ this.client = client;
+ this.jobId = jobId;
+ this.monitor = new Object();
+ this.metrics = new MetricsCollection();
+ this.cancelled = new AtomicBoolean();
+ this.completed = false;
+ }
+
+ /** Requests a running job to be cancelled. */
+ @Override
+ public boolean cancel(boolean unused) {
+ if (cancelled.compareAndSet(false, true)) {
+ client.cancel(jobId);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public T get() throws ExecutionException, InterruptedException {
+ try {
+ return get(-1);
+ } catch (TimeoutException te) {
+ // Shouldn't really happen.
+ throw new ExecutionException(te);
+ }
+ }
+
+ @Override
+ public T get(long timeout, TimeUnit unit)
+ throws ExecutionException, InterruptedException, TimeoutException {
+ return get(unit.toMillis(timeout));
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return cancelled.get();
+ }
+
+ @Override
+ public boolean isDone() {
+ return completed;
+ }
+
+ /**
+ * The client job ID. This is unrelated to any Spark jobs that might be triggered by the
+ * submitted job.
+ */
+ @Override
+ public String getClientJobId() {
+ return jobId;
+ }
+
+ /**
+ * A collection of metrics collected from the Spark jobs triggered by this job.
+ *
+ * To collect job metrics on the client, Spark jobs must be registered with JobContext::monitor()
+ * on the remote end.
+ */
+ @Override
+ public MetricsCollection getMetrics() {
+ return metrics;
+ }
+
+ private T get(long timeout) throws ExecutionException, InterruptedException, TimeoutException {
+ long deadline = System.currentTimeMillis() + timeout;
+ synchronized (monitor) {
+ while (!completed && !cancelled.get()) {
+ if (timeout >= 0) {
+ monitor.wait(timeout);
+ } else {
+ monitor.wait();
+ }
+ if (timeout >= 0 && System.currentTimeMillis() >= deadline) {
+ throw new TimeoutException();
+ }
+ }
+ }
+
+ if (error != null) {
+ throw new ExecutionException(error);
+ }
+
+ return result;
+ }
+
+ // TODO: expose job status?
+
+ @SuppressWarnings("unchecked")
+ void complete(Object result, Throwable error) {
+ if (result != null && error != null) {
+ throw new IllegalArgumentException("Only one of result or error may be set.");
+ }
+ synchronized (monitor) {
+ this.result = (T) result;
+ this.error = error;
+ this.completed = true;
+ monitor.notifyAll();
+ }
+ }
+
+}
Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java Thu Oct 23 18:47:52 2014
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import org.apache.hive.spark.client.metrics.DataReadMethod;
+import org.apache.hive.spark.client.metrics.InputMetrics;
+import org.apache.hive.spark.client.metrics.Metrics;
+import org.apache.hive.spark.client.metrics.ShuffleReadMetrics;
+import org.apache.hive.spark.client.metrics.ShuffleWriteMetrics;
+
+/**
+ * Provides metrics collected for a submitted job.
+ *
+ * The collected metrics can be analysed at different levels of granularity:
+ * - Global (all Spark jobs triggered by client job)
+ * - Spark job
+ * - Stage
+ * - Task
+ *
+ * Only successful, non-speculative tasks are considered. Metrics are updated as tasks finish,
+ * so snapshots can be retrieved before the whole job completes.
+ */
+public class MetricsCollection {
+
+ private final List<TaskInfo> taskMetrics = Lists.newArrayList();
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+ public Metrics getAllMetrics() {
+ return aggregate(Predicates.<TaskInfo>alwaysTrue());
+ }
+
+ public Set<Integer> getJobIds() {
+ Function<TaskInfo, Integer> fun = new Function<TaskInfo, Integer>() {
+ @Override
+ public Integer apply(TaskInfo input) {
+ return input.jobId;
+ }
+ };
+ return transform(Predicates.<TaskInfo>alwaysTrue(), fun);
+ }
+
+ public Metrics getJobMetrics(int jobId) {
+ return aggregate(new JobFilter(jobId));
+ }
+
+ public Set<Integer> getStageIds(int jobId) {
+ Function<TaskInfo, Integer> fun = new Function<TaskInfo, Integer>() {
+ @Override
+ public Integer apply(TaskInfo input) {
+ return input.stageId;
+ }
+ };
+ return transform(new JobFilter(jobId), fun);
+ }
+
+ public Metrics getStageMetrics(final int jobId, final int stageId) {
+ return aggregate(new StageFilter(jobId, stageId));
+ }
+
+ public Set<Long> getTaskIds(int jobId, int stageId) {
+ Function<TaskInfo, Long> fun = new Function<TaskInfo, Long>() {
+ @Override
+ public Long apply(TaskInfo input) {
+ return input.taskId;
+ }
+ };
+ return transform(new StageFilter(jobId, stageId), fun);
+ }
+
+ public Metrics getTaskMetrics(final int jobId, final int stageId, final long taskId) {
+ Predicate<TaskInfo> filter = new Predicate<TaskInfo>() {
+ @Override
+ public boolean apply(TaskInfo input) {
+ return jobId == input.jobId && stageId == input.stageId && taskId == input.taskId;
+ }
+ };
+ lock.readLock().lock();
+ try {
+ Iterator<TaskInfo> it = Collections2.filter(taskMetrics, filter).iterator();
+ if (it.hasNext()) {
+ return it.next().metrics;
+ } else {
+ throw new NoSuchElementException("Task not found.");
+ }
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ void addMetrics(int jobId, int stageId, long taskId, Metrics metrics) {
+ lock.writeLock().lock();
+ try {
+ taskMetrics.add(new TaskInfo(jobId, stageId, taskId, metrics));
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ private <T> Set<T> transform(Predicate<TaskInfo> filter, Function<TaskInfo, T> fun) {
+ lock.readLock().lock();
+ try {
+ Collection<TaskInfo> filtered = Collections2.filter(taskMetrics, filter);
+ return Sets.newHashSet(Collections2.transform(filtered, fun));
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ private Metrics aggregate(Predicate<TaskInfo> filter) {
+ lock.readLock().lock();
+ try {
+ // Task metrics.
+ long executorDeserializeTime = 0L;
+ long executorRunTime = 0L;
+ long resultSize = 0L;
+ long jvmGCTime = 0L;
+ long resultSerializationTime = 0L;
+ long memoryBytesSpilled = 0L;
+ long diskBytesSpilled = 0L;
+
+ // Input metrics.
+ boolean hasInputMetrics = false;
+ DataReadMethod readMethod = null;
+ long bytesRead = 0L;
+
+ // Shuffle read metrics.
+ boolean hasShuffleReadMetrics = false;
+ int remoteBlocksFetched = 0;
+ int localBlocksFetched = 0;
+ long fetchWaitTime = 0L;
+ long remoteBytesRead = 0L;
+
+ // Shuffle write metrics.
+ boolean hasShuffleWriteMetrics = false;
+ long shuffleBytesWritten = 0L;
+ long shuffleWriteTime = 0L;
+
+ for (TaskInfo info : Collections2.filter(taskMetrics, filter)) {
+ Metrics m = info.metrics;
+ executorDeserializeTime += m.executorDeserializeTime;
+ executorRunTime += m.executorRunTime;
+ resultSize += m.resultSize;
+ jvmGCTime += m.jvmGCTime;
+ resultSerializationTime += m.resultSerializationTime;
+ memoryBytesSpilled += m.memoryBytesSpilled;
+ diskBytesSpilled += m.diskBytesSpilled;
+
+ if (m.inputMetrics.isPresent()) {
+ hasInputMetrics = true;
+ InputMetrics im = m.inputMetrics.get();
+ if (readMethod == null) {
+ readMethod = im.readMethod;
+ } else if (readMethod != im.readMethod) {
+ readMethod = DataReadMethod.Multiple;
+ }
+ bytesRead += im.bytesRead;
+ }
+
+ if (m.shuffleReadMetrics.isPresent()) {
+ ShuffleReadMetrics srm = m.shuffleReadMetrics.get();
+ hasShuffleReadMetrics = true;
+ remoteBlocksFetched += srm.remoteBlocksFetched;
+ localBlocksFetched += srm.localBlocksFetched;
+ fetchWaitTime += srm.fetchWaitTime;
+ remoteBytesRead += srm.remoteBytesRead;
+ }
+
+ if (m.shuffleWriteMetrics.isPresent()) {
+ ShuffleWriteMetrics swm = m.shuffleWriteMetrics.get();
+ hasShuffleWriteMetrics = true;
+ shuffleBytesWritten += swm.shuffleBytesWritten;
+ shuffleWriteTime += swm.shuffleWriteTime;
+ }
+ }
+
+ Optional<InputMetrics> inputMetrics = Optional.absent();
+ if (hasInputMetrics) {
+ inputMetrics = Optional.of(new InputMetrics(readMethod, bytesRead));
+ }
+
+ Optional<ShuffleReadMetrics> shuffleReadMetrics = Optional.absent();
+ if (hasShuffleReadMetrics) {
+ shuffleReadMetrics = Optional.of(new ShuffleReadMetrics(
+ remoteBlocksFetched,
+ localBlocksFetched,
+ fetchWaitTime,
+ remoteBytesRead));
+ }
+
+ Optional<ShuffleWriteMetrics> shuffleWriteMetrics = Optional.absent();
+ if (hasShuffleWriteMetrics) {
+ shuffleWriteMetrics = Optional.of(new ShuffleWriteMetrics(
+ shuffleBytesWritten,
+ shuffleWriteTime));
+ }
+
+ return new Metrics(
+ executorDeserializeTime,
+ executorRunTime,
+ resultSize,
+ jvmGCTime,
+ resultSerializationTime,
+ memoryBytesSpilled,
+ diskBytesSpilled,
+ inputMetrics,
+ shuffleReadMetrics,
+ shuffleWriteMetrics);
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ private static class TaskInfo {
+ final int jobId;
+ final int stageId;
+ final long taskId;
+ final Metrics metrics;
+
+ TaskInfo(int jobId, int stageId, long taskId, Metrics metrics) {
+ this.jobId = jobId;
+ this.stageId = stageId;
+ this.taskId = taskId;
+ this.metrics = metrics;
+ }
+
+ }
+
+ private static class JobFilter implements Predicate<TaskInfo> {
+
+ private final int jobId;
+
+ JobFilter(int jobId) {
+ this.jobId = jobId;
+ }
+
+ @Override
+ public boolean apply(TaskInfo input) {
+ return jobId == input.jobId;
+ }
+
+ }
+
+ private static class StageFilter implements Predicate<TaskInfo> {
+
+ private final int jobId;
+ private final int stageId;
+
+ StageFilter(int jobId, int stageId) {
+ this.jobId = jobId;
+ this.stageId = stageId;
+ }
+
+ @Override
+ public boolean apply(TaskInfo input) {
+ return jobId == input.jobId && stageId == input.stageId;
+ }
+
+ }
+
+}
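
The granularity levels listed in the class comment map directly onto nested ID lookups; a drill-down sketch using only the accessors above (hypothetical handle variable):

    MetricsCollection mc = handle.getMetrics();  // from a JobHandle
    Metrics global = mc.getAllMetrics();         // all Spark jobs triggered by the client job
    for (Integer jobId : mc.getJobIds()) {
      Metrics perJob = mc.getJobMetrics(jobId);
      for (Integer stageId : mc.getStageIds(jobId)) {
        Metrics perStage = mc.getStageMetrics(jobId, stageId);
        for (Long taskId : mc.getTaskIds(jobId, stageId)) {
          Metrics perTask = mc.getTaskMetrics(jobId, stageId, taskId);
        }
      }
    }
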
Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MonitorCallback.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MonitorCallback.java?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MonitorCallback.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MonitorCallback.java Thu Oct 23 18:47:52 2014
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import org.apache.spark.api.java.JavaFutureAction;
+
+interface MonitorCallback {
+
+ void call(JavaFutureAction<?> future);
+
+}
Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java Thu Oct 23 18:47:52 2014
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import java.io.Serializable;
+
+import org.apache.hive.spark.client.metrics.Metrics;
+
+final class Protocol {
+
+ static class CancelJob implements Serializable {
+
+ final String id;
+
+ CancelJob(String id) {
+ this.id = id;
+ }
+
+ CancelJob() {
+ this(null);
+ }
+
+ }
+
+ static class EndSession implements Serializable {
+
+ }
+
+ static class Error implements Serializable {
+
+ final Exception cause;
+
+ Error(Exception cause) {
+ this.cause = cause;
+ }
+
+ Error() {
+ this(null);
+ }
+
+ }
+
+ static class Hello implements Serializable {
+
+ final String remoteUrl;
+
+ Hello(String remoteUrl) {
+ this.remoteUrl = remoteUrl;
+ }
+
+ Hello() {
+ this(null);
+ }
+
+ }
+
+ static class JobMetrics implements Serializable {
+
+ final String jobId;
+ final int sparkJobId;
+ final int stageId;
+ final long taskId;
+ final Metrics metrics;
+
+ JobMetrics(String jobId, int sparkJobId, int stageId, long taskId, Metrics metrics) {
+ this.jobId = jobId;
+ this.sparkJobId = sparkJobId;
+ this.stageId = stageId;
+ this.taskId = taskId;
+ this.metrics = metrics;
+ }
+
+ JobMetrics() {
+ this(null, -1, -1, -1, null);
+ }
+
+ }
+
+ static class JobRequest<T extends Serializable> implements Serializable {
+
+ final String id;
+ final Job<T> job;
+
+ JobRequest(String id, Job<T> job) {
+ this.id = id;
+ this.job = job;
+ }
+
+ JobRequest() {
+ this(null, null);
+ }
+
+ }
+
+ static class JobResult<T extends Serializable> implements Serializable {
+
+ final String id;
+ final T result;
+ final Throwable error;
+
+ JobResult(String id, T result, Throwable error) {
+ this.id = id;
+ this.result = result;
+ this.error = error;
+ }
+
+ JobResult() {
+ this(null, null, null);
+ }
+
+ }
+
+}
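
Taken together, these classes define the client/driver wire protocol. As inferred from RemoteDriver (below) and SparkClientImpl, the exchange for one submitted job is roughly:

    // driver -> client: Hello(remoteUrl)              driver announces its actor URL
    // client -> driver: JobRequest(id, job)           client ships the serialized Job
    // driver -> client: JobMetrics(jobId, ...)        zero or more, as monitored tasks finish
    // driver -> client: JobResult(id, result, error)  terminal message for the job
    // client -> driver: CancelJob(id) / EndSession()  control messages, sent at any time
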
Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java Thu Oct 23 18:47:52 2014
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import scala.Tuple2;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.remote.DisassociatedEvent;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.spark.SparkConf;
+import org.apache.spark.scheduler.*;
+import org.apache.spark.api.java.JavaFutureAction;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hive.spark.client.metrics.Metrics;
+
+/**
+ * Driver code for the Spark client library.
+ */
+public class RemoteDriver {
+
+ private final static Logger LOG = LoggerFactory.getLogger(RemoteDriver.class);
+
+ private final Map<String, JobWrapper<?>> activeJobs;
+ private final Object shutdownLock;
+ private final ActorSystem system;
+ private final ActorRef actor;
+ private final ActorSelection client;
+ private final ExecutorService executor;
+ private final JobContextImpl jc;
+
+ private boolean running;
+
+ private RemoteDriver(String[] args) throws Exception {
+ this.activeJobs = Maps.newConcurrentMap();
+ this.shutdownLock = new Object();
+
+ SparkConf conf = new SparkConf();
+ String remote = null;
+ for (int idx = 0; idx < args.length; idx += 2) {
+ String key = args[idx];
+ if (key.equals("--remote")) {
+ remote = getArg(args, idx);
+ } else if (key.equals("--secret")) {
+ conf.set(ClientUtils.CONF_KEY_SECRET, getArg(args, idx));
+ } else if (key.equals("--conf")) {
+ String[] val = getArg(args, idx).split("[=]", 2);
+ conf.set(val[0], val[1]);
+ } else {
+ throw new IllegalArgumentException("Invalid command line: " +
+ Joiner.on(" ").join(args));
+ }
+ }
+
+ executor = Executors.newCachedThreadPool();
+
+ LOG.info("Connecting to: {}", remote);
+
+ Map<String, String> mapConf = Maps.newHashMap();
+ for (Tuple2<String, String> e : conf.getAll()) {
+ mapConf.put(e._1(), e._2());
+ }
+
+ ClientUtils.ActorSystemInfo info = ClientUtils.createActorSystem(mapConf);
+ this.system = info.system;
+ this.actor = system.actorOf(Props.create(ServerActor.class, this), "RemoteDriver");
+ this.client = system.actorSelection(remote);
+
+ try {
+ JavaSparkContext sc = new JavaSparkContext(conf);
+ sc.sc().addSparkListener(new ClientListener());
+ jc = new JobContextImpl(sc);
+ } catch (Exception e) {
+ LOG.error("Failed to start SparkContext.", e);
+ shutdown(new Protocol.Error(e));
+ throw e;
+ }
+
+ client.tell(new Protocol.Hello(info.url + "/RemoteDriver"), actor);
+ running = true;
+ }
+
+ private void run() throws InterruptedException {
+ synchronized (shutdownLock) {
+ while (running) {
+ shutdownLock.wait();
+ }
+ }
+ executor.shutdownNow();
+ }
+
+ private synchronized void shutdown(Object msg) {
+ if (running) {
+ LOG.info("Shutting down remote driver.");
+ running = false;
+ for (JobWrapper<?> job : activeJobs.values()) {
+ cancelJob(job);
+ }
+
+ if (msg != null) {
+ client.tell(msg, actor);
+ }
+ if (jc != null) {
+ jc.stop();
+ }
+ system.shutdown();
+ synchronized (shutdownLock) {
+ shutdownLock.notifyAll();
+ }
+ }
+ }
+
+ private boolean cancelJob(JobWrapper<?> job) {
+ boolean cancelled = false;
+ for (JavaFutureAction<?> action : job.jobs) {
+ cancelled |= action.cancel(true);
+ }
+ return cancelled | job.future.cancel(true);
+ }
+
+ private String getArg(String[] args, int keyIdx) {
+ int valIdx = keyIdx + 1;
+ if (args.length <= valIdx) {
+ throw new IllegalArgumentException("Invalid command line: " +
+ Joiner.on(" ").join(args));
+ }
+ return args[valIdx];
+ }
+
+ private class ServerActor extends UntypedActor {
+
+ @Override
+ public void onReceive(Object message) throws Exception {
+ if (message instanceof Protocol.CancelJob) {
+ Protocol.CancelJob cj = (Protocol.CancelJob) message;
+ JobWrapper<?> job = activeJobs.get(cj.id);
+ if (job == null || !cancelJob(job)) {
+ LOG.info("Requested to cancel an already finished job.");
+ }
+ } else if (message instanceof DisassociatedEvent) {
+ LOG.debug("Shutting down due to DisassociatedEvent.");
+ shutdown(null);
+ } else if (message instanceof Protocol.EndSession) {
+ LOG.debug("Shutting down due to EndSession request.");
+ shutdown(null);
+ } else if (message instanceof Protocol.JobRequest) {
+ Protocol.JobRequest req = (Protocol.JobRequest) message;
+ LOG.info("Received job request {}", req.id);
+ JobWrapper<?> wrapper = new JobWrapper<Serializable>(req);
+ activeJobs.put(req.id, wrapper);
+ wrapper.submit();
+ }
+ }
+
+ }
+
+ private class JobWrapper<T extends Serializable> implements Callable<Void> {
+
+ private final Protocol.JobRequest<T> req;
+ private final List<JavaFutureAction<?>> jobs;
+ private final AtomicInteger completed;
+
+ private Future<?> future;
+
+ JobWrapper(Protocol.JobRequest<T> req) {
+ this.req = req;
+ this.jobs = Lists.newArrayList();
+ this.completed = new AtomicInteger();
+ }
+
+ @Override
+ public Void call() throws Exception {
+ try {
+ jc.setMonitorCb(new MonitorCallback() {
+ @Override
+ public void call(JavaFutureAction<?> future) {
+ monitorJob(future);
+ }
+ });
+
+ T result = req.job.call(jc);
+ synchronized (completed) {
+ while (completed.get() != jobs.size()) {
+ LOG.debug("Client job {} finished, {} of {} Spark jobs finished.",
+ req.id, completed.get(), jobs.size());
+ completed.wait();
+ }
+ }
+ client.tell(new Protocol.JobResult(req.id, result, null), actor);
+ } catch (Throwable t) {
+ // Catch throwables in a best-effort to report job status back to the client. It's
+ // re-thrown so that the executor can destroy the affected thread (or the JVM can
+ // die or whatever would happen if the throwable bubbled up).
+ client.tell(new Protocol.JobResult(req.id, null, t), actor);
+ throw new ExecutionException(t);
+ } finally {
+ jc.setMonitorCb(null);
+ activeJobs.remove(req.id);
+ }
+ return null;
+ }
+
+ void submit() {
+ this.future = executor.submit(this);
+ }
+
+ void jobDone() {
+ synchronized (completed) {
+ completed.incrementAndGet();
+ completed.notifyAll();
+ }
+ }
+
+ private void monitorJob(JavaFutureAction<?> job) {
+ jobs.add(job);
+ }
+
+ }
+
+ private class ClientListener implements SparkListener {
+
+ private final Map<Integer, Integer> stageToJobId = Maps.newHashMap();
+
+ @Override
+ public void onJobStart(SparkListenerJobStart jobStart) {
+ // TODO: are stage IDs unique? Otherwise this won't work.
+ synchronized (stageToJobId) {
+ for (int i = 0; i < jobStart.stageIds().length(); i++) {
+ stageToJobId.put((Integer) jobStart.stageIds().apply(i), jobStart.jobId());
+ }
+ }
+ }
+
+ @Override
+ public void onJobEnd(SparkListenerJobEnd jobEnd) {
+ synchronized (stageToJobId) {
+ for (Iterator<Map.Entry<Integer, Integer>> it = stageToJobId.entrySet().iterator();
+ it.hasNext(); ) {
+ Map.Entry<Integer, Integer> e = it.next();
+ if (e.getValue() == jobEnd.jobId()) {
+ it.remove();
+ }
+ }
+ }
+
+ String clientId = getClientId(jobEnd.jobId());
+ if (clientId != null) {
+ activeJobs.get(clientId).jobDone();
+ }
+ }
+
+ @Override
+ public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
+ if (taskEnd.reason() instanceof org.apache.spark.Success$ &&
+ !taskEnd.taskInfo().speculative()) {
+ Metrics metrics = new Metrics(taskEnd.taskMetrics());
+ Integer jobId;
+ synchronized (stageToJobId) {
+ jobId = stageToJobId.get(taskEnd.stageId());
+ }
+
+ // TODO: implement implicit AsyncRDDActions conversion instead of jc.monitor()?
+ // TODO: how to handle stage failures?
+
+ String clientId = getClientId(jobId);
+ if (clientId != null) {
+ client.tell(new Protocol.JobMetrics(clientId, jobId, taskEnd.stageId(),
+ taskEnd.taskInfo().taskId(), metrics), actor);
+ }
+ }
+ }
+
+ @Override
+ public void onStageCompleted(SparkListenerStageCompleted stageCompleted) { }
+
+ @Override
+ public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) { }
+
+ @Override
+ public void onTaskStart(SparkListenerTaskStart taskStart) { }
+
+ @Override
+ public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) { }
+
+ @Override
+ public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) { }
+
+ @Override
+ public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) { }
+
+ @Override
+ public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) { }
+
+ @Override
+ public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) { }
+
+ @Override
+ public void onApplicationStart(SparkListenerApplicationStart applicationStart) { }
+
+ @Override
+ public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { }
+
+ @Override
+ public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) { }
+
+ /**
+ * Returns the client job ID for the given Spark job ID.
+ *
+ * This will only work for jobs monitored via JobContext#monitor(). Other jobs won't be
+ * matched, and this method will return null.
+ */
+ private String getClientId(Integer jobId) {
+ for (Map.Entry<String, JobWrapper<?>> e : activeJobs.entrySet()) {
+ for (JavaFutureAction<?> future : e.getValue().jobs) {
+ if (future.jobIds().contains(jobId)) {
+ return e.getKey();
+ }
+ }
+ }
+ return null;
+ }
+
+ }
+
+ public static void main(String[] args) throws Exception {
+ new RemoteDriver(args).run();
+ }
+
+}
+
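
The constructor's argument parsing implies a launch command along these lines (hypothetical values; in practice the driver is expected to be launched by SparkClientImpl rather than by hand):

    // --remote points at the client's actor system URL, --secret must match the
    // client's generated cookie, and each --conf entry is copied into the SparkConf.
    String[] args = {
        "--remote", "akka.tcp://<name>@<host>:<port>/user",
        "--secret", "<shared-secret>",
        "--conf", "spark.master=local"
    };
    RemoteDriver.main(args);
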
Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java Thu Oct 23 18:47:52 2014
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import java.io.Serializable;
+import java.net.URL;
+import java.util.concurrent.Future;
+
+/**
+ * Defines the API for the Spark remote client.
+ */
+public interface SparkClient {
+
+ /**
+ * Submits a job for asynchronous execution.
+ *
+ * @param job The job to execute.
+ * @return A handle that can be used to monitor the job.
+ */
+ <T extends Serializable> JobHandle<T> submit(Job<T> job);
+
+ /**
+ * Stops the remote context.
+ *
+ * Any pending jobs will be cancelled, and the remote context will be torn down.
+ */
+ void stop();
+
+ /**
+ * Adds a jar file to the running remote context.
+ *
+ * Note that the URL should be reachable by the Spark driver process. If running the driver
+ * in cluster mode, it may reside on a different host, meaning "file:" URLs have to exist
+ * on that node (and not on the client machine).
+ *
+ * @param url The location of the jar file.
+ * @return A future that can be used to monitor the operation.
+ */
+ Future<?> addJar(URL url);
+
+ /**
+ * Adds a file to the running remote context.
+ *
+ * Note that the URL should be reachable by the Spark driver process. If the driver is
+ * running in cluster mode, it may reside on a different host, meaning that "file:" URLs
+ * must point to files that exist on that node (and not just on the client machine).
+ *
+ * @param url The location of the file.
+ * @return A future that can be used to monitor the operation.
+ */
+ Future<?> addFile(URL url);
+
+}
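A minimal usage sketch for this interface (assumptions: the "client" instance
comes from SparkClientFactory, shown next; JobHandle<T> extends
java.util.concurrent.Future<T>, as the addJar()/addFile() return types
suggest; and JobContext#sc() exposes a JavaSparkContext, as the
AddJarJob/AddFileJob implementations in SparkClientImpl imply):

    // Sketch only: submit a trivial counting job and block for its result.
    static Integer countExample(SparkClient client) throws Exception {
      JobHandle<Integer> handle = client.submit(new Job<Integer>() {
        @Override
        public Integer call(JobContext jc) throws Exception {
          // Runs inside the remote driver, against the remote Spark context.
          return (int) jc.sc().parallelize(java.util.Arrays.asList(1, 2, 3)).count();
        }
      });
      return handle.get();  // blocks until the remote job completes
    }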
Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java Thu Oct 23 18:47:52 2014
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import java.io.IOException;
+import java.util.Map;
+
+import akka.actor.ActorSystem;
+import com.google.common.collect.Maps;
+import org.apache.spark.SparkException;
+
+/**
+ * Factory for SparkClient instances.
+ */
+public final class SparkClientFactory {
+
+ static ActorSystem actorSystem = null;
+ static String akkaUrl = null;
+ static String secret = null;
+
+ private static boolean initialized = false;
+
+ /**
+ * Initializes the SparkClient library. Must be called before creating client instances.
+ *
+ * @param conf Map containing configuration parameters for the client.
+ */
+ public static synchronized void initialize(Map<String, String> conf) throws IOException {
+ secret = akka.util.Crypt.generateSecureCookie();
+
+ Map<String, String> akkaConf = Maps.newHashMap(conf);
+ akkaConf.put(ClientUtils.CONF_KEY_SECRET, secret);
+
+ ClientUtils.ActorSystemInfo info = ClientUtils.createActorSystem(akkaConf);
+ actorSystem = info.system;
+ akkaUrl = info.url;
+ initialized = true;
+ }
+
+ /** Stops the SparkClient library. */
+ public static synchronized void stop() {
+ if (initialized) {
+ actorSystem.shutdown();
+ actorSystem = null;
+ akkaUrl = null;
+ secret = null;
+ initialized = false;
+ }
+ }
+
+ /**
+ * Instantiates a new Spark client.
+ *
+ * @param conf Configuration for the remote Spark application.
+ */
+ public static synchronized SparkClient createClient(Map<String, String> conf)
+ throws IOException, SparkException {
+ if (!initialized) {
+ throw new IllegalStateException("Library is not initialized. Call initialize() first.");
+ }
+ return new SparkClientImpl(conf);
+ }
+
+}
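The intended lifecycle, as a short sketch (configuration values are
illustrative; spark.master is required by the client implementation, shown
next):

    // Sketch only: initialize the library, create a client, then tear down.
    Map<String, String> conf = Maps.newHashMap();
    conf.put("spark.master", "local");
    SparkClientFactory.initialize(conf);
    SparkClient client = SparkClientFactory.createClient(conf);
    try {
      // ... submit jobs via client.submit(...) ...
    } finally {
      client.stop();              // stops the remote context
      SparkClientFactory.stop();  // shuts down the shared actor system
    }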
Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java Thu Oct 23 18:47:52 2014
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.io.Writer;
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.spark.SparkContext;
+import org.apache.spark.SparkException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class SparkClientImpl implements SparkClient {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SparkClientImpl.class);
+
+ private final Map<String, String> conf;
+ private final AtomicInteger childIdGenerator;
+ private final String name;
+ private final ActorRef clientRef;
+ private final Thread driverThread;
+ private final Map<String, JobHandleImpl<?>> jobs;
+
+ private volatile ActorSelection remoteRef;
+
+ SparkClientImpl(Map<String, String> conf) throws IOException, SparkException {
+ this.conf = conf;
+ this.childIdGenerator = new AtomicInteger();
+ this.name = "SparkClient-" + ClientUtils.randomName();
+ this.clientRef = bind(Props.create(ClientActor.class, this), name);
+ this.jobs = Maps.newConcurrentMap();
+ this.driverThread = startDriver();
+
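+ // Note: spark.client.connectTimeout is interpreted in seconds (default "10").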
+ long connectTimeout = Integer.parseInt(
+ Optional.fromNullable(conf.get("spark.client.connectTimeout")).or("10")) * 1000;
+ long endTime = System.currentTimeMillis() + connectTimeout;
+
+ synchronized (this) {
+ while (remoteRef == null) {
+ try {
+ wait(connectTimeout);
+ } catch (InterruptedException ie) {
+ throw new SparkException("Interrupted.", ie);
+ }
+
+ connectTimeout = endTime - System.currentTimeMillis();
+ if (remoteRef == null && connectTimeout <= 0) {
+ throw new SparkException("Timed out waiting for remote driver to connect.");
+ }
+ }
+ }
+ }
+
+ @Override
+ public <T extends Serializable> JobHandle<T> submit(Job<T> job) {
+ String jobId = ClientUtils.randomName();
+ JobHandleImpl<T> handle = new JobHandleImpl<T>(this, jobId);
+ // Register the handle before sending the request, so that a fast result
+ // cannot arrive before the job is known to this client.
+ jobs.put(jobId, handle);
+ remoteRef.tell(new Protocol.JobRequest(jobId, job), clientRef);
+ return handle;
+ }
+
+ @Override
+ public void stop() {
+ if (remoteRef != null) {
+ LOG.info("Sending EndSession to remote actor.");
+ remoteRef.tell(new Protocol.EndSession(), clientRef);
+ }
+ unbind(clientRef);
+ try {
+ driverThread.join(); // TODO: timeout?
+ } catch (InterruptedException ie) {
+ LOG.debug("Interrupted before driver thread was finished.");
+ }
+ }
+
+ @Override
+ public Future<?> addJar(URL url) {
+ return submit(new AddJarJob(url.toString()));
+ }
+
+ @Override
+ public Future<?> addFile(URL url) {
+ return submit(new AddFileJob(url.toString()));
+ }
+
+ void cancel(String jobId) {
+ remoteRef.tell(new Protocol.CancelJob(jobId), clientRef);
+ }
+
+ private Thread startDriver() throws IOException {
+ Runnable runnable;
+ if (conf.containsKey(ClientUtils.CONF_KEY_IN_PROCESS)) {
+ // Mostly for testing things quickly. Do not do this in production.
+ LOG.warn("!!!! Running remote driver in-process. !!!!");
+ runnable = new Runnable() {
+ @Override
+ public void run() {
+ List<String> args = Lists.newArrayList();
+ args.add("--remote");
+ args.add(String.format("%s/%s", SparkClientFactory.akkaUrl, name));
+ args.add("--secret");
+ args.add(SparkClientFactory.secret);
+
+ for (Map.Entry<String, String> e : conf.entrySet()) {
+ args.add("--conf");
+ args.add(String.format("%s=%s", e.getKey(), e.getValue()));
+ }
+ try {
+ RemoteDriver.main(args.toArray(new String[args.size()]));
+ } catch (Exception e) {
+ LOG.error("Error running driver.", e);
+ }
+ }
+ };
+ } else {
+ // Create a file with all the job properties to be read by spark-submit. Change the
+ // file's permissions so that only the owner can read it. This avoids having the
+ // connection secret show up in the child process's command line.
+ File properties = File.createTempFile("spark-submit.", ".properties");
+ if (!properties.setReadable(false) || !properties.setReadable(true, true)) {
+ throw new IOException("Cannot change permissions of job properties file.");
+ }
+
+ Properties allProps = new Properties();
+ for (Map.Entry<String, String> e : conf.entrySet()) {
+ allProps.put(e.getKey(), e.getValue());
+ }
+ allProps.put(ClientUtils.CONF_KEY_SECRET, SparkClientFactory.secret);
+
+ Writer writer = new OutputStreamWriter(new FileOutputStream(properties), Charsets.UTF_8);
+ try {
+ allProps.store(writer, "Spark Context configuration");
+ } finally {
+ writer.close();
+ }
+
+ // Define how to pass options to the child process. If launching in client (or local)
+ // mode, the driver options need to be passed directly on the command line. Otherwise,
+ // SparkSubmit will take care of that for us.
+ String master = conf.get("spark.master");
+ Preconditions.checkArgument(master != null, "spark.master is not defined.");
+
+ List<String> argv = Lists.newArrayList();
+
+ // If a Spark installation is provided, use the spark-submit script. Otherwise, call the
+ // SparkSubmit class directly, which has some caveats (like having to provide a proper
+ // version of Guava on the classpath depending on the deploy mode).
+ if (conf.get("spark.home") != null) {
+ argv.add(new File(conf.get("spark.home"), "bin/spark-submit").getAbsolutePath());
+ } else {
+ LOG.info("No spark.home provided, calling SparkSubmit directly.");
+ argv.add(new File(System.getProperty("java.home"), "bin/java").getAbsolutePath());
+
+ if (master.startsWith("local") || master.startsWith("mesos") || master.endsWith("-client")) {
+ String mem = conf.get("spark.driver.memory");
+ if (mem != null) {
+ argv.add("-Xms" + mem);
+ argv.add("-Xmx" + mem);
+ }
+
+ String cp = conf.get("spark.driver.extraClassPath");
+ if (cp != null) {
+ argv.add("-classpath");
+ argv.add(cp);
+ }
+
+ String libPath = conf.get("spark.driver.extraLibPath");
+ if (libPath != null) {
+ argv.add("-Djava.library.path=" + libPath);
+ }
+
+ String extra = conf.get("spark.driver.extraJavaOptions");
+ if (extra != null) {
+ for (String opt : extra.split("[ ]")) {
+ if (!opt.trim().isEmpty()) {
+ argv.add(opt.trim());
+ }
+ }
+ }
+ }
+
+ argv.add("org.apache.spark.deploy.SparkSubmit");
+ }
+
+ argv.add("--properties-file");
+ argv.add(properties.getAbsolutePath());
+ argv.add("--class");
+ argv.add(RemoteDriver.class.getName());
+
+ String jar = "spark-internal";
+ if (SparkContext.jarOfClass(this.getClass()).isDefined()) {
+ jar = SparkContext.jarOfClass(this.getClass()).get();
+ }
+ argv.add(jar);
+
+ argv.add("--remote");
+ argv.add(String.format("%s/%s", SparkClientFactory.akkaUrl, name));
+
+ LOG.debug("Running client driver with argv: {}", Joiner.on(" ").join(argv));
+
+ ProcessBuilder pb = new ProcessBuilder(argv.toArray(new String[argv.size()]));
+ pb.environment().clear();
+ final Process child = pb.start();
+
+ int childId = childIdGenerator.incrementAndGet();
+ redirect("stdout-redir-" + childId, child.getInputStream(), System.out);
+ redirect("stderr-redir-" + childId, child.getErrorStream(), System.err);
+
+ runnable = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ int exitCode = child.waitFor();
+ if (exitCode != 0) {
+ LOG.warn("Child process exited with code {}.", exitCode);
+ }
+ } catch (Exception e) {
+ LOG.warn("Exception while waiting for child process.", e);
+ }
+ }
+ };
+ }
+
+ Thread thread = new Thread(runnable);
+ thread.setDaemon(true);
+ thread.setName("Driver");
+ thread.start();
+ return thread;
+ }
+
+ private void redirect(String name, InputStream in, OutputStream out) {
+ Thread thread = new Thread(new Redirector(in, out));
+ thread.setName(name);
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ private ActorRef bind(Props props, String name) {
+ return SparkClientFactory.actorSystem.actorOf(props, name);
+ }
+
+ private void unbind(ActorRef actor) {
+ SparkClientFactory.actorSystem.stop(actor);
+ }
+
+ private ActorSelection select(String url) {
+ return SparkClientFactory.actorSystem.actorSelection(url);
+ }
+
+ private class ClientActor extends UntypedActor {
+
+ @Override
+ public void onReceive(Object message) throws Exception {
+ if (message instanceof Protocol.Error) {
+ Protocol.Error e = (Protocol.Error) message;
+ LOG.error("Error report from remote driver.", e.cause);
+ } else if (message instanceof Protocol.Hello) {
+ Protocol.Hello hello = (Protocol.Hello) message;
+ LOG.info("Received hello from {}", hello.remoteUrl);
+ remoteRef = select(hello.remoteUrl);
+ synchronized (SparkClientImpl.this) {
+ SparkClientImpl.this.notifyAll();
+ }
+ } else if (message instanceof Protocol.JobMetrics) {
+ Protocol.JobMetrics jm = (Protocol.JobMetrics) message;
+ JobHandleImpl<?> handle = jobs.get(jm.jobId);
+ if (handle != null) {
+ handle.getMetrics().addMetrics(jm.sparkJobId, jm.stageId, jm.taskId, jm.metrics);
+ } else {
+ LOG.warn("Received metrics for unknown job {}", jm.jobId);
+ }
+ } else if (message instanceof Protocol.JobResult) {
+ Protocol.JobResult jr = (Protocol.JobResult) message;
+ JobHandleImpl<?> handle = jobs.remove(jr.id);
+ if (handle != null) {
+ LOG.info("Received result for {}", jr.id);
+ handle.complete(jr.result, jr.error);
+ } else {
+ LOG.warn("Received result for unknown job {}", jr.id);
+ }
+ }
+ }
+
+ }
+
+ private class Redirector implements Runnable {
+
+ private final InputStream in;
+ private final OutputStream out;
+
+ Redirector(InputStream in, OutputStream out) {
+ this.in = in;
+ this.out = out;
+ }
+
+ @Override
+ public void run() {
+ try {
+ byte[] buf = new byte[1024];
+ int len = in.read(buf);
+ while (len != -1) {
+ out.write(buf, 0, len);
+ out.flush();
+ len = in.read(buf);
+ }
+ } catch (Exception e) {
+ LOG.warn("Error in redirector thread.", e);
+ }
+ }
+
+ }
+
+ private static class AddJarJob implements Job<Serializable> {
+
+ private final String path;
+
+ AddJarJob(String path) {
+ this.path = path;
+ }
+
+ @Override
+ public Serializable call(JobContext jc) throws Exception {
+ jc.sc().addJar(path);
+ return null;
+ }
+
+ }
+
+ private static class AddFileJob implements Job<Serializable> {
+
+ private final String path;
+
+ AddFileJob(String path) {
+ this.path = path;
+ }
+
+ @Override
+ public Serializable call(JobContext jc) throws Exception {
+ jc.sc().addFile(path);
+ return null;
+ }
+
+ }
+
+}
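For reference, when spark.home is set, the child process assembled by
startDriver() above is launched roughly as follows (the properties file name
and actor URL are illustrative; "spark-internal" is the fallback used when no
jar contains this class):

    /path/to/spark/bin/spark-submit \
      --properties-file /tmp/spark-submit.1234.properties \
      --class org.apache.hive.spark.client.RemoteDriver \
      spark-internal \
      --remote <akka-url>/SparkClient-<random>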
Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/DataReadMethod.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/DataReadMethod.java?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/DataReadMethod.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/DataReadMethod.java Thu Oct 23 18:47:52 2014
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client.metrics;
+
+/**
+ * Method by which input data was read. Network means that the data was read over the network
+ * from a remote block manager (which may have stored the data on-disk or in-memory).
+ */
+public enum DataReadMethod {
+ Memory, Disk, Hadoop, Network, Multiple
+}
Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java Thu Oct 23 18:47:52 2014
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client.metrics;
+
+import java.io.Serializable;
+
+import org.apache.spark.executor.TaskMetrics;
+
+/**
+ * Metrics pertaining to reading input data.
+ */
+public class InputMetrics implements Serializable {
+
+ public final DataReadMethod readMethod;
+ public final long bytesRead;
+
+ public InputMetrics(
+ DataReadMethod readMethod,
+ long bytesRead) {
+ this.readMethod = readMethod;
+ this.bytesRead = bytesRead;
+ }
+
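+ // Assumes metrics.inputMetrics() is defined; guarded by Metrics.optionalInputMetric().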
+ public InputMetrics(TaskMetrics metrics) {
+ this(DataReadMethod.valueOf(metrics.inputMetrics().get().readMethod().toString()),
+ metrics.inputMetrics().get().bytesRead());
+ }
+
+}
Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java Thu Oct 23 18:47:52 2014
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client.metrics;
+
+import java.io.Serializable;
+
+import com.google.common.base.Optional;
+import org.apache.spark.executor.TaskMetrics;
+
+/**
+ * Metrics tracked during the execution of a job.
+ *
+ * Depending on how the metrics object is obtained (by calling methods in the `MetricsCollection`
+ * class), metrics will refer to one or more tasks.
+ */
+public class Metrics implements Serializable {
+
+ /** Time taken on the executor to deserialize tasks. */
+ public final long executorDeserializeTime;
+ /** Time the executor spends actually running the task (including fetching shuffle data). */
+ public final long executorRunTime;
+ /** The number of bytes sent back to the driver by tasks. */
+ public final long resultSize;
+ /** Amount of time the JVM spent in garbage collection while executing tasks. */
+ public final long jvmGCTime;
+ /** Amount of time spent serializing task results. */
+ public final long resultSerializationTime;
+ /** The number of in-memory bytes spilled by tasks. */
+ public final long memoryBytesSpilled;
+ /** The number of on-disk bytes spilled by tasks. */
+ public final long diskBytesSpilled;
+ /** If tasks read from a HadoopRDD or from persisted data, metrics on how much data was read. */
+ public final Optional<InputMetrics> inputMetrics;
+ /**
+ * If tasks read from shuffle output, metrics on getting shuffle data. This includes read metrics
+ * aggregated over all the tasks' shuffle dependencies.
+ */
+ public final Optional<ShuffleReadMetrics> shuffleReadMetrics;
+ /** If tasks wrote to shuffle output, metrics on the written shuffle data. */
+ public final Optional<ShuffleWriteMetrics> shuffleWriteMetrics;
+
+ public Metrics(
+ long executorDeserializeTime,
+ long executorRunTime,
+ long resultSize,
+ long jvmGCTime,
+ long resultSerializationTime,
+ long memoryBytesSpilled,
+ long diskBytesSpilled,
+ Optional<InputMetrics> inputMetrics,
+ Optional<ShuffleReadMetrics> shuffleReadMetrics,
+ Optional<ShuffleWriteMetrics> shuffleWriteMetrics) {
+ this.executorDeserializeTime = executorDeserializeTime;
+ this.executorRunTime = executorRunTime;
+ this.resultSize = resultSize;
+ this.jvmGCTime = jvmGCTime;
+ this.resultSerializationTime = resultSerializationTime;
+ this.memoryBytesSpilled = memoryBytesSpilled;
+ this.diskBytesSpilled = diskBytesSpilled;
+ this.inputMetrics = inputMetrics;
+ this.shuffleReadMetrics = shuffleReadMetrics;
+ this.shuffleWriteMetrics = shuffleWriteMetrics;
+ }
+
+ public Metrics(TaskMetrics metrics) {
+ this(
+ metrics.executorDeserializeTime(),
+ metrics.executorRunTime(),
+ metrics.resultSize(),
+ metrics.jvmGCTime(),
+ metrics.resultSerializationTime(),
+ metrics.memoryBytesSpilled(),
+ metrics.diskBytesSpilled(),
+ optionalInputMetric(metrics),
+ optionalShuffleReadMetric(metrics),
+ optionalShuffleWriteMetrics(metrics));
+ }
+
+ private static final Optional<InputMetrics> optionalInputMetric(TaskMetrics metrics) {
+ return metrics.inputMetrics().isDefined()
+ ? Optional.of(new InputMetrics(metrics))
+ : Optional.<InputMetrics>absent();
+ }
+
+ private static final Optional<ShuffleReadMetrics>
+ optionalShuffleReadMetric(TaskMetrics metrics) {
+ return metrics.shuffleReadMetrics().isDefined()
+ ? Optional.of(new ShuffleReadMetrics(metrics))
+ : Optional.<ShuffleReadMetrics>absent();
+ }
+
+ private static final Optional<ShuffleWriteMetrics>
+ optionalShuffleWriteMetrics(TaskMetrics metrics) {
+ return metrics.shuffleWriteMetrics().isDefined()
+ ? Optional.of(new ShuffleWriteMetrics(metrics))
+ : Optional.<ShuffleWriteMetrics>absent();
+ }
+
+}
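A short sketch of consuming a Metrics instance (the "m" argument is
hypothetical, e.g. obtained from a job handle's metrics collection; Optional
is Guava's com.google.common.base.Optional, as imported above):

    // Sketch only: read the Optional-wrapped sub-metrics defensively.
    static void logInput(Metrics m) {
      if (m.inputMetrics.isPresent()) {
        InputMetrics in = m.inputMetrics.get();
        System.out.println("Read " + in.bytesRead + " bytes via " + in.readMethod);
      }
      System.out.println("JVM GC time: " + m.jvmGCTime);
    }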