You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@livy.apache.org by js...@apache.org on 2017/07/05 03:06:53 UTC
[24/33] incubator-livy git commit: LIVY-375. Change Livy code package
name to org.apache.livy
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/src/test/scala/org/apache/livy/repl/SparkSessionSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/livy/repl/SparkSessionSpec.scala b/repl/src/test/scala/org/apache/livy/repl/SparkSessionSpec.scala
new file mode 100644
index 0000000..9369519
--- /dev/null
+++ b/repl/src/test/scala/org/apache/livy/repl/SparkSessionSpec.scala
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.repl
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import org.apache.spark.SparkConf
+import org.json4s.Extraction
+import org.json4s.JsonAST.JValue
+import org.json4s.jackson.JsonMethods.parse
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.livy.rsc.RSCConf
+import org.apache.livy.rsc.driver.StatementState
+
+class SparkSessionSpec extends BaseSessionSpec {
+
+ override def createInterpreter(): Interpreter = new SparkInterpreter(new SparkConf())
+
+ it should "execute `1 + 2` == 3" in withSession { session =>
+ val statement = execute(session)("1 + 2")
+ statement.id should equal (0)
+
+ val result = parse(statement.output)
+ val expectedResult = Extraction.decompose(Map(
+ "status" -> "ok",
+ "execution_count" -> 0,
+ "data" -> Map(
+ "text/plain" -> "res0: Int = 3"
+ )
+ ))
+
+ result should equal (expectedResult)
+ }
+
+ it should "execute `x = 1`, then `y = 2`, then `x + y`" in withSession { session =>
+ val executeWithSession = execute(session)(_)
+ var statement = executeWithSession("val x = 1")
+ statement.id should equal (0)
+
+ var result = parse(statement.output)
+ var expectedResult = Extraction.decompose(Map(
+ "status" -> "ok",
+ "execution_count" -> 0,
+ "data" -> Map(
+ "text/plain" -> "x: Int = 1"
+ )
+ ))
+
+ result should equal (expectedResult)
+
+ statement = executeWithSession("val y = 2")
+ statement.id should equal (1)
+
+ result = parse(statement.output)
+ expectedResult = Extraction.decompose(Map(
+ "status" -> "ok",
+ "execution_count" -> 1,
+ "data" -> Map(
+ "text/plain" -> "y: Int = 2"
+ )
+ ))
+
+ result should equal (expectedResult)
+
+ statement = executeWithSession("x + y")
+ statement.id should equal (2)
+
+ result = parse(statement.output)
+ expectedResult = Extraction.decompose(Map(
+ "status" -> "ok",
+ "execution_count" -> 2,
+ "data" -> Map(
+ "text/plain" -> "res0: Int = 3"
+ )
+ ))
+
+ result should equal (expectedResult)
+ }
+
+ it should "capture stdout" in withSession { session =>
+ val statement = execute(session)("""println("Hello World")""")
+ statement.id should equal (0)
+
+ val result = parse(statement.output)
+ val expectedResult = Extraction.decompose(Map(
+ "status" -> "ok",
+ "execution_count" -> 0,
+ "data" -> Map(
+ "text/plain" -> "Hello World"
+ )
+ ))
+
+ result should equal (expectedResult)
+ }
+
+ it should "report an error if accessing an unknown variable" in withSession { session =>
+ val statement = execute(session)("""x""")
+ statement.id should equal (0)
+
+ val result = parse(statement.output)
+
+ def extract(key: String): String = (result \ key).extract[String]
+
+ extract("status") should equal ("error")
+ extract("execution_count") should equal ("0")
+ extract("ename") should equal ("Error")
+ extract("evalue") should include ("error: not found: value x")
+ }
+
+ it should "report an error if exception is thrown" in withSession { session =>
+ val statement = execute(session)(
+ """def func1() {
+ |throw new Exception()
+ |}
+ |func1()""".stripMargin)
+ statement.id should equal (0)
+
+ val result = parse(statement.output)
+ val resultMap = result.extract[Map[String, JValue]]
+
+ // Manually extract the values since the line numbers in the exception could change.
+ resultMap("status").extract[String] should equal ("error")
+ resultMap("execution_count").extract[Int] should equal (0)
+ resultMap("ename").extract[String] should equal ("Error")
+ resultMap("evalue").extract[String] should include ("java.lang.Exception")
+
+ val traceback = resultMap("traceback").extract[Seq[String]]
+ traceback(0) should include ("func1(<console>:")
+ }
+
+ it should "access the spark context" in withSession { session =>
+ val statement = execute(session)("""sc""")
+ statement.id should equal (0)
+
+ val result = parse(statement.output)
+ val resultMap = result.extract[Map[String, JValue]]
+
+ // Manually extract the values since only part of the printed SparkContext text is asserted below.
+ resultMap("status").extract[String] should equal ("ok")
+ resultMap("execution_count").extract[Int] should equal (0)
+
+ val data = resultMap("data").extract[Map[String, JValue]]
+ data("text/plain").extract[String] should include (
+ "res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext")
+ }
+
+ it should "execute spark commands" in withSession { session =>
+ val statement = execute(session)(
+ """sc.parallelize(0 to 1).map{i => i+1}.collect""".stripMargin)
+ statement.id should equal (0)
+
+ val result = parse(statement.output)
+
+ val expectedResult = Extraction.decompose(Map(
+ "status" -> "ok",
+ "execution_count" -> 0,
+ "data" -> Map(
+ "text/plain" -> "res0: Array[Int] = Array(1, 2)"
+ )
+ ))
+
+ result should equal (expectedResult)
+ }
+
+ it should "do table magic" in withSession { session =>
+ val statement = execute(session)("val x = List((1, \"a\"), (3, \"b\"))\n%table x")
+ statement.id should equal (0)
+
+ val result = parse(statement.output)
+
+ val expectedResult = Extraction.decompose(Map(
+ "status" -> "ok",
+ "execution_count" -> 0,
+ "data" -> Map(
+ "application/vnd.livy.table.v1+json" -> Map(
+ "headers" -> List(
+ Map("type" -> "BIGINT_TYPE", "name" -> "_1"),
+ Map("type" -> "STRING_TYPE", "name" -> "_2")),
+ "data" -> List(List(1, "a"), List(3, "b"))
+ )
+ )
+ ))
+
+ result should equal (expectedResult)
+ }
+
+ it should "cancel spark jobs" in withSession { session =>
+ val stmtId = session.execute(
+ """sc.parallelize(0 to 10).map { i => Thread.sleep(10000); i + 1 }.collect""".stripMargin)
+ eventually(timeout(30 seconds), interval(100 millis)) {
+ assert(session.statements(stmtId).state.get() == StatementState.Running)
+ }
+ session.cancel(stmtId)
+
+ eventually(timeout(30 seconds), interval(100 millis)) {
+ assert(session.statements(stmtId).state.get() == StatementState.Cancelled)
+ session.statements(stmtId).output should include (
+ "Job 0 cancelled part of cancelled job group 0")
+ }
+ }
+
+ it should "cancel waiting statement" in withSession { session =>
+ val stmtId1 = session.execute(
+ """sc.parallelize(0 to 10).map { i => Thread.sleep(10000); i + 1 }.collect""".stripMargin)
+ val stmtId2 = session.execute(
+ """sc.parallelize(0 to 10).map { i => Thread.sleep(10000); i + 1 }.collect""".stripMargin)
+ eventually(timeout(30 seconds), interval(100 millis)) {
+ assert(session.statements(stmtId1).state.get() == StatementState.Running)
+ }
+
+ assert(session.statements(stmtId2).state.get() == StatementState.Waiting)
+
+ session.cancel(stmtId2)
+ assert(session.statements(stmtId2).state.get() == StatementState.Cancelled)
+
+ session.cancel(stmtId1)
+ assert(session.statements(stmtId1).state.get() == StatementState.Cancelling)
+ eventually(timeout(30 seconds), interval(100 millis)) {
+ assert(session.statements(stmtId1).state.get() == StatementState.Cancelled)
+ session.statements(stmtId1).output should include (
+ "Job 0 cancelled part of cancelled job group 0")
+ }
+ }
+
+ it should "correctly calculate progress" in withSession { session =>
+ val executeCode =
+ """
+ |sc.parallelize(1 to 2, 2).map(i => (i, 1)).collect()
+ """.stripMargin
+
+ val stmtId = session.execute(executeCode)
+ eventually(timeout(30 seconds), interval(100 millis)) {
+ session.progressOfStatement(stmtId) should be(1.0)
+ }
+ }
+
+ it should "not generate Spark jobs for plain Scala code" in withSession { session =>
+ val executeCode = """1 + 1"""
+
+ val stmtId = session.execute(executeCode)
+ session.progressOfStatement(stmtId) should be (0.0)
+ }
+
+ it should "handle multiple jobs in one statement" in withSession { session =>
+ val executeCode =
+ """
+ |sc.parallelize(1 to 2, 2).map(i => (i, 1)).collect()
+ |sc.parallelize(1 to 2, 2).map(i => (i, 1)).collect()
+ """.stripMargin
+
+ val stmtId = session.execute(executeCode)
+ eventually(timeout(30 seconds), interval(100 millis)) {
+ session.progressOfStatement(stmtId) should be(1.0)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/pom.xml
----------------------------------------------------------------------
diff --git a/rsc/pom.xml b/rsc/pom.xml
index 7ff8c2a..660ca35 100644
--- a/rsc/pom.xml
+++ b/rsc/pom.xml
@@ -19,14 +19,14 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
- <groupId>com.cloudera.livy</groupId>
+ <groupId>org.apache.livy</groupId>
<artifactId>livy-main</artifactId>
- <version>0.4.0-SNAPSHOT</version>
+ <version>0.4.0-incubating-SNAPSHOT</version>
</parent>
- <groupId>com.cloudera.livy</groupId>
+ <groupId>org.apache.livy</groupId>
<artifactId>livy-rsc</artifactId>
- <version>0.4.0-SNAPSHOT</version>
+ <version>0.4.0-incubating-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
@@ -35,17 +35,17 @@
<dependencies>
<dependency>
- <groupId>com.cloudera.livy</groupId>
+ <groupId>org.apache.livy</groupId>
<artifactId>livy-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>com.cloudera.livy</groupId>
+ <groupId>org.apache.livy</groupId>
<artifactId>livy-client-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>com.cloudera.livy</groupId>
+ <groupId>org.apache.livy</groupId>
<artifactId>livy-test-lib</artifactId>
<version>${project.version}</version>
<scope>test</scope>
@@ -133,7 +133,7 @@
<shadedArtifactAttached>false</shadedArtifactAttached>
<artifactSet>
<includes>
- <include>com.cloudera.livy:livy-client-common</include>
+ <include>org.apache.livy:livy-client-common</include>
<include>com.esotericsoftware.kryo:kryo</include>
</includes>
</artifactSet>
@@ -150,7 +150,7 @@
<relocations>
<relocation>
<pattern>com.esotericsoftware</pattern>
- <shadedPattern>com.cloudera.livy.shaded.kryo</shadedPattern>
+ <shadedPattern>org.apache.livy.shaded.kryo</shadedPattern>
</relocation>
</relocations>
<outputFile>${project.build.directory}/jars/${project.artifactId}-${project.version}.jar</outputFile>
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/com/cloudera/livy/rsc/BaseProtocol.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/BaseProtocol.java b/rsc/src/main/java/com/cloudera/livy/rsc/BaseProtocol.java
deleted file mode 100644
index 419c1f7..0000000
--- a/rsc/src/main/java/com/cloudera/livy/rsc/BaseProtocol.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.rsc;
-
-import com.cloudera.livy.Job;
-import com.cloudera.livy.rsc.driver.Statement;
-import com.cloudera.livy.rsc.rpc.RpcDispatcher;
-
-public abstract class BaseProtocol extends RpcDispatcher {
-
- protected static class CancelJob {
-
- public final String id;
-
- CancelJob(String id) {
- this.id = id;
- }
-
- CancelJob() {
- this(null);
- }
-
- }
-
- protected static class EndSession {
-
- }
-
- protected static class Error {
-
- public final String cause;
-
- public Error(Throwable cause) {
- if (cause == null) {
- this.cause = "";
- } else {
- this.cause = Utils.stackTraceAsString(cause);
- }
- }
-
- public Error() {
- this(null);
- }
-
- }
-
- public static class BypassJobRequest {
-
- public final String id;
- public final byte[] serializedJob;
- public final boolean synchronous;
-
- public BypassJobRequest(String id, byte[] serializedJob, boolean synchronous) {
- this.id = id;
- this.serializedJob = serializedJob;
- this.synchronous = synchronous;
- }
-
- public BypassJobRequest() {
- this(null, null, false);
- }
-
- }
-
- protected static class GetBypassJobStatus {
-
- public final String id;
-
- public GetBypassJobStatus(String id) {
- this.id = id;
- }
-
- public GetBypassJobStatus() {
- this(null);
- }
-
- }
-
- protected static class JobRequest<T> {
-
- public final String id;
- public final Job<T> job;
-
- public JobRequest(String id, Job<T> job) {
- this.id = id;
- this.job = job;
- }
-
- public JobRequest() {
- this(null, null);
- }
-
- }
-
- protected static class JobResult<T> {
-
- public final String id;
- public final T result;
- public final String error;
-
- public JobResult(String id, T result, Throwable error) {
- this.id = id;
- this.result = result;
- this.error = error != null ? Utils.stackTraceAsString(error) : null;
- }
-
- public JobResult() {
- this(null, null, null);
- }
-
- }
-
- protected static class JobStarted {
-
- public final String id;
-
- public JobStarted(String id) {
- this.id = id;
- }
-
- public JobStarted() {
- this(null);
- }
-
- }
-
- protected static class SyncJobRequest<T> {
-
- public final Job<T> job;
-
- public SyncJobRequest(Job<T> job) {
- this.job = job;
- }
-
- public SyncJobRequest() {
- this(null);
- }
-
- }
-
- public static class RemoteDriverAddress {
-
- public final String host;
- public final int port;
-
- public RemoteDriverAddress(String host, int port) {
- this.host = host;
- this.port = port;
- }
-
- public RemoteDriverAddress() {
- this(null, -1);
- }
-
- }
-
- public static class ReplJobRequest {
-
- public final String code;
-
- public ReplJobRequest(String code) {
- this.code = code;
- }
-
- public ReplJobRequest() {
- this(null);
- }
- }
-
- public static class GetReplJobResults {
- public boolean allResults;
- public Integer from, size;
-
- public GetReplJobResults(Integer from, Integer size) {
- this.allResults = false;
- this.from = from;
- this.size = size;
- }
-
- public GetReplJobResults() {
- this.allResults = true;
- from = null;
- size = null;
- }
- }
-
- protected static class ReplState {
-
- public final String state;
-
- public ReplState(String state) {
- this.state = state;
- }
-
- public ReplState() {
- this(null);
- }
- }
-
- public static class CancelReplJobRequest {
- public final int id;
-
- public CancelReplJobRequest(int id) {
- this.id = id;
- }
-
- public CancelReplJobRequest() {
- this(-1);
- }
- }
-
- public static class InitializationError {
-
- public final String stackTrace;
-
- public InitializationError(String stackTrace) {
- this.stackTrace = stackTrace;
- }
-
- public InitializationError() {
- this(null);
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/com/cloudera/livy/rsc/BypassJobStatus.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/BypassJobStatus.java b/rsc/src/main/java/com/cloudera/livy/rsc/BypassJobStatus.java
deleted file mode 100644
index 7f35cc7..0000000
--- a/rsc/src/main/java/com/cloudera/livy/rsc/BypassJobStatus.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.rsc;
-
-import java.util.List;
-
-import com.cloudera.livy.JobHandle;
-
-public class BypassJobStatus {
-
- public final JobHandle.State state;
- public final byte[] result;
- public final String error;
-
- public BypassJobStatus(JobHandle.State state, byte[] result, String error) {
- this.state = state;
- this.result = result;
- this.error = error;
- }
-
- BypassJobStatus() {
- this(null, null, null);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/com/cloudera/livy/rsc/ContextInfo.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/ContextInfo.java b/rsc/src/main/java/com/cloudera/livy/rsc/ContextInfo.java
deleted file mode 100644
index 5e930cd..0000000
--- a/rsc/src/main/java/com/cloudera/livy/rsc/ContextInfo.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.rsc;
-
-/**
- * Information about a running RSC instance.
- */
-class ContextInfo {
-
- final String remoteAddress;
- final int remotePort;
- final String clientId;
- final String secret;
-
- ContextInfo(String remoteAddress, int remotePort, String clientId, String secret) {
- this.remoteAddress = remoteAddress;
- this.remotePort = remotePort;
- this.clientId = clientId;
- this.secret = secret;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/com/cloudera/livy/rsc/ContextLauncher.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/ContextLauncher.java b/rsc/src/main/java/com/cloudera/livy/rsc/ContextLauncher.java
deleted file mode 100644
index 9a5e447..0000000
--- a/rsc/src/main/java/com/cloudera/livy/rsc/ContextLauncher.java
+++ /dev/null
@@ -1,456 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.rsc;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.io.Reader;
-import java.io.Writer;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.UUID;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.nio.file.attribute.PosixFilePermission.*;
-
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.util.concurrent.Promise;
-import org.apache.spark.launcher.SparkLauncher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.livy.client.common.TestUtils;
-import com.cloudera.livy.rsc.driver.RSCDriverBootstrapper;
-import com.cloudera.livy.rsc.rpc.Rpc;
-import com.cloudera.livy.rsc.rpc.RpcDispatcher;
-import com.cloudera.livy.rsc.rpc.RpcServer;
-import static com.cloudera.livy.rsc.RSCConf.Entry.*;
-
-/**
- * Encapsulates code needed to launch a new Spark context and collect information about how
- * to establish a client connection to it.
- */
-class ContextLauncher {
-
- private static final Logger LOG = LoggerFactory.getLogger(ContextLauncher.class);
- private static final AtomicInteger CHILD_IDS = new AtomicInteger();
-
- private static final String SPARK_DEPLOY_MODE = "spark.submit.deployMode";
- private static final String SPARK_JARS_KEY = "spark.jars";
- private static final String SPARK_ARCHIVES_KEY = "spark.yarn.dist.archives";
- private static final String SPARK_HOME_ENV = "SPARK_HOME";
-
- static DriverProcessInfo create(RSCClientFactory factory, RSCConf conf)
- throws IOException {
- ContextLauncher launcher = new ContextLauncher(factory, conf);
- return new DriverProcessInfo(launcher.promise, launcher.child.child);
- }
-
- private final Promise<ContextInfo> promise;
- private final ScheduledFuture<?> timeout;
- private final String clientId;
- private final String secret;
- private final ChildProcess child;
- private final RSCConf conf;
- private final RSCClientFactory factory;
-
- private ContextLauncher(RSCClientFactory factory, RSCConf conf) throws IOException {
- this.promise = factory.getServer().getEventLoopGroup().next().newPromise();
- this.clientId = UUID.randomUUID().toString();
- this.secret = factory.getServer().createSecret();
- this.conf = conf;
- this.factory = factory;
-
- final RegistrationHandler handler = new RegistrationHandler();
- try {
- factory.getServer().registerClient(clientId, secret, handler);
- String replMode = conf.get("repl");
- boolean repl = replMode != null && replMode.equals("true");
-
- conf.set(LAUNCHER_ADDRESS, factory.getServer().getAddress());
- conf.set(LAUNCHER_PORT, factory.getServer().getPort());
- conf.set(CLIENT_ID, clientId);
- conf.set(CLIENT_SECRET, secret);
-
- Utils.addListener(promise, new FutureListener<ContextInfo>() {
- @Override
- public void onFailure(Throwable error) throws Exception {
- // If promise is cancelled or failed, make sure spark-submit is not leaked.
- if (child != null) {
- child.kill();
- }
- }
- });
-
- this.child = startDriver(conf, promise);
-
- // Set up a timeout to fail the promise if we don't hear back from the context
- // after a configurable timeout.
- Runnable timeoutTask = new Runnable() {
- @Override
- public void run() {
- connectTimeout(handler);
- }
- };
- this.timeout = factory.getServer().getEventLoopGroup().schedule(timeoutTask,
- conf.getTimeAsMs(RPC_CLIENT_HANDSHAKE_TIMEOUT), TimeUnit.MILLISECONDS);
- } catch (Exception e) {
- dispose(true);
- throw Utils.propagate(e);
- }
- }
-
- private void connectTimeout(RegistrationHandler handler) {
- if (promise.tryFailure(new TimeoutException("Timed out waiting for context to start."))) {
- handler.dispose();
- }
- dispose(true);
- }
-
- private void dispose(boolean forceKill) {
- factory.getServer().unregisterClient(clientId);
- try {
- if (child != null) {
- if (forceKill) {
- child.kill();
- } else {
- child.detach();
- }
- }
- } finally {
- factory.unref();
- }
- }
-
- private static ChildProcess startDriver(final RSCConf conf, Promise<?> promise)
- throws IOException {
- String livyJars = conf.get(LIVY_JARS);
- if (livyJars == null) {
- String livyHome = System.getenv("LIVY_HOME");
- Utils.checkState(livyHome != null,
- "Need one of LIVY_HOME or %s set.", LIVY_JARS.key());
- File rscJars = new File(livyHome, "rsc-jars");
- if (!rscJars.isDirectory()) {
- rscJars = new File(livyHome, "rsc/target/jars");
- }
- Utils.checkState(rscJars.isDirectory(),
- "Cannot find 'client-jars' directory under LIVY_HOME.");
- List<String> jars = new ArrayList<>();
- for (File f : rscJars.listFiles()) {
- jars.add(f.getAbsolutePath());
- }
- livyJars = Utils.join(jars, ",");
- }
- merge(conf, SPARK_JARS_KEY, livyJars, ",");
-
- String kind = conf.get(SESSION_KIND);
- if ("sparkr".equals(kind)) {
- merge(conf, SPARK_ARCHIVES_KEY, conf.get(RSCConf.Entry.SPARKR_PACKAGE), ",");
- } else if ("pyspark".equals(kind)) {
- merge(conf, "spark.submit.pyFiles", conf.get(RSCConf.Entry.PYSPARK_ARCHIVES), ",");
- }
-
- // Disable multiple attempts since the RPC server doesn't yet support multiple
- // connections for the same registered app.
- conf.set("spark.yarn.maxAppAttempts", "1");
-
- // Let the launcher go away when launcher in yarn cluster mode. This avoids keeping lots
- // of "small" Java processes lingering on the Livy server node.
- conf.set("spark.yarn.submit.waitAppCompletion", "false");
-
- if (!conf.getBoolean(CLIENT_IN_PROCESS) &&
- // For tests which doesn't shutdown RscDriver gracefully, JaCoCo exec isn't dumped properly.
- // Disable JaCoCo for this case.
- !conf.getBoolean(TEST_STUCK_END_SESSION)) {
- // For testing; propagate jacoco settings so that we also do coverage analysis
- // on the launched driver. We replace the name of the main file ("main.exec")
- // so that we don't end up fighting with the main test launcher.
- String jacocoArgs = TestUtils.getJacocoArgs();
- if (jacocoArgs != null) {
- merge(conf, SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, jacocoArgs, " ");
- }
- }
-
- final File confFile = writeConfToFile(conf);
-
- if (ContextLauncher.mockSparkSubmit != null) {
- LOG.warn("!!!! Using mock spark-submit. !!!!");
- return new ChildProcess(conf, promise, ContextLauncher.mockSparkSubmit, confFile);
- } else if (conf.getBoolean(CLIENT_IN_PROCESS)) {
- // Mostly for testing things quickly. Do not do this in production.
- LOG.warn("!!!! Running remote driver in-process. !!!!");
- Runnable child = new Runnable() {
- @Override
- public void run() {
- try {
- RSCDriverBootstrapper.main(new String[] { confFile.getAbsolutePath() });
- } catch (Exception e) {
- throw Utils.propagate(e);
- }
- }
- };
- return new ChildProcess(conf, promise, child, confFile);
- } else {
- final SparkLauncher launcher = new SparkLauncher();
-
- // Spark 1.x does not support specifying deploy mode in conf and needs special handling.
- String deployMode = conf.get(SPARK_DEPLOY_MODE);
- if (deployMode != null) {
- launcher.setDeployMode(deployMode);
- }
-
- launcher.setSparkHome(System.getenv(SPARK_HOME_ENV));
- launcher.setAppResource("spark-internal");
- launcher.setPropertiesFile(confFile.getAbsolutePath());
- launcher.setMainClass(RSCDriverBootstrapper.class.getName());
-
- if (conf.get(PROXY_USER) != null) {
- launcher.addSparkArg("--proxy-user", conf.get(PROXY_USER));
- }
-
- return new ChildProcess(conf, promise, launcher.launch(), confFile);
- }
- }
-
- private static void merge(RSCConf conf, String key, String livyConf, String sep) {
- String confValue = Utils.join(Arrays.asList(livyConf, conf.get(key)), sep);
- conf.set(key, confValue);
- }
-
- /**
- * Write the configuration to a file readable only by the process's owner. Livy properties
- * are written with an added prefix so that they can be loaded using SparkConf on the driver
- * side.
- *
- * The default Spark configuration (from either SPARK_HOME or SPARK_CONF_DIR) is merged into
- * the user configuration, so that defaults set by Livy's admin take effect when not overridden
- * by the user.
- */
- private static File writeConfToFile(RSCConf conf) throws IOException {
- Properties confView = new Properties();
- for (Map.Entry<String, String> e : conf) {
- String key = e.getKey();
- if (!key.startsWith(RSCConf.SPARK_CONF_PREFIX)) {
- key = RSCConf.LIVY_SPARK_PREFIX + key;
- }
- confView.setProperty(key, e.getValue());
- }
-
- // Load the default Spark configuration.
- String confDir = System.getenv("SPARK_CONF_DIR");
- if (confDir == null && System.getenv(SPARK_HOME_ENV) != null) {
- confDir = System.getenv(SPARK_HOME_ENV) + File.separator + "conf";
- }
-
- if (confDir != null) {
- File sparkDefaults = new File(confDir + File.separator + "spark-defaults.conf");
- if (sparkDefaults.isFile()) {
- Properties sparkConf = new Properties();
- Reader r = new InputStreamReader(new FileInputStream(sparkDefaults), UTF_8);
- try {
- sparkConf.load(r);
- } finally {
- r.close();
- }
-
- for (String key : sparkConf.stringPropertyNames()) {
- if (!confView.containsKey(key)) {
- confView.put(key, sparkConf.getProperty(key));
- }
- }
- }
- }
-
- File file = File.createTempFile("livyConf", ".properties");
- Files.setPosixFilePermissions(file.toPath(), EnumSet.of(OWNER_READ, OWNER_WRITE));
- //file.deleteOnExit();
-
- Writer writer = new OutputStreamWriter(new FileOutputStream(file), UTF_8);
- try {
- confView.store(writer, "Livy App Context Configuration");
- } finally {
- writer.close();
- }
-
- return file;
- }
-
-
- private class RegistrationHandler extends BaseProtocol
- implements RpcServer.ClientCallback {
-
- volatile RemoteDriverAddress driverAddress;
-
- private Rpc client;
-
- @Override
- public RpcDispatcher onNewClient(Rpc client) {
- LOG.debug("New RPC client connected from {}.", client.getChannel());
- this.client = client;
- return this;
- }
-
- @Override
- public void onSaslComplete(Rpc client) {
- }
-
- void dispose() {
- if (client != null) {
- client.close();
- }
- }
-
- private void handle(ChannelHandlerContext ctx, RemoteDriverAddress msg) {
- ContextInfo info = new ContextInfo(msg.host, msg.port, clientId, secret);
- if (promise.trySuccess(info)) {
- timeout.cancel(true);
- LOG.debug("Received driver info for client {}: {}/{}.", client.getChannel(),
- msg.host, msg.port);
- } else {
- LOG.warn("Connection established but promise is already finalized.");
- }
-
- ctx.executor().submit(new Runnable() {
- @Override
- public void run() {
- dispose();
- ContextLauncher.this.dispose(false);
- }
- });
- }
-
- }
-
- private static class ChildProcess {
-
- private final RSCConf conf;
- private final Promise<?> promise;
- private final Process child;
- private final Thread monitor;
- private final File confFile;
-
- public ChildProcess(RSCConf conf, Promise<?> promise, Runnable child, File confFile) {
- this.conf = conf;
- this.promise = promise;
- this.monitor = monitor(child, CHILD_IDS.incrementAndGet());
- this.child = null;
- this.confFile = confFile;
- }
-
- public ChildProcess(RSCConf conf, Promise<?> promise, final Process childProc, File confFile) {
- int childId = CHILD_IDS.incrementAndGet();
- this.conf = conf;
- this.promise = promise;
- this.child = childProc;
- this.confFile = confFile;
-
- Runnable monitorTask = new Runnable() {
- @Override
- public void run() {
- try {
- int exitCode = child.waitFor();
- if (exitCode != 0) {
- LOG.warn("Child process exited with code {}.", exitCode);
- fail(new IOException(String.format("Child process exited with code %d.", exitCode)));
- }
- } catch (InterruptedException ie) {
- LOG.warn("Waiting thread interrupted, killing child process.");
- Thread.interrupted();
- child.destroy();
- } catch (Exception e) {
- LOG.warn("Exception while waiting for child process.", e);
- }
- }
- };
- this.monitor = monitor(monitorTask, childId);
- }
-
- private void fail(Throwable error) {
- promise.tryFailure(error);
- }
-
- public void kill() {
- if (child != null) {
- child.destroy();
- }
- monitor.interrupt();
- detach();
-
- if (!monitor.isAlive()) {
- return;
- }
-
- // Last ditch effort.
- if (monitor.isAlive()) {
- LOG.warn("Timed out shutting down remote driver, interrupting...");
- monitor.interrupt();
- }
- }
-
- public void detach() {
- try {
- monitor.join(conf.getTimeAsMs(CLIENT_SHUTDOWN_TIMEOUT));
- } catch (InterruptedException ie) {
- LOG.debug("Interrupted before driver thread was finished.");
- }
- }
-
- private Thread monitor(final Runnable task, int childId) {
- Runnable wrappedTask = new Runnable() {
- @Override
- public void run() {
- try {
- task.run();
- } finally {
- confFile.delete();
- }
- }
- };
- Thread thread = new Thread(wrappedTask);
- thread.setDaemon(true);
- thread.setName("ContextLauncher-" + childId);
- thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- LOG.warn("Child task threw exception.", e);
- fail(e);
- }
- });
- thread.start();
- return thread;
- }
- }
-
- // Just for testing.
- static Process mockSparkSubmit;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/com/cloudera/livy/rsc/DriverProcessInfo.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/DriverProcessInfo.java b/rsc/src/main/java/com/cloudera/livy/rsc/DriverProcessInfo.java
deleted file mode 100644
index a224fd6..0000000
--- a/rsc/src/main/java/com/cloudera/livy/rsc/DriverProcessInfo.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.rsc;
-
-import io.netty.util.concurrent.Promise;
-
-/**
- * Information about driver process and @{@link ContextInfo}
- */
-public class DriverProcessInfo {
-
- private Promise<ContextInfo> contextInfo;
- private transient Process driverProcess;
-
- public DriverProcessInfo(Promise<ContextInfo> contextInfo, Process driverProcess) {
- this.contextInfo = contextInfo;
- this.driverProcess = driverProcess;
- }
-
- public Promise<ContextInfo> getContextInfo() {
- return contextInfo;
- }
-
- public Process getDriverProcess() {
- return driverProcess;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/com/cloudera/livy/rsc/FutureListener.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/FutureListener.java b/rsc/src/main/java/com/cloudera/livy/rsc/FutureListener.java
deleted file mode 100644
index 2109c81..0000000
--- a/rsc/src/main/java/com/cloudera/livy/rsc/FutureListener.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.rsc;
-
-/** A simplified future listener for netty futures. See Utils.addListener(). */
-public abstract class FutureListener<T> {
-
- public void onSuccess(T result) throws Exception { }
-
- public void onFailure(Throwable error) throws Exception { }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/com/cloudera/livy/rsc/JobHandleImpl.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/JobHandleImpl.java b/rsc/src/main/java/com/cloudera/livy/rsc/JobHandleImpl.java
deleted file mode 100644
index 9146425..0000000
--- a/rsc/src/main/java/com/cloudera/livy/rsc/JobHandleImpl.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.rsc;
-
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import io.netty.util.concurrent.Promise;
-
-import com.cloudera.livy.JobHandle;
-import com.cloudera.livy.client.common.AbstractJobHandle;
-
-/**
- * A handle to a submitted job. Allows for monitoring and controlling of the running remote job.
- */
-class JobHandleImpl<T> extends AbstractJobHandle<T> {
-
- private final RSCClient client;
- private final String jobId;
- private final Promise<T> promise;
- private volatile State state;
-
- JobHandleImpl(RSCClient client, Promise<T> promise, String jobId) {
- this.client = client;
- this.jobId = jobId;
- this.promise = promise;
- }
-
- /** Requests a running job to be cancelled. */
- @Override
- public boolean cancel(boolean mayInterrupt) {
- if (changeState(State.CANCELLED)) {
- client.cancel(jobId);
- promise.cancel(mayInterrupt);
- return true;
- }
- return false;
- }
-
- @Override
- public T get() throws ExecutionException, InterruptedException {
- return promise.get();
- }
-
- @Override
- public T get(long timeout, TimeUnit unit)
- throws ExecutionException, InterruptedException, TimeoutException {
- return promise.get(timeout, unit);
- }
-
- @Override
- public boolean isCancelled() {
- return promise.isCancelled();
- }
-
- @Override
- public boolean isDone() {
- return promise.isDone();
- }
-
- @Override
- protected T result() {
- return promise.getNow();
- }
-
- @Override
- protected Throwable error() {
- return promise.cause();
- }
-
- @SuppressWarnings("unchecked")
- void setSuccess(Object result) {
- // The synchronization here is not necessary, but tests depend on it.
- synchronized (listeners) {
- promise.setSuccess((T) result);
- changeState(State.SUCCEEDED);
- }
- }
-
- void setFailure(Throwable error) {
- // The synchronization here is not necessary, but tests depend on it.
- synchronized (listeners) {
- promise.setFailure(error);
- changeState(State.FAILED);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/com/cloudera/livy/rsc/PingJob.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/PingJob.java b/rsc/src/main/java/com/cloudera/livy/rsc/PingJob.java
deleted file mode 100644
index 088763a..0000000
--- a/rsc/src/main/java/com/cloudera/livy/rsc/PingJob.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.rsc;
-
-import com.cloudera.livy.Job;
-import com.cloudera.livy.JobContext;
-
-/** A job that can be used to check for liveness of the remote context. */
-public class PingJob implements Job<Void> {
-
- @Override
- public Void call(JobContext jc) {
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/com/cloudera/livy/rsc/RSCClient.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/RSCClient.java b/rsc/src/main/java/com/cloudera/livy/rsc/RSCClient.java
deleted file mode 100644
index 11cb0f6..0000000
--- a/rsc/src/main/java/com/cloudera/livy/rsc/RSCClient.java
+++ /dev/null
@@ -1,409 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.rsc;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.util.concurrent.GenericFutureListener;
-import io.netty.util.concurrent.ImmediateEventExecutor;
-import io.netty.util.concurrent.Promise;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.livy.Job;
-import com.cloudera.livy.JobHandle;
-import com.cloudera.livy.LivyClient;
-import com.cloudera.livy.client.common.BufferUtils;
-import com.cloudera.livy.rsc.driver.AddFileJob;
-import com.cloudera.livy.rsc.driver.AddJarJob;
-import com.cloudera.livy.rsc.rpc.Rpc;
-import static com.cloudera.livy.rsc.RSCConf.Entry.*;
-
-public class RSCClient implements LivyClient {
- private static final Logger LOG = LoggerFactory.getLogger(RSCClient.class);
- private static final AtomicInteger EXECUTOR_GROUP_ID = new AtomicInteger();
-
- private final RSCConf conf;
- private final Promise<ContextInfo> contextInfoPromise;
- private final Map<String, JobHandleImpl<?>> jobs;
- private final ClientProtocol protocol;
- private final Promise<Rpc> driverRpc;
- private final int executorGroupId;
- private final EventLoopGroup eventLoopGroup;
- private final Promise<URI> serverUriPromise;
-
- private ContextInfo contextInfo;
- private Process driverProcess;
- private volatile boolean isAlive;
- private volatile String replState;
-
- RSCClient(RSCConf conf, Promise<ContextInfo> ctx, Process driverProcess) throws IOException {
- this.conf = conf;
- this.contextInfoPromise = ctx;
- this.driverProcess = driverProcess;
- this.jobs = new ConcurrentHashMap<>();
- this.protocol = new ClientProtocol();
- this.driverRpc = ImmediateEventExecutor.INSTANCE.newPromise();
- this.executorGroupId = EXECUTOR_GROUP_ID.incrementAndGet();
- this.eventLoopGroup = new NioEventLoopGroup(
- conf.getInt(RPC_MAX_THREADS),
- Utils.newDaemonThreadFactory("RSCClient-" + executorGroupId + "-%d"));
- this.serverUriPromise = ImmediateEventExecutor.INSTANCE.newPromise();
-
- Utils.addListener(this.contextInfoPromise, new FutureListener<ContextInfo>() {
- @Override
- public void onSuccess(ContextInfo info) throws Exception {
- connectToContext(info);
- String url = String.format("rsc://%s:%s@%s:%d",
- info.clientId, info.secret, info.remoteAddress, info.remotePort);
- serverUriPromise.setSuccess(URI.create(url));
- }
-
- @Override
- public void onFailure(Throwable error) {
- connectionError(error);
- serverUriPromise.setFailure(error);
- }
- });
-
- isAlive = true;
- }
-
- public boolean isAlive() {
- return isAlive;
- }
-
- public Process getDriverProcess() {
- return driverProcess;
- }
-
- private synchronized void connectToContext(final ContextInfo info) throws Exception {
- this.contextInfo = info;
-
- try {
- Promise<Rpc> promise = Rpc.createClient(conf,
- eventLoopGroup,
- info.remoteAddress,
- info.remotePort,
- info.clientId,
- info.secret,
- protocol);
- Utils.addListener(promise, new FutureListener<Rpc>() {
- @Override
- public void onSuccess(Rpc rpc) throws Exception {
- driverRpc.setSuccess(rpc);
- Utils.addListener(rpc.getChannel().closeFuture(), new FutureListener<Void>() {
- @Override
- public void onSuccess(Void unused) {
- if (isAlive) {
- LOG.warn("Client RPC channel closed unexpectedly.");
- try {
- stop(false);
- } catch (Exception e) { /* stop() itself prints warning. */ }
- }
- }
- });
- LOG.debug("Connected to context {} ({}, {}).", info.clientId,
- rpc.getChannel(), executorGroupId);
- }
-
- @Override
- public void onFailure(Throwable error) throws Exception {
- driverRpc.setFailure(error);
- connectionError(error);
- }
- });
- } catch (Exception e) {
- connectionError(e);
- }
- }
-
- private void connectionError(Throwable error) {
- LOG.error("Failed to connect to context.", error);
- try {
- stop(false);
- } catch (Exception e) { /* stop() itself prints warning. */ }
- }
-
- private <T> io.netty.util.concurrent.Future<T> deferredCall(final Object msg,
- final Class<T> retType) {
- if (driverRpc.isSuccess()) {
- try {
- return driverRpc.get().call(msg, retType);
- } catch (Exception ie) {
- throw Utils.propagate(ie);
- }
- }
-
- // No driver RPC yet, so install a listener and return a promise that will be ready when
- // the driver is up and the message is actually delivered.
- final Promise<T> promise = eventLoopGroup.next().newPromise();
- final FutureListener<T> callListener = new FutureListener<T>() {
- @Override
- public void onSuccess(T value) throws Exception {
- promise.setSuccess(value);
- }
-
- @Override
- public void onFailure(Throwable error) throws Exception {
- promise.setFailure(error);
- }
- };
-
- Utils.addListener(driverRpc, new FutureListener<Rpc>() {
- @Override
- public void onSuccess(Rpc rpc) throws Exception {
- Utils.addListener(rpc.call(msg, retType), callListener);
- }
-
- @Override
- public void onFailure(Throwable error) throws Exception {
- promise.setFailure(error);
- }
- });
- return promise;
- }
-
- public Future<URI> getServerUri() {
- return serverUriPromise;
- }
-
- @Override
- public <T> JobHandle<T> submit(Job<T> job) {
- return protocol.submit(job);
- }
-
- @Override
- public <T> Future<T> run(Job<T> job) {
- return protocol.run(job);
- }
-
- @Override
- public synchronized void stop(boolean shutdownContext) {
- if (isAlive) {
- isAlive = false;
- try {
- this.contextInfoPromise.cancel(true);
-
- if (shutdownContext && driverRpc.isSuccess()) {
- protocol.endSession();
-
- // Because the remote context won't really reply to the end session message -
- // since it closes the channel while handling it, we wait for the RPC's channel
- // to close instead.
- long stopTimeout = conf.getTimeAsMs(CLIENT_SHUTDOWN_TIMEOUT);
- driverRpc.get().getChannel().closeFuture().get(stopTimeout,
- TimeUnit.MILLISECONDS);
- }
- } catch (Exception e) {
- LOG.warn("Exception while waiting for end session reply.", e);
- Utils.propagate(e);
- } finally {
- if (driverRpc.isSuccess()) {
- try {
- driverRpc.get().close();
- } catch (Exception e) {
- LOG.warn("Error stopping RPC.", e);
- }
- }
-
- // Report failure for all pending jobs, so that clients can react.
- for (Map.Entry<String, JobHandleImpl<?>> e : jobs.entrySet()) {
- LOG.info("Failing pending job {} due to shutdown.", e.getKey());
- e.getValue().setFailure(new IOException("RSCClient instance stopped."));
- }
-
- eventLoopGroup.shutdownGracefully();
- }
- if (contextInfo != null) {
- LOG.debug("Disconnected from context {}, shutdown = {}.", contextInfo.clientId,
- shutdownContext);
- }
- }
- }
-
- @Override
- public Future<?> uploadJar(File jar) {
- throw new UnsupportedOperationException("Use addJar to add the jar to the remote context!");
- }
-
- @Override
- public Future<?> addJar(URI uri) {
- return submit(new AddJarJob(uri.toString()));
- }
-
- @Override
- public Future<?> uploadFile(File file) {
- throw new UnsupportedOperationException("Use addFile to add the file to the remote context!");
- }
-
- @Override
- public Future<?> addFile(URI uri) {
- return submit(new AddFileJob(uri.toString()));
- }
-
- public String bypass(ByteBuffer serializedJob, boolean sync) {
- return protocol.bypass(serializedJob, sync);
- }
-
- public Future<BypassJobStatus> getBypassJobStatus(String id) {
- return protocol.getBypassJobStatus(id);
- }
-
- public void cancel(String jobId) {
- protocol.cancel(jobId);
- }
-
- ContextInfo getContextInfo() {
- return contextInfo;
- }
-
- public Future<Integer> submitReplCode(String code) throws Exception {
- return deferredCall(new BaseProtocol.ReplJobRequest(code), Integer.class);
- }
-
- public void cancelReplCode(int statementId) throws Exception {
- deferredCall(new BaseProtocol.CancelReplJobRequest(statementId), Void.class);
- }
-
- public Future<ReplJobResults> getReplJobResults(Integer from, Integer size) throws Exception {
- return deferredCall(new BaseProtocol.GetReplJobResults(from, size), ReplJobResults.class);
- }
-
- public Future<ReplJobResults> getReplJobResults() throws Exception {
- return deferredCall(new BaseProtocol.GetReplJobResults(), ReplJobResults.class);
- }
-
- /**
- * @return Return the repl state. If this's not connected to a repl session, it will return null.
- */
- public String getReplState() {
- return replState;
- }
-
- private class ClientProtocol extends BaseProtocol {
-
- <T> JobHandleImpl<T> submit(Job<T> job) {
- final String jobId = UUID.randomUUID().toString();
- Object msg = new JobRequest<T>(jobId, job);
-
- final Promise<T> promise = eventLoopGroup.next().newPromise();
- final JobHandleImpl<T> handle = new JobHandleImpl<T>(RSCClient.this,
- promise, jobId);
- jobs.put(jobId, handle);
-
- final io.netty.util.concurrent.Future<Void> rpc = deferredCall(msg, Void.class);
- LOG.debug("Sending JobRequest[{}].", jobId);
-
- Utils.addListener(rpc, new FutureListener<Void>() {
- @Override
- public void onSuccess(Void unused) throws Exception {
- handle.changeState(JobHandle.State.QUEUED);
- }
-
- @Override
- public void onFailure(Throwable error) throws Exception {
- error.printStackTrace();
- promise.tryFailure(error);
- }
- });
- promise.addListener(new GenericFutureListener<Promise<T>>() {
- @Override
- public void operationComplete(Promise<T> p) {
- if (jobId != null) {
- jobs.remove(jobId);
- }
- if (p.isCancelled() && !rpc.isDone()) {
- rpc.cancel(true);
- }
- }
- });
- return handle;
- }
-
- @SuppressWarnings("unchecked")
- <T> Future<T> run(Job<T> job) {
- return (Future<T>) deferredCall(new SyncJobRequest(job), Object.class);
- }
-
- String bypass(ByteBuffer serializedJob, boolean sync) {
- String jobId = UUID.randomUUID().toString();
- Object msg = new BypassJobRequest(jobId, BufferUtils.toByteArray(serializedJob), sync);
- deferredCall(msg, Void.class);
- return jobId;
- }
-
- Future<BypassJobStatus> getBypassJobStatus(String id) {
- return deferredCall(new GetBypassJobStatus(id), BypassJobStatus.class);
- }
-
- void cancel(String jobId) {
- deferredCall(new CancelJob(jobId), Void.class);
- }
-
- Future<?> endSession() {
- return deferredCall(new EndSession(), Void.class);
- }
-
- private void handle(ChannelHandlerContext ctx, InitializationError msg) {
- LOG.warn("Error reported from remote driver: %s", msg.stackTrace);
- }
-
- private void handle(ChannelHandlerContext ctx, JobResult msg) {
- JobHandleImpl<?> handle = jobs.remove(msg.id);
- if (handle != null) {
- LOG.info("Received result for {}", msg.id);
- // TODO: need a better exception for this.
- Throwable error = msg.error != null ? new RuntimeException(msg.error) : null;
- if (error == null) {
- handle.setSuccess(msg.result);
- } else {
- handle.setFailure(error);
- }
- } else {
- LOG.warn("Received result for unknown job {}", msg.id);
- }
- }
-
- private void handle(ChannelHandlerContext ctx, JobStarted msg) {
- JobHandleImpl<?> handle = jobs.get(msg.id);
- if (handle != null) {
- handle.changeState(JobHandle.State.STARTED);
- } else {
- LOG.warn("Received event for unknown job {}", msg.id);
- }
- }
-
- private void handle(ChannelHandlerContext ctx, ReplState msg) {
- LOG.trace("Received repl state for {}", msg.state);
- replState = msg.state;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/com/cloudera/livy/rsc/RSCClientFactory.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/RSCClientFactory.java b/rsc/src/main/java/com/cloudera/livy/rsc/RSCClientFactory.java
deleted file mode 100644
index 27540bb..0000000
--- a/rsc/src/main/java/com/cloudera/livy/rsc/RSCClientFactory.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.rsc;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import io.netty.util.concurrent.ImmediateEventExecutor;
-import io.netty.util.concurrent.Promise;
-
-import com.cloudera.livy.LivyClient;
-import com.cloudera.livy.LivyClientFactory;
-import com.cloudera.livy.rsc.rpc.RpcServer;
-
-/**
- * Factory for RSC clients.
- */
-public final class RSCClientFactory implements LivyClientFactory {
-
- private final AtomicInteger refCount = new AtomicInteger();
- private RpcServer server = null;
-
- /**
- * Creates a local Livy client if the URI has the "rsc" scheme.
- * <p>
- * If the URI contains user information, host and port, the library will try to connect to an
- * existing RSC instance with the provided information, and most of the provided configuration
- * will be ignored.
- * <p>
- * Otherwise, a new Spark context will be started with the given configuration.
- */
- @Override
- public LivyClient createClient(URI uri, Properties config) {
- if (!"rsc".equals(uri.getScheme())) {
- return null;
- }
-
- RSCConf lconf = new RSCConf(config);
-
- boolean needsServer = false;
- try {
- Promise<ContextInfo> info;
- Process driverProcess = null;
- if (uri.getUserInfo() != null && uri.getHost() != null && uri.getPort() > 0) {
- info = createContextInfo(uri);
- } else {
- needsServer = true;
- ref(lconf);
- DriverProcessInfo processInfo = ContextLauncher.create(this, lconf);
- info = processInfo.getContextInfo();
- driverProcess = processInfo.getDriverProcess();
- }
- return new RSCClient(lconf, info, driverProcess);
- } catch (Exception e) {
- if (needsServer) {
- unref();
- }
- throw Utils.propagate(e);
- }
- }
-
- RpcServer getServer() {
- return server;
- }
-
- private synchronized void ref(RSCConf config) throws IOException {
- if (refCount.get() != 0) {
- refCount.incrementAndGet();
- return;
- }
-
- Utils.checkState(server == null, "Server already running but ref count is 0.");
- if (server == null) {
- try {
- server = new RpcServer(config);
- } catch (InterruptedException ie) {
- throw Utils.propagate(ie);
- }
- }
-
- refCount.incrementAndGet();
- }
-
- synchronized void unref() {
- if (refCount.decrementAndGet() == 0) {
- server.close();
- server = null;
- }
- }
-
- private static Promise<ContextInfo> createContextInfo(final URI uri) {
- String[] userInfo = uri.getUserInfo().split(":", 2);
- ImmediateEventExecutor executor = ImmediateEventExecutor.INSTANCE;
- Promise<ContextInfo> promise = executor.newPromise();
- promise.setSuccess(new ContextInfo(uri.getHost(), uri.getPort(), userInfo[0], userInfo[1]));
- return promise;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java b/rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java
deleted file mode 100644
index afd935d..0000000
--- a/rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.rsc;
-
-import java.io.IOException;
-import java.net.Inet4Address;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-import javax.security.sasl.Sasl;
-
-import com.cloudera.livy.client.common.ClientConf;
-
-public class RSCConf extends ClientConf<RSCConf> {
-
- public static final String SPARK_CONF_PREFIX = "spark.";
- public static final String LIVY_SPARK_PREFIX = SPARK_CONF_PREFIX + "__livy__.";
-
- private static final String RSC_CONF_PREFIX = "livy.rsc.";
-
- public static enum Entry implements ConfEntry {
- CLIENT_ID("client.auth.id", null),
- CLIENT_SECRET("client.auth.secret", null),
- CLIENT_IN_PROCESS("client.do-not-use.run-driver-in-process", false),
- CLIENT_SHUTDOWN_TIMEOUT("client.shutdown-timeout", "10s"),
- DRIVER_CLASS("driver-class", null),
- SESSION_KIND("session.kind", null),
-
- LIVY_JARS("jars", null),
- SPARKR_PACKAGE("sparkr.package", null),
- PYSPARK_ARCHIVES("pyspark.archives", null),
-
- // Address for the RSC driver to connect back with it's connection info.
- LAUNCHER_ADDRESS("launcher.address", null),
- LAUNCHER_PORT_RANGE("launcher.port.range", "10000~10010"),
- // Setting up of this propety by user has no benefit. It is currently being used
- // to pass port information from ContextLauncher to RSCDriver
- LAUNCHER_PORT("launcher.port", -1),
- // How long will the RSC wait for a connection for a Livy server before shutting itself down.
- SERVER_IDLE_TIMEOUT("server.idle-timeout", "10m"),
-
- PROXY_USER("proxy-user", null),
-
- RPC_SERVER_ADDRESS("rpc.server.address", null),
- RPC_CLIENT_HANDSHAKE_TIMEOUT("server.connect.timeout", "90s"),
- RPC_CLIENT_CONNECT_TIMEOUT("client.connect.timeout", "10s"),
- RPC_CHANNEL_LOG_LEVEL("channel.log.level", null),
- RPC_MAX_MESSAGE_SIZE("rpc.max.size", 50 * 1024 * 1024),
- RPC_MAX_THREADS("rpc.threads", 8),
- RPC_SECRET_RANDOM_BITS("secret.bits", 256),
-
- SASL_MECHANISMS("rpc.sasl.mechanisms", "DIGEST-MD5"),
- SASL_QOP("rpc.sasl.qop", null),
-
- TEST_STUCK_END_SESSION("test.do-not-use.stuck-end-session", false),
- TEST_STUCK_START_DRIVER("test.do-not-use.stuck-start-driver", false),
-
- JOB_CANCEL_TRIGGER_INTERVAL("job-cancel.trigger-interval", "100ms"),
- JOB_CANCEL_TIMEOUT("job-cancel.timeout", "30s"),
-
- RETAINED_STATEMENT_NUMBER("retained-statements", 100);
-
- private final String key;
- private final Object dflt;
-
- private Entry(String key, Object dflt) {
- this.key = RSC_CONF_PREFIX + key;
- this.dflt = dflt;
- }
-
- @Override
- public String key() { return key; }
-
- @Override
- public Object dflt() { return dflt; }
- }
-
- public RSCConf() {
- this(new Properties());
- }
-
- public RSCConf(Properties config) {
- super(config);
- }
-
- public Map<String, String> getSaslOptions() {
- Map<String, String> opts = new HashMap<>();
-
- // TODO: add more options?
- String qop = get(Entry.SASL_QOP);
- if (qop != null) {
- opts.put(Sasl.QOP, qop);
- }
-
- return opts;
- }
-
- public String findLocalAddress() throws IOException {
- InetAddress address = InetAddress.getLocalHost();
- if (address.isLoopbackAddress()) {
- // Address resolves to something like 127.0.1.1, which happens on Debian;
- // try to find a better address using the local network interfaces
- Enumeration<NetworkInterface> ifaces = NetworkInterface.getNetworkInterfaces();
- while (ifaces.hasMoreElements()) {
- NetworkInterface ni = ifaces.nextElement();
- Enumeration<InetAddress> addrs = ni.getInetAddresses();
- while (addrs.hasMoreElements()) {
- InetAddress addr = addrs.nextElement();
- if (!addr.isLinkLocalAddress() && !addr.isLoopbackAddress()
- && addr instanceof Inet4Address) {
- // We've found an address that looks reasonable!
- LOG.warn("Your hostname, {}, resolves to a loopback address; using {} "
- + " instead (on interface {})", address.getHostName(), addr.getHostAddress(),
- ni.getName());
- LOG.warn("Set '{}' if you need to bind to another address.",
- Entry.RPC_SERVER_ADDRESS.key);
- return addr.getHostAddress();
- }
- }
- }
- }
-
- LOG.warn("Your hostname, {}, resolves to a loopback address, but we couldn't find "
- + "any external IP address!", address.getCanonicalHostName());
- LOG.warn("Set {} if you need to bind to another address.",
- Entry.RPC_SERVER_ADDRESS.key);
- return address.getCanonicalHostName();
- }
-
- private static final Map<String, DeprecatedConf> configsWithAlternatives
- = Collections.unmodifiableMap(new HashMap<String, DeprecatedConf>() {{
- put(RSCConf.Entry.CLIENT_IN_PROCESS.key, DepConf.CLIENT_IN_PROCESS);
- put(RSCConf.Entry.CLIENT_SHUTDOWN_TIMEOUT.key, DepConf.CLIENT_SHUTDOWN_TIMEOUT);
- put(RSCConf.Entry.DRIVER_CLASS.key, DepConf.DRIVER_CLASS);
- put(RSCConf.Entry.SERVER_IDLE_TIMEOUT.key, DepConf.SERVER_IDLE_TIMEOUT);
- put(RSCConf.Entry.PROXY_USER.key, DepConf.PROXY_USER);
- put(RSCConf.Entry.TEST_STUCK_END_SESSION.key, DepConf.TEST_STUCK_END_SESSION);
- put(RSCConf.Entry.TEST_STUCK_START_DRIVER.key, DepConf.TEST_STUCK_START_DRIVER);
- put(RSCConf.Entry.JOB_CANCEL_TRIGGER_INTERVAL.key, DepConf.JOB_CANCEL_TRIGGER_INTERVAL);
- put(RSCConf.Entry.JOB_CANCEL_TIMEOUT.key, DepConf.JOB_CANCEL_TIMEOUT);
- put(RSCConf.Entry.RETAINED_STATEMENT_NUMBER.key, DepConf.RETAINED_STATEMENT_NUMBER);
- }});
-
- // Maps deprecated key to DeprecatedConf with the same key.
- // There are no deprecated configs without alternatives currently.
- private static final Map<String, DeprecatedConf> deprecatedConfigs
- = Collections.unmodifiableMap(new HashMap<String, DeprecatedConf>());
-
- protected Map<String, DeprecatedConf> getConfigsWithAlternatives() {
- return configsWithAlternatives;
- }
-
- protected Map<String, DeprecatedConf> getDeprecatedConfigs() {
- return deprecatedConfigs;
- }
-
- static enum DepConf implements DeprecatedConf {
- CLIENT_IN_PROCESS("client.do_not_use.run_driver_in_process", "0.4"),
- CLIENT_SHUTDOWN_TIMEOUT("client.shutdown_timeout", "0.4"),
- DRIVER_CLASS("driver_class", "0.4"),
- SERVER_IDLE_TIMEOUT("server.idle_timeout", "0.4"),
- PROXY_USER("proxy_user", "0.4"),
- TEST_STUCK_END_SESSION("test.do_not_use.stuck_end_session", "0.4"),
- TEST_STUCK_START_DRIVER("test.do_not_use.stuck_start_driver", "0.4"),
- JOB_CANCEL_TRIGGER_INTERVAL("job_cancel.trigger_interval", "0.4"),
- JOB_CANCEL_TIMEOUT("job_cancel.timeout", "0.4"),
- RETAINED_STATEMENT_NUMBER("retained_statements", "0.4");
-
- private final String key;
- private final String version;
- private final String deprecationMessage;
-
- private DepConf(String key, String version) {
- this(key, version, "");
- }
-
- private DepConf(String key, String version, String deprecationMessage) {
- this.key = RSC_CONF_PREFIX + key;
- this.version = version;
- this.deprecationMessage = deprecationMessage;
- }
-
- @Override
- public String key() { return key; }
-
- @Override
- public String version() { return version; }
-
- @Override
- public String deprecationMessage() { return deprecationMessage; }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/com/cloudera/livy/rsc/ReplJobResults.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/ReplJobResults.java b/rsc/src/main/java/com/cloudera/livy/rsc/ReplJobResults.java
deleted file mode 100644
index deef162..0000000
--- a/rsc/src/main/java/com/cloudera/livy/rsc/ReplJobResults.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.cloudera.livy.rsc;
-
-import com.cloudera.livy.rsc.driver.Statement;
-
-public class ReplJobResults {
- public final Statement[] statements;
-
- public ReplJobResults(Statement[] statements) {
- this.statements = statements;
- }
-
- public ReplJobResults() {
- this(null);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/com/cloudera/livy/rsc/Utils.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/Utils.java b/rsc/src/main/java/com/cloudera/livy/rsc/Utils.java
deleted file mode 100644
index 993ea1e..0000000
--- a/rsc/src/main/java/com/cloudera/livy/rsc/Utils.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.rsc;
-
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
-
-/**
- * A few simple utility functions used by the code, mostly to avoid a direct dependency
- * on Guava.
- */
-public class Utils {
-
- public static void checkArgument(boolean condition) {
- if (!condition) {
- throw new IllegalArgumentException();
- }
- }
-
- public static void checkArgument(boolean condition, String msg, Object... args) {
- if (!condition) {
- throw new IllegalArgumentException(String.format(msg, args));
- }
- }
-
- public static void checkState(boolean condition, String msg, Object... args) {
- if (!condition) {
- throw new IllegalStateException(String.format(msg, args));
- }
- }
-
- public static void checkNotNull(Object o) {
- if (o == null) {
- throw new NullPointerException();
- }
- }
-
- public static RuntimeException propagate(Throwable t) {
- if (t instanceof RuntimeException) {
- throw (RuntimeException) t;
- } else {
- throw new RuntimeException(t);
- }
- }
-
- public static ThreadFactory newDaemonThreadFactory(final String nameFormat) {
- return new ThreadFactory() {
-
- private final AtomicInteger threadId = new AtomicInteger();
-
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r);
- t.setName(String.format(nameFormat, threadId.incrementAndGet()));
- t.setDaemon(true);
- return t;
- }
-
- };
- }
-
- public static String join(Iterable<String> strs, String sep) {
- StringBuilder sb = new StringBuilder();
- for (String s : strs) {
- if (s != null && !s.isEmpty()) {
- sb.append(s).append(sep);
- }
- }
- if (sb.length() > 0) {
- sb.setLength(sb.length() - sep.length());
- }
- return sb.toString();
- }
-
- public static String stackTraceAsString(Throwable t) {
- StringBuilder sb = new StringBuilder();
- sb.append(t.getClass().getName()).append(": ").append(t.getMessage());
- for (StackTraceElement e : t.getStackTrace()) {
- sb.append("\n");
- sb.append(e.toString());
- }
- return sb.toString();
- }
-
- public static <T> void addListener(Future<T> future, final FutureListener<T> lsnr) {
- future.addListener(new GenericFutureListener<Future<T>>() {
- @Override
- public void operationComplete(Future<T> f) throws Exception {
- if (f.isSuccess()) {
- lsnr.onSuccess(f.get());
- } else {
- lsnr.onFailure(f.cause());
- }
- }
- });
- }
-
- private Utils() { }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/com/cloudera/livy/rsc/driver/AddFileJob.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/driver/AddFileJob.java b/rsc/src/main/java/com/cloudera/livy/rsc/driver/AddFileJob.java
deleted file mode 100644
index 82b0a69..0000000
--- a/rsc/src/main/java/com/cloudera/livy/rsc/driver/AddFileJob.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.cloudera.livy.rsc.driver;
-
-import com.cloudera.livy.Job;
-import com.cloudera.livy.JobContext;
-
-public class AddFileJob implements Job<Object> {
-
- private final String path;
-
- AddFileJob() {
- this(null);
-}
-
- public AddFileJob(String path) {
- this.path = path;
-}
-
- @Override
- public Object call(JobContext jc) throws Exception {
- JobContextImpl jobContextImpl = (JobContextImpl)jc;
- jobContextImpl.addFile(path);
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/com/cloudera/livy/rsc/driver/AddJarJob.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/driver/AddJarJob.java b/rsc/src/main/java/com/cloudera/livy/rsc/driver/AddJarJob.java
deleted file mode 100644
index d23f3b8..0000000
--- a/rsc/src/main/java/com/cloudera/livy/rsc/driver/AddJarJob.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.rsc.driver;
-
-import com.cloudera.livy.Job;
-import com.cloudera.livy.JobContext;
-
-public class AddJarJob implements Job<Object> {
-
- private final String path;
-
- // For serialization.
- private AddJarJob() {
- this(null);
- }
-
- public AddJarJob(String path) {
- this.path = path;
- }
-
- @Override
- public Object call(JobContext jc) throws Exception {
- JobContextImpl jobContextImpl = (JobContextImpl)jc;
- jobContextImpl.addJarOrPyFile(path);
- return null;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/com/cloudera/livy/rsc/driver/BypassJob.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/driver/BypassJob.java b/rsc/src/main/java/com/cloudera/livy/rsc/driver/BypassJob.java
deleted file mode 100644
index 8ca2c8e..0000000
--- a/rsc/src/main/java/com/cloudera/livy/rsc/driver/BypassJob.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.rsc.driver;
-
-import java.nio.ByteBuffer;
-
-import com.cloudera.livy.Job;
-import com.cloudera.livy.JobContext;
-import com.cloudera.livy.client.common.BufferUtils;
-import com.cloudera.livy.client.common.Serializer;
-
-class BypassJob implements Job<byte[]> {
-
- private final Serializer serializer;
- private final byte[] serializedJob;
-
- BypassJob(Serializer serializer, byte[] serializedJob) {
- this.serializer = serializer;
- this.serializedJob = serializedJob;
- }
-
- @Override
- public byte[] call(JobContext jc) throws Exception {
- Job<?> job = (Job<?>) serializer.deserialize(ByteBuffer.wrap(serializedJob));
- Object result = job.call(jc);
- byte[] serializedResult;
- if (result != null) {
- ByteBuffer data = serializer.serialize(result);
- serializedResult = BufferUtils.toByteArray(data);
- } else {
- serializedResult = null;
- }
- return serializedResult;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/com/cloudera/livy/rsc/driver/BypassJobWrapper.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/driver/BypassJobWrapper.java b/rsc/src/main/java/com/cloudera/livy/rsc/driver/BypassJobWrapper.java
deleted file mode 100644
index b8951af..0000000
--- a/rsc/src/main/java/com/cloudera/livy/rsc/driver/BypassJobWrapper.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.rsc.driver;
-
-import java.util.List;
-
-import com.cloudera.livy.Job;
-import com.cloudera.livy.JobHandle;
-import com.cloudera.livy.rsc.BypassJobStatus;
-import com.cloudera.livy.rsc.Utils;
-
-public class BypassJobWrapper extends JobWrapper<byte[]> {
-
- private volatile byte[] result;
- private volatile Throwable error;
- private volatile JobHandle.State state;
- private volatile List<Integer> newSparkJobs;
-
- public BypassJobWrapper(RSCDriver driver, String jobId, Job<byte[]> serializedJob) {
- super(driver, jobId, serializedJob);
- state = JobHandle.State.QUEUED;
- }
-
- @Override
- public Void call() throws Exception {
- state = JobHandle.State.STARTED;
- return super.call();
- }
-
- @Override
- protected synchronized void finished(byte[] result, Throwable error) {
- if (error == null) {
- this.result = result;
- this.state = JobHandle.State.SUCCEEDED;
- } else {
- this.error = error;
- this.state = JobHandle.State.FAILED;
- }
- }
-
- @Override
- boolean cancel() {
- if (super.cancel()) {
- this.state = JobHandle.State.CANCELLED;
- return true;
- }
- return false;
- }
-
- @Override
- protected void jobStarted() {
- // Do nothing; just avoid sending data back to the driver.
- }
-
- synchronized BypassJobStatus getStatus() {
- String stackTrace = error != null ? Utils.stackTraceAsString(error) : null;
- return new BypassJobStatus(state, result, stackTrace);
- }
-
-}