You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@livy.apache.org by js...@apache.org on 2017/07/05 03:06:53 UTC

[24/33] incubator-livy git commit: LIVY-375. Change Livy code package name to org.apache.livy

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/src/test/scala/org/apache/livy/repl/SparkSessionSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/livy/repl/SparkSessionSpec.scala b/repl/src/test/scala/org/apache/livy/repl/SparkSessionSpec.scala
new file mode 100644
index 0000000..9369519
--- /dev/null
+++ b/repl/src/test/scala/org/apache/livy/repl/SparkSessionSpec.scala
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.repl
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import org.apache.spark.SparkConf
+import org.json4s.Extraction
+import org.json4s.JsonAST.JValue
+import org.json4s.jackson.JsonMethods.parse
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.livy.rsc.RSCConf
+import org.apache.livy.rsc.driver.StatementState
+
+class SparkSessionSpec extends BaseSessionSpec {
+
+  override def createInterpreter(): Interpreter = new SparkInterpreter(new SparkConf())
+
+  it should "execute `1 + 2` == 3" in withSession { session =>
+    val statement = execute(session)("1 + 2")
+    statement.id should equal (0)
+
+    val result = parse(statement.output)
+    val expectedResult = Extraction.decompose(Map(
+      "status" -> "ok",
+      "execution_count" -> 0,
+      "data" -> Map(
+        "text/plain" -> "res0: Int = 3"
+      )
+    ))
+
+    result should equal (expectedResult)
+  }
+
+  it should "execute `x = 1`, then `y = 2`, then `x + y`" in withSession { session =>
+    val executeWithSession = execute(session)(_)
+    var statement = executeWithSession("val x = 1")
+    statement.id should equal (0)
+
+    var result = parse(statement.output)
+    var expectedResult = Extraction.decompose(Map(
+      "status" -> "ok",
+      "execution_count" -> 0,
+      "data" -> Map(
+        "text/plain" -> "x: Int = 1"
+      )
+    ))
+
+    result should equal (expectedResult)
+
+    statement = executeWithSession("val y = 2")
+    statement.id should equal (1)
+
+    result = parse(statement.output)
+    expectedResult = Extraction.decompose(Map(
+      "status" -> "ok",
+      "execution_count" -> 1,
+      "data" -> Map(
+        "text/plain" -> "y: Int = 2"
+      )
+    ))
+
+    result should equal (expectedResult)
+
+    statement = executeWithSession("x + y")
+    statement.id should equal (2)
+
+    result = parse(statement.output)
+    expectedResult = Extraction.decompose(Map(
+      "status" -> "ok",
+      "execution_count" -> 2,
+      "data" -> Map(
+        "text/plain" -> "res0: Int = 3"
+      )
+    ))
+
+    result should equal (expectedResult)
+  }
+
+  it should "capture stdout" in withSession { session =>
+    val statement = execute(session)("""println("Hello World")""")
+    statement.id should equal (0)
+
+    val result = parse(statement.output)
+    val expectedResult = Extraction.decompose(Map(
+      "status" -> "ok",
+      "execution_count" -> 0,
+      "data" -> Map(
+        "text/plain" -> "Hello World"
+      )
+    ))
+
+    result should equal (expectedResult)
+  }
+
+  it should "report an error if accessing an unknown variable" in withSession { session =>
+    val statement = execute(session)("""x""")
+    statement.id should equal (0)
+
+    val result = parse(statement.output)
+
+    def extract(key: String): String = (result \ key).extract[String]
+
+    extract("status") should equal ("error")
+    extract("execution_count") should equal ("0")
+    extract("ename") should equal ("Error")
+    extract("evalue") should include ("error: not found: value x")
+  }
+
+  it should "report an error if exception is thrown" in withSession { session =>
+    val statement = execute(session)(
+      """def func1() {
+        |throw new Exception()
+        |}
+        |func1()""".stripMargin)
+    statement.id should equal (0)
+
+    val result = parse(statement.output)
+    val resultMap = result.extract[Map[String, JValue]]
+
+    // Manually extract the values since the line numbers in the exception could change.
+    resultMap("status").extract[String] should equal ("error")
+    resultMap("execution_count").extract[Int] should equal (0)
+    resultMap("ename").extract[String] should equal ("Error")
+    resultMap("evalue").extract[String] should include ("java.lang.Exception")
+
+    val traceback = resultMap("traceback").extract[Seq[String]]
+    traceback(0) should include ("func1(<console>:")
+  }
+
+  it should "access the spark context" in withSession { session =>
+    val statement = execute(session)("""sc""")
+    statement.id should equal (0)
+
+    val result = parse(statement.output)
+    val resultMap = result.extract[Map[String, JValue]]
+
+    // Extract fields manually: only part of the printed SparkContext value is asserted below.
+    resultMap("status").extract[String] should equal ("ok")
+    resultMap("execution_count").extract[Int] should equal (0)
+
+    val data = resultMap("data").extract[Map[String, JValue]]
+    data("text/plain").extract[String] should include (
+      "res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext")
+  }
+
+  it should "execute spark commands" in withSession { session =>
+    val statement = execute(session)(
+      """sc.parallelize(0 to 1).map{i => i+1}.collect""".stripMargin)
+    statement.id should equal (0)
+
+    val result = parse(statement.output)
+
+    val expectedResult = Extraction.decompose(Map(
+      "status" -> "ok",
+      "execution_count" -> 0,
+      "data" -> Map(
+        "text/plain" -> "res0: Array[Int] = Array(1, 2)"
+      )
+    ))
+
+    result should equal (expectedResult)
+  }
+
+  it should "do table magic" in withSession { session =>
+    val statement = execute(session)("val x = List((1, \"a\"), (3, \"b\"))\n%table x")
+    statement.id should equal (0)
+
+    val result = parse(statement.output)
+
+    val expectedResult = Extraction.decompose(Map(
+      "status" -> "ok",
+      "execution_count" -> 0,
+      "data" -> Map(
+        "application/vnd.livy.table.v1+json" -> Map(
+          "headers" -> List(
+            Map("type" -> "BIGINT_TYPE", "name" -> "_1"),
+            Map("type" -> "STRING_TYPE", "name" -> "_2")),
+          "data" -> List(List(1, "a"), List(3, "b"))
+        )
+      )
+    ))
+
+    result should equal (expectedResult)
+  }
+
+  it should "cancel spark jobs" in withSession { session =>
+    val stmtId = session.execute(
+      """sc.parallelize(0 to 10).map { i => Thread.sleep(10000); i + 1 }.collect""".stripMargin)
+    eventually(timeout(30 seconds), interval(100 millis)) {
+      assert(session.statements(stmtId).state.get() == StatementState.Running)
+    }
+    session.cancel(stmtId)
+
+    eventually(timeout(30 seconds), interval(100 millis)) {
+      assert(session.statements(stmtId).state.get() == StatementState.Cancelled)
+      session.statements(stmtId).output should include (
+        "Job 0 cancelled part of cancelled job group 0")
+    }
+  }
+
+  it should "cancel waiting statement" in withSession { session =>
+    val stmtId1 = session.execute(
+      """sc.parallelize(0 to 10).map { i => Thread.sleep(10000); i + 1 }.collect""".stripMargin)
+    val stmtId2 = session.execute(
+      """sc.parallelize(0 to 10).map { i => Thread.sleep(10000); i + 1 }.collect""".stripMargin)
+    eventually(timeout(30 seconds), interval(100 millis)) {
+      assert(session.statements(stmtId1).state.get() == StatementState.Running)
+    }
+
+    assert(session.statements(stmtId2).state.get() == StatementState.Waiting)
+
+    session.cancel(stmtId2)
+    assert(session.statements(stmtId2).state.get() == StatementState.Cancelled)
+
+    session.cancel(stmtId1)
+    assert(session.statements(stmtId1).state.get() == StatementState.Cancelling)
+    eventually(timeout(30 seconds), interval(100 millis)) {
+      assert(session.statements(stmtId1).state.get() == StatementState.Cancelled)
+      session.statements(stmtId1).output should include (
+        "Job 0 cancelled part of cancelled job group 0")
+    }
+  }
+
+  it should "correctly calculate progress" in withSession { session =>
+    val executeCode =
+      """
+        |sc.parallelize(1 to 2, 2).map(i => (i, 1)).collect()
+      """.stripMargin
+
+    val stmtId = session.execute(executeCode)
+    eventually(timeout(30 seconds), interval(100 millis)) {
+      session.progressOfStatement(stmtId) should be(1.0)
+    }
+  }
+
+  it should "not generate Spark jobs for plain Scala code" in withSession { session =>
+    val executeCode = """1 + 1"""
+
+    val stmtId = session.execute(executeCode)
+    session.progressOfStatement(stmtId) should be (0.0)
+  }
+
+  it should "handle multiple jobs in one statement" in withSession { session =>
+    val executeCode =
+      """
+        |sc.parallelize(1 to 2, 2).map(i => (i, 1)).collect()
+        |sc.parallelize(1 to 2, 2).map(i => (i, 1)).collect()
+      """.stripMargin
+
+    val stmtId = session.execute(executeCode)
+    eventually(timeout(30 seconds), interval(100 millis)) {
+      session.progressOfStatement(stmtId) should be(1.0)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/pom.xml
----------------------------------------------------------------------
diff --git a/rsc/pom.xml b/rsc/pom.xml
index 7ff8c2a..660ca35 100644
--- a/rsc/pom.xml
+++ b/rsc/pom.xml
@@ -19,14 +19,14 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>
-    <groupId>com.cloudera.livy</groupId>
+    <groupId>org.apache.livy</groupId>
     <artifactId>livy-main</artifactId>
-    <version>0.4.0-SNAPSHOT</version>
+    <version>0.4.0-incubating-SNAPSHOT</version>
   </parent>
 
-  <groupId>com.cloudera.livy</groupId>
+  <groupId>org.apache.livy</groupId>
   <artifactId>livy-rsc</artifactId>
-  <version>0.4.0-SNAPSHOT</version>
+  <version>0.4.0-incubating-SNAPSHOT</version>
   <packaging>jar</packaging>
 
   <properties>
@@ -35,17 +35,17 @@
 
   <dependencies>
     <dependency>
-      <groupId>com.cloudera.livy</groupId>
+      <groupId>org.apache.livy</groupId>
       <artifactId>livy-api</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
-      <groupId>com.cloudera.livy</groupId>
+      <groupId>org.apache.livy</groupId>
       <artifactId>livy-client-common</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
-      <groupId>com.cloudera.livy</groupId>
+      <groupId>org.apache.livy</groupId>
       <artifactId>livy-test-lib</artifactId>
       <version>${project.version}</version>
       <scope>test</scope>
@@ -133,7 +133,7 @@
               <shadedArtifactAttached>false</shadedArtifactAttached>
               <artifactSet>
                 <includes>
-                  <include>com.cloudera.livy:livy-client-common</include>
+                  <include>org.apache.livy:livy-client-common</include>
                   <include>com.esotericsoftware.kryo:kryo</include>
                 </includes>
               </artifactSet>
@@ -150,7 +150,7 @@
               <relocations>
                 <relocation>
                   <pattern>com.esotericsoftware</pattern>
-                  <shadedPattern>com.cloudera.livy.shaded.kryo</shadedPattern>
+                  <shadedPattern>org.apache.livy.shaded.kryo</shadedPattern>
                 </relocation>
               </relocations>
               <outputFile>${project.build.directory}/jars/${project.artifactId}-${project.version}.jar</outputFile>

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/com/cloudera/livy/rsc/BaseProtocol.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/BaseProtocol.java b/rsc/src/main/java/com/cloudera/livy/rsc/BaseProtocol.java
deleted file mode 100644
index 419c1f7..0000000
--- a/rsc/src/main/java/com/cloudera/livy/rsc/BaseProtocol.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.rsc;
-
-import com.cloudera.livy.Job;
-import com.cloudera.livy.rsc.driver.Statement;
-import com.cloudera.livy.rsc.rpc.RpcDispatcher;
-
-public abstract class BaseProtocol extends RpcDispatcher {
-
-  protected static class CancelJob {
-
-    public final String id;
-
-    CancelJob(String id) {
-      this.id = id;
-    }
-
-    CancelJob() {
-      this(null);
-    }
-
-  }
-
-  protected static class EndSession {
-
-  }
-
-  protected static class Error {
-
-    public final String cause;
-
-    public Error(Throwable cause) {
-      if (cause == null) {
-        this.cause = "";
-      } else {
-        this.cause = Utils.stackTraceAsString(cause);
-      }
-    }
-
-    public Error() {
-      this(null);
-    }
-
-  }
-
-  public static class BypassJobRequest {
-
-    public final String id;
-    public final byte[] serializedJob;
-    public final boolean synchronous;
-
-    public BypassJobRequest(String id, byte[] serializedJob, boolean synchronous) {
-      this.id = id;
-      this.serializedJob = serializedJob;
-      this.synchronous = synchronous;
-    }
-
-    public BypassJobRequest() {
-      this(null, null, false);
-    }
-
-  }
-
-  protected static class GetBypassJobStatus {
-
-    public final String id;
-
-    public GetBypassJobStatus(String id) {
-      this.id = id;
-    }
-
-    public GetBypassJobStatus() {
-      this(null);
-    }
-
-  }
-
-  protected static class JobRequest<T> {
-
-    public final String id;
-    public final Job<T> job;
-
-    public JobRequest(String id, Job<T> job) {
-      this.id = id;
-      this.job = job;
-    }
-
-    public JobRequest() {
-      this(null, null);
-    }
-
-  }
-
-  protected static class JobResult<T> {
-
-    public final String id;
-    public final T result;
-    public final String error;
-
-    public JobResult(String id, T result, Throwable error) {
-      this.id = id;
-      this.result = result;
-      this.error = error != null ? Utils.stackTraceAsString(error) : null;
-    }
-
-    public JobResult() {
-      this(null, null, null);
-    }
-
-  }
-
-  protected static class JobStarted {
-
-    public final String id;
-
-    public JobStarted(String id) {
-      this.id = id;
-    }
-
-    public JobStarted() {
-      this(null);
-    }
-
-  }
-
-  protected static class SyncJobRequest<T> {
-
-    public final Job<T> job;
-
-    public SyncJobRequest(Job<T> job) {
-      this.job = job;
-    }
-
-    public SyncJobRequest() {
-      this(null);
-    }
-
-  }
-
-  public static class RemoteDriverAddress {
-
-    public final String host;
-    public final int port;
-
-    public RemoteDriverAddress(String host, int port) {
-      this.host = host;
-      this.port = port;
-    }
-
-    public RemoteDriverAddress() {
-      this(null, -1);
-    }
-
-  }
-
-  public static class ReplJobRequest {
-
-    public final String code;
-
-    public ReplJobRequest(String code) {
-      this.code = code;
-    }
-
-    public ReplJobRequest() {
-      this(null);
-    }
-  }
-
-  public static class GetReplJobResults {
-    public boolean allResults;
-    public Integer from, size;
-
-    public GetReplJobResults(Integer from, Integer size) {
-      this.allResults = false;
-      this.from = from;
-      this.size = size;
-    }
-
-    public GetReplJobResults() {
-      this.allResults = true;
-      from = null;
-      size = null;
-    }
-  }
-
-  protected static class ReplState {
-
-    public final String state;
-
-    public ReplState(String state) {
-      this.state = state;
-    }
-
-    public ReplState() {
-      this(null);
-    }
-  }
-
-  public static class CancelReplJobRequest {
-    public final int id;
-
-    public CancelReplJobRequest(int id) {
-      this.id = id;
-    }
-
-    public CancelReplJobRequest() {
-      this(-1);
-    }
-  }
-
-  public static class InitializationError {
-
-    public final String stackTrace;
-
-    public InitializationError(String stackTrace) {
-      this.stackTrace = stackTrace;
-    }
-
-    public InitializationError() {
-      this(null);
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/com/cloudera/livy/rsc/BypassJobStatus.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/BypassJobStatus.java b/rsc/src/main/java/com/cloudera/livy/rsc/BypassJobStatus.java
deleted file mode 100644
index 7f35cc7..0000000
--- a/rsc/src/main/java/com/cloudera/livy/rsc/BypassJobStatus.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.rsc;
-
-import java.util.List;
-
-import com.cloudera.livy.JobHandle;
-
-public class BypassJobStatus {
-
-  public final JobHandle.State state;
-  public final byte[] result;
-  public final String error;
-
-  public BypassJobStatus(JobHandle.State state, byte[] result, String error) {
-    this.state = state;
-    this.result = result;
-    this.error = error;
-  }
-
-  BypassJobStatus() {
-    this(null, null, null);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/com/cloudera/livy/rsc/ContextInfo.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/ContextInfo.java b/rsc/src/main/java/com/cloudera/livy/rsc/ContextInfo.java
deleted file mode 100644
index 5e930cd..0000000
--- a/rsc/src/main/java/com/cloudera/livy/rsc/ContextInfo.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.rsc;
-
-/**
- * Information about a running RSC instance.
- */
-class ContextInfo {
-
-  final String remoteAddress;
-  final int remotePort;
-  final String clientId;
-  final String secret;
-
-  ContextInfo(String remoteAddress, int remotePort, String clientId, String secret) {
-    this.remoteAddress = remoteAddress;
-    this.remotePort = remotePort;
-    this.clientId = clientId;
-    this.secret = secret;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/com/cloudera/livy/rsc/ContextLauncher.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/ContextLauncher.java b/rsc/src/main/java/com/cloudera/livy/rsc/ContextLauncher.java
deleted file mode 100644
index 9a5e447..0000000
--- a/rsc/src/main/java/com/cloudera/livy/rsc/ContextLauncher.java
+++ /dev/null
@@ -1,456 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.rsc;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.io.Reader;
-import java.io.Writer;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.UUID;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.nio.file.attribute.PosixFilePermission.*;
-
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.util.concurrent.Promise;
-import org.apache.spark.launcher.SparkLauncher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.livy.client.common.TestUtils;
-import com.cloudera.livy.rsc.driver.RSCDriverBootstrapper;
-import com.cloudera.livy.rsc.rpc.Rpc;
-import com.cloudera.livy.rsc.rpc.RpcDispatcher;
-import com.cloudera.livy.rsc.rpc.RpcServer;
-import static com.cloudera.livy.rsc.RSCConf.Entry.*;
-
-/**
- * Encapsulates code needed to launch a new Spark context and collect information about how
- * to establish a client connection to it.
- */
-class ContextLauncher {
-
-  private static final Logger LOG = LoggerFactory.getLogger(ContextLauncher.class);
-  private static final AtomicInteger CHILD_IDS = new AtomicInteger();
-
-  private static final String SPARK_DEPLOY_MODE = "spark.submit.deployMode";
-  private static final String SPARK_JARS_KEY = "spark.jars";
-  private static final String SPARK_ARCHIVES_KEY = "spark.yarn.dist.archives";
-  private static final String SPARK_HOME_ENV = "SPARK_HOME";
-
-  static DriverProcessInfo create(RSCClientFactory factory, RSCConf conf)
-      throws IOException {
-    ContextLauncher launcher = new ContextLauncher(factory, conf);
-    return new DriverProcessInfo(launcher.promise, launcher.child.child);
-  }
-
-  private final Promise<ContextInfo> promise;
-  private final ScheduledFuture<?> timeout;
-  private final String clientId;
-  private final String secret;
-  private final ChildProcess child;
-  private final RSCConf conf;
-  private final RSCClientFactory factory;
-
-  private ContextLauncher(RSCClientFactory factory, RSCConf conf) throws IOException {
-    this.promise = factory.getServer().getEventLoopGroup().next().newPromise();
-    this.clientId = UUID.randomUUID().toString();
-    this.secret = factory.getServer().createSecret();
-    this.conf = conf;
-    this.factory = factory;
-
-    final RegistrationHandler handler = new RegistrationHandler();
-    try {
-      factory.getServer().registerClient(clientId, secret, handler);
-      String replMode = conf.get("repl");
-      boolean repl = replMode != null && replMode.equals("true");
-
-      conf.set(LAUNCHER_ADDRESS, factory.getServer().getAddress());
-      conf.set(LAUNCHER_PORT, factory.getServer().getPort());
-      conf.set(CLIENT_ID, clientId);
-      conf.set(CLIENT_SECRET, secret);
-
-      Utils.addListener(promise, new FutureListener<ContextInfo>() {
-        @Override
-        public void onFailure(Throwable error) throws Exception {
-          // If promise is cancelled or failed, make sure spark-submit is not leaked.
-          if (child != null) {
-            child.kill();
-          }
-        }
-      });
-
-      this.child = startDriver(conf, promise);
-
-      // Set up a timeout to fail the promise if we don't hear back from the context
-      // after a configurable timeout.
-      Runnable timeoutTask = new Runnable() {
-        @Override
-        public void run() {
-          connectTimeout(handler);
-        }
-      };
-      this.timeout = factory.getServer().getEventLoopGroup().schedule(timeoutTask,
-        conf.getTimeAsMs(RPC_CLIENT_HANDSHAKE_TIMEOUT), TimeUnit.MILLISECONDS);
-    } catch (Exception e) {
-      dispose(true);
-      throw Utils.propagate(e);
-    }
-  }
-
-  private void connectTimeout(RegistrationHandler handler) {
-    if (promise.tryFailure(new TimeoutException("Timed out waiting for context to start."))) {
-      handler.dispose();
-    }
-    dispose(true);
-  }
-
-  private void dispose(boolean forceKill) {
-    factory.getServer().unregisterClient(clientId);
-    try {
-      if (child != null) {
-        if (forceKill) {
-          child.kill();
-        } else {
-          child.detach();
-        }
-      }
-    } finally {
-      factory.unref();
-    }
-  }
-
-  private static ChildProcess startDriver(final RSCConf conf, Promise<?> promise)
-      throws IOException {
-    String livyJars = conf.get(LIVY_JARS);
-    if (livyJars == null) {
-      String livyHome = System.getenv("LIVY_HOME");
-      Utils.checkState(livyHome != null,
-        "Need one of LIVY_HOME or %s set.", LIVY_JARS.key());
-      File rscJars = new File(livyHome, "rsc-jars");
-      if (!rscJars.isDirectory()) {
-        rscJars = new File(livyHome, "rsc/target/jars");
-      }
-      Utils.checkState(rscJars.isDirectory(),
-        "Cannot find 'client-jars' directory under LIVY_HOME.");
-      List<String> jars = new ArrayList<>();
-      for (File f : rscJars.listFiles()) {
-         jars.add(f.getAbsolutePath());
-      }
-      livyJars = Utils.join(jars, ",");
-    }
-    merge(conf, SPARK_JARS_KEY, livyJars, ",");
-
-    String kind = conf.get(SESSION_KIND);
-    if ("sparkr".equals(kind)) {
-      merge(conf, SPARK_ARCHIVES_KEY, conf.get(RSCConf.Entry.SPARKR_PACKAGE), ",");
-    } else if ("pyspark".equals(kind)) {
-      merge(conf, "spark.submit.pyFiles", conf.get(RSCConf.Entry.PYSPARK_ARCHIVES), ",");
-    }
-
-    // Disable multiple attempts since the RPC server doesn't yet support multiple
-    // connections for the same registered app.
-    conf.set("spark.yarn.maxAppAttempts", "1");
-
-    // Let the launcher go away when launcher in yarn cluster mode. This avoids keeping lots
-    // of "small" Java processes lingering on the Livy server node.
-    conf.set("spark.yarn.submit.waitAppCompletion", "false");
-
-    if (!conf.getBoolean(CLIENT_IN_PROCESS) &&
-        // For tests which doesn't shutdown RscDriver gracefully, JaCoCo exec isn't dumped properly.
-        // Disable JaCoCo for this case.
-        !conf.getBoolean(TEST_STUCK_END_SESSION)) {
-      // For testing; propagate jacoco settings so that we also do coverage analysis
-      // on the launched driver. We replace the name of the main file ("main.exec")
-      // so that we don't end up fighting with the main test launcher.
-      String jacocoArgs = TestUtils.getJacocoArgs();
-      if (jacocoArgs != null) {
-        merge(conf, SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, jacocoArgs, " ");
-      }
-    }
-
-    final File confFile = writeConfToFile(conf);
-
-    if (ContextLauncher.mockSparkSubmit != null) {
-      LOG.warn("!!!! Using mock spark-submit. !!!!");
-      return new ChildProcess(conf, promise, ContextLauncher.mockSparkSubmit, confFile);
-    } else if (conf.getBoolean(CLIENT_IN_PROCESS)) {
-      // Mostly for testing things quickly. Do not do this in production.
-      LOG.warn("!!!! Running remote driver in-process. !!!!");
-      Runnable child = new Runnable() {
-        @Override
-        public void run() {
-          try {
-            RSCDriverBootstrapper.main(new String[] { confFile.getAbsolutePath() });
-          } catch (Exception e) {
-            throw Utils.propagate(e);
-          }
-        }
-      };
-      return new ChildProcess(conf, promise, child, confFile);
-    } else {
-      final SparkLauncher launcher = new SparkLauncher();
-
-      // Spark 1.x does not support specifying deploy mode in conf and needs special handling.
-      String deployMode = conf.get(SPARK_DEPLOY_MODE);
-      if (deployMode != null) {
-        launcher.setDeployMode(deployMode);
-      }
-
-      launcher.setSparkHome(System.getenv(SPARK_HOME_ENV));
-      launcher.setAppResource("spark-internal");
-      launcher.setPropertiesFile(confFile.getAbsolutePath());
-      launcher.setMainClass(RSCDriverBootstrapper.class.getName());
-
-      if (conf.get(PROXY_USER) != null) {
-        launcher.addSparkArg("--proxy-user", conf.get(PROXY_USER));
-      }
-
-      return new ChildProcess(conf, promise, launcher.launch(), confFile);
-    }
-  }
-
-  private static void merge(RSCConf conf, String key, String livyConf, String sep) {
-    String confValue = Utils.join(Arrays.asList(livyConf, conf.get(key)), sep);
-    conf.set(key, confValue);
-  }
-
-  /**
-   * Write the configuration to a file readable only by the process's owner. Livy properties
-   * are written with an added prefix so that they can be loaded using SparkConf on the driver
-   * side.
-   *
-   * The default Spark configuration (from either SPARK_HOME or SPARK_CONF_DIR) is merged into
-   * the user configuration, so that defaults set by Livy's admin take effect when not overridden
-   * by the user.
-   */
-  private static File writeConfToFile(RSCConf conf) throws IOException {
-    Properties confView = new Properties();
-    for (Map.Entry<String, String> e : conf) {
-      String key = e.getKey();
-      if (!key.startsWith(RSCConf.SPARK_CONF_PREFIX)) {
-        key = RSCConf.LIVY_SPARK_PREFIX + key;
-      }
-      confView.setProperty(key, e.getValue());
-    }
-
-    // Load the default Spark configuration.
-    String confDir = System.getenv("SPARK_CONF_DIR");
-    if (confDir == null && System.getenv(SPARK_HOME_ENV) != null) {
-      confDir = System.getenv(SPARK_HOME_ENV) + File.separator + "conf";
-    }
-
-    if (confDir != null) {
-      File sparkDefaults = new File(confDir + File.separator + "spark-defaults.conf");
-      if (sparkDefaults.isFile()) {
-        Properties sparkConf = new Properties();
-        Reader r = new InputStreamReader(new FileInputStream(sparkDefaults), UTF_8);
-        try {
-          sparkConf.load(r);
-        } finally {
-          r.close();
-        }
-
-        for (String key : sparkConf.stringPropertyNames()) {
-          if (!confView.containsKey(key)) {
-            confView.put(key, sparkConf.getProperty(key));
-          }
-        }
-      }
-    }
-
-    File file = File.createTempFile("livyConf", ".properties");
-    Files.setPosixFilePermissions(file.toPath(), EnumSet.of(OWNER_READ, OWNER_WRITE));
-    //file.deleteOnExit();
-
-    Writer writer = new OutputStreamWriter(new FileOutputStream(file), UTF_8);
-    try {
-      confView.store(writer, "Livy App Context Configuration");
-    } finally {
-      writer.close();
-    }
-
-    return file;
-  }
-
-
-  private class RegistrationHandler extends BaseProtocol
-    implements RpcServer.ClientCallback {
-
-    volatile RemoteDriverAddress driverAddress;
-
-    private Rpc client;
-
-    @Override
-    public RpcDispatcher onNewClient(Rpc client) {
-      LOG.debug("New RPC client connected from {}.", client.getChannel());
-      this.client = client;
-      return this;
-    }
-
-    @Override
-    public void onSaslComplete(Rpc client) {
-    }
-
-    void dispose() {
-      if (client != null) {
-        client.close();
-      }
-    }
-
-    private void handle(ChannelHandlerContext ctx, RemoteDriverAddress msg) {
-      ContextInfo info = new ContextInfo(msg.host, msg.port, clientId, secret);
-      if (promise.trySuccess(info)) {
-        timeout.cancel(true);
-        LOG.debug("Received driver info for client {}: {}/{}.", client.getChannel(),
-          msg.host, msg.port);
-      } else {
-        LOG.warn("Connection established but promise is already finalized.");
-      }
-
-      ctx.executor().submit(new Runnable() {
-        @Override
-        public void run() {
-          dispose();
-          ContextLauncher.this.dispose(false);
-        }
-      });
-    }
-
-  }
-
-  private static class ChildProcess {
-
-    private final RSCConf conf;
-    private final Promise<?> promise;
-    private final Process child;
-    private final Thread monitor;
-    private final File confFile;
-
-    public ChildProcess(RSCConf conf, Promise<?> promise, Runnable child, File confFile) {
-      this.conf = conf;
-      this.promise = promise;
-      this.monitor = monitor(child, CHILD_IDS.incrementAndGet());
-      this.child = null;
-      this.confFile = confFile;
-    }
-
-    public ChildProcess(RSCConf conf, Promise<?> promise, final Process childProc, File confFile) {
-      int childId = CHILD_IDS.incrementAndGet();
-      this.conf = conf;
-      this.promise = promise;
-      this.child = childProc;
-      this.confFile = confFile;
-
-      Runnable monitorTask = new Runnable() {
-        @Override
-        public void run() {
-          try {
-            int exitCode = child.waitFor();
-            if (exitCode != 0) {
-              LOG.warn("Child process exited with code {}.", exitCode);
-              fail(new IOException(String.format("Child process exited with code %d.", exitCode)));
-            }
-          } catch (InterruptedException ie) {
-            LOG.warn("Waiting thread interrupted, killing child process.");
-            Thread.interrupted();
-            child.destroy();
-          } catch (Exception e) {
-            LOG.warn("Exception while waiting for child process.", e);
-          }
-        }
-      };
-      this.monitor = monitor(monitorTask, childId);
-    }
-
-    private void fail(Throwable error) {
-      promise.tryFailure(error);
-    }
-
-    public void kill() {
-      if (child != null) {
-        child.destroy();
-      }
-      monitor.interrupt();
-      detach();
-
-      if (!monitor.isAlive()) {
-        return;
-      }
-
-      // Last ditch effort.
-      if (monitor.isAlive()) {
-        LOG.warn("Timed out shutting down remote driver, interrupting...");
-        monitor.interrupt();
-      }
-    }
-
-    public void detach() {
-      try {
-        monitor.join(conf.getTimeAsMs(CLIENT_SHUTDOWN_TIMEOUT));
-      } catch (InterruptedException ie) {
-        LOG.debug("Interrupted before driver thread was finished.");
-      }
-    }
-
-    private Thread monitor(final Runnable task, int childId) {
-      Runnable wrappedTask = new Runnable() {
-        @Override
-        public void run() {
-          try {
-            task.run();
-          } finally {
-            confFile.delete();
-          }
-        }
-      };
-      Thread thread = new Thread(wrappedTask);
-      thread.setDaemon(true);
-      thread.setName("ContextLauncher-" + childId);
-      thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
-        @Override
-        public void uncaughtException(Thread t, Throwable e) {
-          LOG.warn("Child task threw exception.", e);
-          fail(e);
-        }
-      });
-      thread.start();
-      return thread;
-    }
-  }
-
-  // Just for testing.
-  static Process mockSparkSubmit;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/com/cloudera/livy/rsc/DriverProcessInfo.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/DriverProcessInfo.java b/rsc/src/main/java/com/cloudera/livy/rsc/DriverProcessInfo.java
deleted file mode 100644
index a224fd6..0000000
--- a/rsc/src/main/java/com/cloudera/livy/rsc/DriverProcessInfo.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.rsc;
-
-import io.netty.util.concurrent.Promise;
-
-/**
- * Information about driver process and @{@link ContextInfo}
- */
-public class DriverProcessInfo {
-
-  private Promise<ContextInfo> contextInfo;
-  private transient Process driverProcess;
-
-  public DriverProcessInfo(Promise<ContextInfo> contextInfo, Process driverProcess) {
-    this.contextInfo = contextInfo;
-    this.driverProcess = driverProcess;
-  }
-
-  public Promise<ContextInfo> getContextInfo() {
-    return contextInfo;
-  }
-
-  public Process getDriverProcess() {
-    return driverProcess;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/com/cloudera/livy/rsc/FutureListener.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/FutureListener.java b/rsc/src/main/java/com/cloudera/livy/rsc/FutureListener.java
deleted file mode 100644
index 2109c81..0000000
--- a/rsc/src/main/java/com/cloudera/livy/rsc/FutureListener.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.rsc;
-
-/** A simplified future listener for netty futures. See Utils.addListener(). */
-public abstract class FutureListener<T> {
-
-  public void onSuccess(T result) throws Exception { }
-
-  public void onFailure(Throwable error) throws Exception { }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/com/cloudera/livy/rsc/JobHandleImpl.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/JobHandleImpl.java b/rsc/src/main/java/com/cloudera/livy/rsc/JobHandleImpl.java
deleted file mode 100644
index 9146425..0000000
--- a/rsc/src/main/java/com/cloudera/livy/rsc/JobHandleImpl.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.rsc;
-
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import io.netty.util.concurrent.Promise;
-
-import com.cloudera.livy.JobHandle;
-import com.cloudera.livy.client.common.AbstractJobHandle;
-
-/**
- * A handle to a submitted job. Allows for monitoring and controlling of the running remote job.
- */
-class JobHandleImpl<T> extends AbstractJobHandle<T> {
-
-  private final RSCClient client;
-  private final String jobId;
-  private final Promise<T> promise;
-  private volatile State state;
-
-  JobHandleImpl(RSCClient client, Promise<T> promise, String jobId) {
-    this.client = client;
-    this.jobId = jobId;
-    this.promise = promise;
-  }
-
-  /** Requests a running job to be cancelled. */
-  @Override
-  public boolean cancel(boolean mayInterrupt) {
-    if (changeState(State.CANCELLED)) {
-      client.cancel(jobId);
-      promise.cancel(mayInterrupt);
-      return true;
-    }
-    return false;
-  }
-
-  @Override
-  public T get() throws ExecutionException, InterruptedException {
-    return promise.get();
-  }
-
-  @Override
-  public T get(long timeout, TimeUnit unit)
-      throws ExecutionException, InterruptedException, TimeoutException {
-    return promise.get(timeout, unit);
-  }
-
-  @Override
-  public boolean isCancelled() {
-    return promise.isCancelled();
-  }
-
-  @Override
-  public boolean isDone() {
-    return promise.isDone();
-  }
-
-  @Override
-  protected T result() {
-    return promise.getNow();
-  }
-
-  @Override
-  protected Throwable error() {
-    return promise.cause();
-  }
-
-  @SuppressWarnings("unchecked")
-  void setSuccess(Object result) {
-    // The synchronization here is not necessary, but tests depend on it.
-    synchronized (listeners) {
-      promise.setSuccess((T) result);
-      changeState(State.SUCCEEDED);
-    }
-  }
-
-  void setFailure(Throwable error) {
-    // The synchronization here is not necessary, but tests depend on it.
-    synchronized (listeners) {
-      promise.setFailure(error);
-      changeState(State.FAILED);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/com/cloudera/livy/rsc/PingJob.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/PingJob.java b/rsc/src/main/java/com/cloudera/livy/rsc/PingJob.java
deleted file mode 100644
index 088763a..0000000
--- a/rsc/src/main/java/com/cloudera/livy/rsc/PingJob.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.rsc;
-
-import com.cloudera.livy.Job;
-import com.cloudera.livy.JobContext;
-
-/** A job that can be used to check for liveness of the remote context. */
-public class PingJob implements Job<Void> {
-
-  @Override
-  public Void call(JobContext jc) {
-    return null;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/com/cloudera/livy/rsc/RSCClient.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/RSCClient.java b/rsc/src/main/java/com/cloudera/livy/rsc/RSCClient.java
deleted file mode 100644
index 11cb0f6..0000000
--- a/rsc/src/main/java/com/cloudera/livy/rsc/RSCClient.java
+++ /dev/null
@@ -1,409 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.rsc;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.util.concurrent.GenericFutureListener;
-import io.netty.util.concurrent.ImmediateEventExecutor;
-import io.netty.util.concurrent.Promise;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.livy.Job;
-import com.cloudera.livy.JobHandle;
-import com.cloudera.livy.LivyClient;
-import com.cloudera.livy.client.common.BufferUtils;
-import com.cloudera.livy.rsc.driver.AddFileJob;
-import com.cloudera.livy.rsc.driver.AddJarJob;
-import com.cloudera.livy.rsc.rpc.Rpc;
-import static com.cloudera.livy.rsc.RSCConf.Entry.*;
-
-public class RSCClient implements LivyClient {
-  private static final Logger LOG = LoggerFactory.getLogger(RSCClient.class);
-  private static final AtomicInteger EXECUTOR_GROUP_ID = new AtomicInteger();
-
-  private final RSCConf conf;
-  private final Promise<ContextInfo> contextInfoPromise;
-  private final Map<String, JobHandleImpl<?>> jobs;
-  private final ClientProtocol protocol;
-  private final Promise<Rpc> driverRpc;
-  private final int executorGroupId;
-  private final EventLoopGroup eventLoopGroup;
-  private final Promise<URI> serverUriPromise;
-
-  private ContextInfo contextInfo;
-  private Process driverProcess;
-  private volatile boolean isAlive;
-  private volatile String replState;
-
-  RSCClient(RSCConf conf, Promise<ContextInfo> ctx, Process driverProcess) throws IOException {
-    this.conf = conf;
-    this.contextInfoPromise = ctx;
-    this.driverProcess = driverProcess;
-    this.jobs = new ConcurrentHashMap<>();
-    this.protocol = new ClientProtocol();
-    this.driverRpc = ImmediateEventExecutor.INSTANCE.newPromise();
-    this.executorGroupId = EXECUTOR_GROUP_ID.incrementAndGet();
-    this.eventLoopGroup = new NioEventLoopGroup(
-        conf.getInt(RPC_MAX_THREADS),
-        Utils.newDaemonThreadFactory("RSCClient-" + executorGroupId + "-%d"));
-    this.serverUriPromise = ImmediateEventExecutor.INSTANCE.newPromise();
-
-    Utils.addListener(this.contextInfoPromise, new FutureListener<ContextInfo>() {
-      @Override
-      public void onSuccess(ContextInfo info) throws Exception {
-        connectToContext(info);
-        String url = String.format("rsc://%s:%s@%s:%d",
-          info.clientId, info.secret, info.remoteAddress, info.remotePort);
-        serverUriPromise.setSuccess(URI.create(url));
-      }
-
-      @Override
-      public void onFailure(Throwable error) {
-        connectionError(error);
-        serverUriPromise.setFailure(error);
-      }
-    });
-
-    isAlive = true;
-  }
-
-  public boolean isAlive() {
-    return isAlive;
-  }
-
-  public Process getDriverProcess() {
-    return driverProcess;
-  }
-
-  private synchronized void connectToContext(final ContextInfo info) throws Exception {
-    this.contextInfo = info;
-
-    try {
-      Promise<Rpc> promise = Rpc.createClient(conf,
-        eventLoopGroup,
-        info.remoteAddress,
-        info.remotePort,
-        info.clientId,
-        info.secret,
-        protocol);
-      Utils.addListener(promise, new FutureListener<Rpc>() {
-        @Override
-        public void onSuccess(Rpc rpc) throws Exception {
-          driverRpc.setSuccess(rpc);
-          Utils.addListener(rpc.getChannel().closeFuture(), new FutureListener<Void>() {
-            @Override
-            public void onSuccess(Void unused) {
-              if (isAlive) {
-                LOG.warn("Client RPC channel closed unexpectedly.");
-                try {
-                  stop(false);
-                } catch (Exception e) { /* stop() itself prints warning. */ }
-              }
-            }
-          });
-          LOG.debug("Connected to context {} ({}, {}).", info.clientId,
-            rpc.getChannel(), executorGroupId);
-        }
-
-        @Override
-        public void onFailure(Throwable error) throws Exception {
-          driverRpc.setFailure(error);
-          connectionError(error);
-        }
-      });
-    } catch (Exception e) {
-      connectionError(e);
-    }
-  }
-
-  private void connectionError(Throwable error) {
-    LOG.error("Failed to connect to context.", error);
-    try {
-      stop(false);
-    } catch (Exception e) { /* stop() itself prints warning. */ }
-  }
-
-  private <T> io.netty.util.concurrent.Future<T> deferredCall(final Object msg,
-      final Class<T> retType) {
-    if (driverRpc.isSuccess()) {
-      try {
-        return driverRpc.get().call(msg, retType);
-      } catch (Exception ie) {
-        throw Utils.propagate(ie);
-      }
-    }
-
-    // No driver RPC yet, so install a listener and return a promise that will be ready when
-    // the driver is up and the message is actually delivered.
-    final Promise<T> promise = eventLoopGroup.next().newPromise();
-    final FutureListener<T> callListener = new FutureListener<T>() {
-      @Override
-      public void onSuccess(T value) throws Exception {
-        promise.setSuccess(value);
-      }
-
-      @Override
-      public void onFailure(Throwable error) throws Exception {
-        promise.setFailure(error);
-      }
-    };
-
-    Utils.addListener(driverRpc, new FutureListener<Rpc>() {
-      @Override
-      public void onSuccess(Rpc rpc) throws Exception {
-        Utils.addListener(rpc.call(msg, retType), callListener);
-      }
-
-      @Override
-      public void onFailure(Throwable error) throws Exception {
-        promise.setFailure(error);
-      }
-    });
-    return promise;
-  }
-
-  public Future<URI> getServerUri() {
-    return serverUriPromise;
-  }
-
-  @Override
-  public <T> JobHandle<T> submit(Job<T> job) {
-    return protocol.submit(job);
-  }
-
-  @Override
-  public <T> Future<T> run(Job<T> job) {
-    return protocol.run(job);
-  }
-
-  @Override
-  public synchronized void stop(boolean shutdownContext) {
-    if (isAlive) {
-      isAlive = false;
-      try {
-        this.contextInfoPromise.cancel(true);
-
-        if (shutdownContext && driverRpc.isSuccess()) {
-          protocol.endSession();
-
-          // Because the remote context won't really reply to the end session message -
-          // since it closes the channel while handling it, we wait for the RPC's channel
-          // to close instead.
-          long stopTimeout = conf.getTimeAsMs(CLIENT_SHUTDOWN_TIMEOUT);
-          driverRpc.get().getChannel().closeFuture().get(stopTimeout,
-            TimeUnit.MILLISECONDS);
-        }
-      } catch (Exception e) {
-        LOG.warn("Exception while waiting for end session reply.", e);
-        Utils.propagate(e);
-      } finally {
-        if (driverRpc.isSuccess()) {
-          try {
-            driverRpc.get().close();
-          } catch (Exception e) {
-            LOG.warn("Error stopping RPC.", e);
-          }
-        }
-
-        // Report failure for all pending jobs, so that clients can react.
-        for (Map.Entry<String, JobHandleImpl<?>> e : jobs.entrySet()) {
-          LOG.info("Failing pending job {} due to shutdown.", e.getKey());
-          e.getValue().setFailure(new IOException("RSCClient instance stopped."));
-        }
-
-        eventLoopGroup.shutdownGracefully();
-      }
-      if (contextInfo != null) {
-        LOG.debug("Disconnected from context {}, shutdown = {}.", contextInfo.clientId,
-          shutdownContext);
-      }
-    }
-  }
-
-  @Override
-  public Future<?> uploadJar(File jar) {
-    throw new UnsupportedOperationException("Use addJar to add the jar to the remote context!");
-  }
-
-  @Override
-  public Future<?> addJar(URI uri) {
-    return submit(new AddJarJob(uri.toString()));
-  }
-
-  @Override
-  public Future<?> uploadFile(File file) {
-    throw new UnsupportedOperationException("Use addFile to add the file to the remote context!");
-  }
-
-  @Override
-  public Future<?> addFile(URI uri) {
-    return submit(new AddFileJob(uri.toString()));
-  }
-
-  public String bypass(ByteBuffer serializedJob, boolean sync) {
-    return protocol.bypass(serializedJob, sync);
-  }
-
-  public Future<BypassJobStatus> getBypassJobStatus(String id) {
-    return protocol.getBypassJobStatus(id);
-  }
-
-  public void cancel(String jobId) {
-    protocol.cancel(jobId);
-  }
-
-  ContextInfo getContextInfo() {
-    return contextInfo;
-  }
-
-  public Future<Integer> submitReplCode(String code) throws Exception {
-    return deferredCall(new BaseProtocol.ReplJobRequest(code), Integer.class);
-  }
-
-  public void cancelReplCode(int statementId) throws Exception {
-    deferredCall(new BaseProtocol.CancelReplJobRequest(statementId), Void.class);
-  }
-
-  public Future<ReplJobResults> getReplJobResults(Integer from, Integer size) throws Exception {
-    return deferredCall(new BaseProtocol.GetReplJobResults(from, size), ReplJobResults.class);
-  }
-
-  public Future<ReplJobResults> getReplJobResults() throws Exception {
-    return deferredCall(new BaseProtocol.GetReplJobResults(), ReplJobResults.class);
-  }
-
-  /**
-   * @return Return the repl state. If this's not connected to a repl session, it will return null.
-   */
-  public String getReplState() {
-    return replState;
-  }
-
-  private class ClientProtocol extends BaseProtocol {
-
-    <T> JobHandleImpl<T> submit(Job<T> job) {
-      final String jobId = UUID.randomUUID().toString();
-      Object msg = new JobRequest<T>(jobId, job);
-
-      final Promise<T> promise = eventLoopGroup.next().newPromise();
-      final JobHandleImpl<T> handle = new JobHandleImpl<T>(RSCClient.this,
-        promise, jobId);
-      jobs.put(jobId, handle);
-
-      final io.netty.util.concurrent.Future<Void> rpc = deferredCall(msg, Void.class);
-      LOG.debug("Sending JobRequest[{}].", jobId);
-
-      Utils.addListener(rpc, new FutureListener<Void>() {
-        @Override
-        public void onSuccess(Void unused) throws Exception {
-          handle.changeState(JobHandle.State.QUEUED);
-        }
-
-        @Override
-        public void onFailure(Throwable error) throws Exception {
-          error.printStackTrace();
-          promise.tryFailure(error);
-        }
-      });
-      promise.addListener(new GenericFutureListener<Promise<T>>() {
-        @Override
-        public void operationComplete(Promise<T> p) {
-          if (jobId != null) {
-            jobs.remove(jobId);
-          }
-          if (p.isCancelled() && !rpc.isDone()) {
-            rpc.cancel(true);
-          }
-        }
-      });
-      return handle;
-    }
-
-    @SuppressWarnings("unchecked")
-    <T> Future<T> run(Job<T> job) {
-      return (Future<T>) deferredCall(new SyncJobRequest(job), Object.class);
-    }
-
-    String bypass(ByteBuffer serializedJob, boolean sync) {
-      String jobId = UUID.randomUUID().toString();
-      Object msg = new BypassJobRequest(jobId, BufferUtils.toByteArray(serializedJob), sync);
-      deferredCall(msg, Void.class);
-      return jobId;
-    }
-
-    Future<BypassJobStatus> getBypassJobStatus(String id) {
-      return deferredCall(new GetBypassJobStatus(id), BypassJobStatus.class);
-    }
-
-    void cancel(String jobId) {
-      deferredCall(new CancelJob(jobId), Void.class);
-    }
-
-    Future<?> endSession() {
-      return deferredCall(new EndSession(), Void.class);
-    }
-
-    private void handle(ChannelHandlerContext ctx, InitializationError msg) {
-      LOG.warn("Error reported from remote driver: %s", msg.stackTrace);
-    }
-
-    private void handle(ChannelHandlerContext ctx, JobResult msg) {
-      JobHandleImpl<?> handle = jobs.remove(msg.id);
-      if (handle != null) {
-        LOG.info("Received result for {}", msg.id);
-        // TODO: need a better exception for this.
-        Throwable error = msg.error != null ? new RuntimeException(msg.error) : null;
-        if (error == null) {
-          handle.setSuccess(msg.result);
-        } else {
-          handle.setFailure(error);
-        }
-      } else {
-        LOG.warn("Received result for unknown job {}", msg.id);
-      }
-    }
-
-    private void handle(ChannelHandlerContext ctx, JobStarted msg) {
-      JobHandleImpl<?> handle = jobs.get(msg.id);
-      if (handle != null) {
-        handle.changeState(JobHandle.State.STARTED);
-      } else {
-        LOG.warn("Received event for unknown job {}", msg.id);
-      }
-    }
-
-    private void handle(ChannelHandlerContext ctx, ReplState msg) {
-      LOG.trace("Received repl state for {}", msg.state);
-      replState = msg.state;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/com/cloudera/livy/rsc/RSCClientFactory.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/RSCClientFactory.java b/rsc/src/main/java/com/cloudera/livy/rsc/RSCClientFactory.java
deleted file mode 100644
index 27540bb..0000000
--- a/rsc/src/main/java/com/cloudera/livy/rsc/RSCClientFactory.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.rsc;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import io.netty.util.concurrent.ImmediateEventExecutor;
-import io.netty.util.concurrent.Promise;
-
-import com.cloudera.livy.LivyClient;
-import com.cloudera.livy.LivyClientFactory;
-import com.cloudera.livy.rsc.rpc.RpcServer;
-
-/**
- * Factory for RSC clients.
- */
-public final class RSCClientFactory implements LivyClientFactory {
-
-  private final AtomicInteger refCount = new AtomicInteger();
-  private RpcServer server = null;
-
-  /**
-   * Creates a local Livy client if the URI has the "rsc" scheme.
-   * <p>
-   * If the URI contains user information, host and port, the library will try to connect to an
-   * existing RSC instance with the provided information, and most of the provided configuration
-   * will be ignored.
-   * <p>
-   * Otherwise, a new Spark context will be started with the given configuration.
-   */
-  @Override
-  public LivyClient createClient(URI uri, Properties config) {
-    if (!"rsc".equals(uri.getScheme())) {
-      return null;
-    }
-
-    RSCConf lconf = new RSCConf(config);
-
-    boolean needsServer = false;
-    try {
-      Promise<ContextInfo> info;
-      Process driverProcess = null;
-      if (uri.getUserInfo() != null && uri.getHost() != null && uri.getPort() > 0) {
-        info = createContextInfo(uri);
-      } else {
-        needsServer = true;
-        ref(lconf);
-        DriverProcessInfo processInfo = ContextLauncher.create(this, lconf);
-        info = processInfo.getContextInfo();
-        driverProcess = processInfo.getDriverProcess();
-      }
-      return new RSCClient(lconf, info, driverProcess);
-    } catch (Exception e) {
-      if (needsServer) {
-        unref();
-      }
-      throw Utils.propagate(e);
-    }
-  }
-
-  RpcServer getServer() {
-    return server;
-  }
-
-  private synchronized void ref(RSCConf config) throws IOException {
-    if (refCount.get() != 0) {
-      refCount.incrementAndGet();
-      return;
-    }
-
-    Utils.checkState(server == null, "Server already running but ref count is 0.");
-    if (server == null) {
-      try {
-        server = new RpcServer(config);
-      } catch (InterruptedException ie) {
-        throw Utils.propagate(ie);
-      }
-    }
-
-    refCount.incrementAndGet();
-  }
-
-  synchronized void unref() {
-    if (refCount.decrementAndGet() == 0) {
-      server.close();
-      server = null;
-    }
-  }
-
-  private static Promise<ContextInfo> createContextInfo(final URI uri) {
-    String[] userInfo = uri.getUserInfo().split(":", 2);
-    ImmediateEventExecutor executor = ImmediateEventExecutor.INSTANCE;
-    Promise<ContextInfo> promise = executor.newPromise();
-    promise.setSuccess(new ContextInfo(uri.getHost(), uri.getPort(), userInfo[0], userInfo[1]));
-    return promise;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java b/rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java
deleted file mode 100644
index afd935d..0000000
--- a/rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.rsc;
-
-import java.io.IOException;
-import java.net.Inet4Address;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-import javax.security.sasl.Sasl;
-
-import com.cloudera.livy.client.common.ClientConf;
-
-public class RSCConf extends ClientConf<RSCConf> {
-
-  public static final String SPARK_CONF_PREFIX = "spark.";
-  public static final String LIVY_SPARK_PREFIX = SPARK_CONF_PREFIX + "__livy__.";
-
-  private static final String RSC_CONF_PREFIX = "livy.rsc.";
-
-  public static enum Entry implements ConfEntry {
-    CLIENT_ID("client.auth.id", null),
-    CLIENT_SECRET("client.auth.secret", null),
-    CLIENT_IN_PROCESS("client.do-not-use.run-driver-in-process", false),
-    CLIENT_SHUTDOWN_TIMEOUT("client.shutdown-timeout", "10s"),
-    DRIVER_CLASS("driver-class", null),
-    SESSION_KIND("session.kind", null),
-
-    LIVY_JARS("jars", null),
-    SPARKR_PACKAGE("sparkr.package", null),
-    PYSPARK_ARCHIVES("pyspark.archives", null),
-
-    // Address for the RSC driver to connect back with it's connection info.
-    LAUNCHER_ADDRESS("launcher.address", null),
-    LAUNCHER_PORT_RANGE("launcher.port.range", "10000~10010"),
-    // Setting up of this propety by user has no benefit. It is currently being used
-    // to pass  port information from ContextLauncher to RSCDriver
-    LAUNCHER_PORT("launcher.port", -1),
-    // How long will the RSC wait for a connection for a Livy server before shutting itself down.
-    SERVER_IDLE_TIMEOUT("server.idle-timeout", "10m"),
-
-    PROXY_USER("proxy-user", null),
-
-    RPC_SERVER_ADDRESS("rpc.server.address", null),
-    RPC_CLIENT_HANDSHAKE_TIMEOUT("server.connect.timeout", "90s"),
-    RPC_CLIENT_CONNECT_TIMEOUT("client.connect.timeout", "10s"),
-    RPC_CHANNEL_LOG_LEVEL("channel.log.level", null),
-    RPC_MAX_MESSAGE_SIZE("rpc.max.size", 50 * 1024 * 1024),
-    RPC_MAX_THREADS("rpc.threads", 8),
-    RPC_SECRET_RANDOM_BITS("secret.bits", 256),
-
-    SASL_MECHANISMS("rpc.sasl.mechanisms", "DIGEST-MD5"),
-    SASL_QOP("rpc.sasl.qop", null),
-
-    TEST_STUCK_END_SESSION("test.do-not-use.stuck-end-session", false),
-    TEST_STUCK_START_DRIVER("test.do-not-use.stuck-start-driver", false),
-
-    JOB_CANCEL_TRIGGER_INTERVAL("job-cancel.trigger-interval", "100ms"),
-    JOB_CANCEL_TIMEOUT("job-cancel.timeout", "30s"),
-
-    RETAINED_STATEMENT_NUMBER("retained-statements", 100);
-
-    private final String key;
-    private final Object dflt;
-
-    private Entry(String key, Object dflt) {
-      this.key = RSC_CONF_PREFIX + key;
-      this.dflt = dflt;
-    }
-
-    @Override
-    public String key() { return key; }
-
-    @Override
-    public Object dflt() { return dflt; }
-  }
-
-  public RSCConf() {
-    this(new Properties());
-  }
-
-  public RSCConf(Properties config) {
-    super(config);
-  }
-
-  public Map<String, String> getSaslOptions() {
-    Map<String, String> opts = new HashMap<>();
-
-    // TODO: add more options?
-    String qop = get(Entry.SASL_QOP);
-    if (qop != null) {
-      opts.put(Sasl.QOP, qop);
-    }
-
-    return opts;
-  }
-
-  public String findLocalAddress() throws IOException {
-    InetAddress address = InetAddress.getLocalHost();
-    if (address.isLoopbackAddress()) {
-      // Address resolves to something like 127.0.1.1, which happens on Debian;
-      // try to find a better address using the local network interfaces
-      Enumeration<NetworkInterface> ifaces = NetworkInterface.getNetworkInterfaces();
-      while (ifaces.hasMoreElements()) {
-        NetworkInterface ni = ifaces.nextElement();
-        Enumeration<InetAddress> addrs = ni.getInetAddresses();
-        while (addrs.hasMoreElements()) {
-          InetAddress addr = addrs.nextElement();
-          if (!addr.isLinkLocalAddress() && !addr.isLoopbackAddress()
-              && addr instanceof Inet4Address) {
-            // We've found an address that looks reasonable!
-            LOG.warn("Your hostname, {}, resolves to a loopback address; using {} "
-                + " instead (on interface {})", address.getHostName(), addr.getHostAddress(),
-                ni.getName());
-            LOG.warn("Set '{}' if you need to bind to another address.",
-              Entry.RPC_SERVER_ADDRESS.key);
-            return addr.getHostAddress();
-          }
-        }
-      }
-    }
-
-    LOG.warn("Your hostname, {}, resolves to a loopback address, but we couldn't find "
-        + "any external IP address!", address.getCanonicalHostName());
-    LOG.warn("Set {} if you need to bind to another address.",
-      Entry.RPC_SERVER_ADDRESS.key);
-    return address.getCanonicalHostName();
-  }
-
-  private static final Map<String, DeprecatedConf> configsWithAlternatives
-    = Collections.unmodifiableMap(new HashMap<String, DeprecatedConf>() {{
-      put(RSCConf.Entry.CLIENT_IN_PROCESS.key, DepConf.CLIENT_IN_PROCESS);
-      put(RSCConf.Entry.CLIENT_SHUTDOWN_TIMEOUT.key, DepConf.CLIENT_SHUTDOWN_TIMEOUT);
-      put(RSCConf.Entry.DRIVER_CLASS.key, DepConf.DRIVER_CLASS);
-      put(RSCConf.Entry.SERVER_IDLE_TIMEOUT.key, DepConf.SERVER_IDLE_TIMEOUT);
-      put(RSCConf.Entry.PROXY_USER.key, DepConf.PROXY_USER);
-      put(RSCConf.Entry.TEST_STUCK_END_SESSION.key, DepConf.TEST_STUCK_END_SESSION);
-      put(RSCConf.Entry.TEST_STUCK_START_DRIVER.key, DepConf.TEST_STUCK_START_DRIVER);
-      put(RSCConf.Entry.JOB_CANCEL_TRIGGER_INTERVAL.key, DepConf.JOB_CANCEL_TRIGGER_INTERVAL);
-      put(RSCConf.Entry.JOB_CANCEL_TIMEOUT.key, DepConf.JOB_CANCEL_TIMEOUT);
-      put(RSCConf.Entry.RETAINED_STATEMENT_NUMBER.key, DepConf.RETAINED_STATEMENT_NUMBER);
-  }});
-
-  // Maps deprecated key to DeprecatedConf with the same key.
-  // There are no deprecated configs without alternatives currently.
-  private static final Map<String, DeprecatedConf> deprecatedConfigs
-    = Collections.unmodifiableMap(new HashMap<String, DeprecatedConf>());
-
-  protected Map<String, DeprecatedConf> getConfigsWithAlternatives() {
-    return configsWithAlternatives;
-  }
-
-  protected Map<String, DeprecatedConf> getDeprecatedConfigs() {
-    return deprecatedConfigs;
-  }
-
-  static enum DepConf implements DeprecatedConf {
-    CLIENT_IN_PROCESS("client.do_not_use.run_driver_in_process", "0.4"),
-    CLIENT_SHUTDOWN_TIMEOUT("client.shutdown_timeout", "0.4"),
-    DRIVER_CLASS("driver_class", "0.4"),
-    SERVER_IDLE_TIMEOUT("server.idle_timeout", "0.4"),
-    PROXY_USER("proxy_user", "0.4"),
-    TEST_STUCK_END_SESSION("test.do_not_use.stuck_end_session", "0.4"),
-    TEST_STUCK_START_DRIVER("test.do_not_use.stuck_start_driver", "0.4"),
-    JOB_CANCEL_TRIGGER_INTERVAL("job_cancel.trigger_interval", "0.4"),
-    JOB_CANCEL_TIMEOUT("job_cancel.timeout", "0.4"),
-    RETAINED_STATEMENT_NUMBER("retained_statements", "0.4");
-
-    private final String key;
-    private final String version;
-    private final String deprecationMessage;
-
-    private DepConf(String key, String version) {
-      this(key, version, "");
-    }
-
-    private DepConf(String key, String version, String deprecationMessage) {
-      this.key = RSC_CONF_PREFIX + key;
-      this.version = version;
-      this.deprecationMessage = deprecationMessage;
-    }
-
-    @Override
-    public String key() { return key; }
-
-    @Override
-    public String version() { return version; }
-
-    @Override
-    public String deprecationMessage() { return deprecationMessage; }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/com/cloudera/livy/rsc/ReplJobResults.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/ReplJobResults.java b/rsc/src/main/java/com/cloudera/livy/rsc/ReplJobResults.java
deleted file mode 100644
index deef162..0000000
--- a/rsc/src/main/java/com/cloudera/livy/rsc/ReplJobResults.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.cloudera.livy.rsc;
-
-import com.cloudera.livy.rsc.driver.Statement;
-
-public class ReplJobResults {
-  public final Statement[] statements;
-
-  public ReplJobResults(Statement[] statements) {
-    this.statements = statements;
-  }
-
-  public ReplJobResults() {
-    this(null);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/com/cloudera/livy/rsc/Utils.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/Utils.java b/rsc/src/main/java/com/cloudera/livy/rsc/Utils.java
deleted file mode 100644
index 993ea1e..0000000
--- a/rsc/src/main/java/com/cloudera/livy/rsc/Utils.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.rsc;
-
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
-
-/**
- * A few simple utility functions used by the code, mostly to avoid a direct dependency
- * on Guava.
- */
-public class Utils {
-
-  public static void checkArgument(boolean condition) {
-    if (!condition) {
-      throw new IllegalArgumentException();
-    }
-  }
-
-  public static void checkArgument(boolean condition, String msg, Object... args) {
-    if (!condition) {
-      throw new IllegalArgumentException(String.format(msg, args));
-    }
-  }
-
-  public static void checkState(boolean condition, String msg, Object... args) {
-    if (!condition) {
-      throw new IllegalStateException(String.format(msg, args));
-    }
-  }
-
-  public static void checkNotNull(Object o) {
-    if (o == null) {
-      throw new NullPointerException();
-    }
-  }
-
-  public static RuntimeException propagate(Throwable t) {
-    if (t instanceof RuntimeException) {
-      throw (RuntimeException) t;
-    } else {
-      throw new RuntimeException(t);
-    }
-  }
-
-  public static ThreadFactory newDaemonThreadFactory(final String nameFormat) {
-    return new ThreadFactory() {
-
-      private final AtomicInteger threadId = new AtomicInteger();
-
-      @Override
-      public Thread newThread(Runnable r) {
-        Thread t = new Thread(r);
-        t.setName(String.format(nameFormat, threadId.incrementAndGet()));
-        t.setDaemon(true);
-        return t;
-      }
-
-    };
-  }
-
-  public static String join(Iterable<String> strs, String sep) {
-    StringBuilder sb = new StringBuilder();
-    for (String s : strs) {
-      if (s != null && !s.isEmpty()) {
-        sb.append(s).append(sep);
-      }
-    }
-    if (sb.length() > 0) {
-      sb.setLength(sb.length() - sep.length());
-    }
-    return sb.toString();
-  }
-
-  public static String stackTraceAsString(Throwable t) {
-    StringBuilder sb = new StringBuilder();
-    sb.append(t.getClass().getName()).append(": ").append(t.getMessage());
-    for (StackTraceElement e : t.getStackTrace()) {
-      sb.append("\n");
-      sb.append(e.toString());
-    }
-    return sb.toString();
-  }
-
-  public static <T> void addListener(Future<T> future, final FutureListener<T> lsnr) {
-    future.addListener(new GenericFutureListener<Future<T>>() {
-      @Override
-      public void operationComplete(Future<T> f) throws Exception {
-        if (f.isSuccess()) {
-          lsnr.onSuccess(f.get());
-        } else {
-          lsnr.onFailure(f.cause());
-        }
-      }
-    });
-  }
-
-  private Utils() { }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/com/cloudera/livy/rsc/driver/AddFileJob.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/driver/AddFileJob.java b/rsc/src/main/java/com/cloudera/livy/rsc/driver/AddFileJob.java
deleted file mode 100644
index 82b0a69..0000000
--- a/rsc/src/main/java/com/cloudera/livy/rsc/driver/AddFileJob.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.cloudera.livy.rsc.driver;
-
-import com.cloudera.livy.Job;
-import com.cloudera.livy.JobContext;
-
-public class AddFileJob implements Job<Object> {
-
-  private final String path;
-
-  AddFileJob() {
-    this(null);
-}
-
-  public AddFileJob(String path) {
-    this.path = path;
-}
-
-  @Override
-  public Object call(JobContext jc) throws Exception {
-    JobContextImpl jobContextImpl = (JobContextImpl)jc;
-    jobContextImpl.addFile(path);
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/com/cloudera/livy/rsc/driver/AddJarJob.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/driver/AddJarJob.java b/rsc/src/main/java/com/cloudera/livy/rsc/driver/AddJarJob.java
deleted file mode 100644
index d23f3b8..0000000
--- a/rsc/src/main/java/com/cloudera/livy/rsc/driver/AddJarJob.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.rsc.driver;
-
-import com.cloudera.livy.Job;
-import com.cloudera.livy.JobContext;
-
-public class AddJarJob implements Job<Object> {
-
-  private final String path;
-
-  // For serialization.
-  private AddJarJob() {
-    this(null);
-  }
-
-  public AddJarJob(String path) {
-    this.path = path;
-  }
-
-  @Override
-  public Object call(JobContext jc) throws Exception {
-    JobContextImpl jobContextImpl = (JobContextImpl)jc;
-    jobContextImpl.addJarOrPyFile(path);
-    return null;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/com/cloudera/livy/rsc/driver/BypassJob.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/driver/BypassJob.java b/rsc/src/main/java/com/cloudera/livy/rsc/driver/BypassJob.java
deleted file mode 100644
index 8ca2c8e..0000000
--- a/rsc/src/main/java/com/cloudera/livy/rsc/driver/BypassJob.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.rsc.driver;
-
-import java.nio.ByteBuffer;
-
-import com.cloudera.livy.Job;
-import com.cloudera.livy.JobContext;
-import com.cloudera.livy.client.common.BufferUtils;
-import com.cloudera.livy.client.common.Serializer;
-
-class BypassJob implements Job<byte[]> {
-
-  private final Serializer serializer;
-  private final byte[] serializedJob;
-
-  BypassJob(Serializer serializer, byte[] serializedJob) {
-    this.serializer = serializer;
-    this.serializedJob = serializedJob;
-  }
-
-  @Override
-  public byte[] call(JobContext jc) throws Exception {
-    Job<?> job = (Job<?>) serializer.deserialize(ByteBuffer.wrap(serializedJob));
-    Object result = job.call(jc);
-    byte[] serializedResult;
-    if (result != null) {
-      ByteBuffer data = serializer.serialize(result);
-      serializedResult = BufferUtils.toByteArray(data);
-    } else {
-      serializedResult = null;
-    }
-    return serializedResult;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/com/cloudera/livy/rsc/driver/BypassJobWrapper.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/driver/BypassJobWrapper.java b/rsc/src/main/java/com/cloudera/livy/rsc/driver/BypassJobWrapper.java
deleted file mode 100644
index b8951af..0000000
--- a/rsc/src/main/java/com/cloudera/livy/rsc/driver/BypassJobWrapper.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.rsc.driver;
-
-import java.util.List;
-
-import com.cloudera.livy.Job;
-import com.cloudera.livy.JobHandle;
-import com.cloudera.livy.rsc.BypassJobStatus;
-import com.cloudera.livy.rsc.Utils;
-
-public class BypassJobWrapper extends JobWrapper<byte[]> {
-
-  private volatile byte[] result;
-  private volatile Throwable error;
-  private volatile JobHandle.State state;
-  private volatile List<Integer> newSparkJobs;
-
-  public BypassJobWrapper(RSCDriver driver, String jobId, Job<byte[]> serializedJob) {
-    super(driver, jobId, serializedJob);
-    state = JobHandle.State.QUEUED;
-  }
-
-  @Override
-  public Void call() throws Exception {
-    state = JobHandle.State.STARTED;
-    return super.call();
-  }
-
-  @Override
-  protected synchronized void finished(byte[] result, Throwable error) {
-    if (error == null) {
-      this.result = result;
-      this.state = JobHandle.State.SUCCEEDED;
-    } else {
-      this.error = error;
-      this.state = JobHandle.State.FAILED;
-    }
-  }
-
-  @Override
-  boolean cancel() {
-    if (super.cancel()) {
-      this.state = JobHandle.State.CANCELLED;
-      return true;
-    }
-    return false;
-  }
-
-  @Override
-  protected void jobStarted() {
-    // Do nothing; just avoid sending data back to the driver.
-  }
-
-  synchronized BypassJobStatus getStatus() {
-    String stackTrace = error != null ? Utils.stackTraceAsString(error) : null;
-    return new BypassJobStatus(state, result, stackTrace);
-  }
-
-}