You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@livy.apache.org by js...@apache.org on 2017/07/05 03:06:49 UTC

[20/33] incubator-livy git commit: LIVY-375. Change Livy code package name to org.apache.livy

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/test/java/com/cloudera/livy/rsc/TestSparkClient.java
----------------------------------------------------------------------
diff --git a/rsc/src/test/java/com/cloudera/livy/rsc/TestSparkClient.java b/rsc/src/test/java/com/cloudera/livy/rsc/TestSparkClient.java
deleted file mode 100644
index 973d012..0000000
--- a/rsc/src/test/java/com/cloudera/livy/rsc/TestSparkClient.java
+++ /dev/null
@@ -1,533 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.rsc;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.jar.JarOutputStream;
-import java.util.zip.ZipEntry;
-
-import org.apache.spark.launcher.SparkLauncher;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-import com.cloudera.livy.Job;
-import com.cloudera.livy.JobContext;
-import com.cloudera.livy.JobHandle;
-import com.cloudera.livy.LivyClient;
-import com.cloudera.livy.LivyClientBuilder;
-import com.cloudera.livy.client.common.Serializer;
-import com.cloudera.livy.rsc.rpc.RpcException;
-import com.cloudera.livy.test.jobs.Echo;
-import com.cloudera.livy.test.jobs.Failure;
-import com.cloudera.livy.test.jobs.FileReader;
-import com.cloudera.livy.test.jobs.GetCurrentUser;
-import com.cloudera.livy.test.jobs.SQLGetTweets;
-import com.cloudera.livy.test.jobs.Sleeper;
-import com.cloudera.livy.test.jobs.SmallCount;
-import static com.cloudera.livy.rsc.RSCConf.Entry.*;
-
-public class TestSparkClient {
-
-  private static final Logger LOG = LoggerFactory.getLogger(TestSparkClient.class);
-
-  // Timeouts are bad... mmmkay. Wait bound; the tests below pair it with TimeUnit.SECONDS.
-  private static final long TIMEOUT = 100;
-
-  /**
-   * Builds the client configuration used by the tests in this class.
-   *
-   * @param local when true, enables the in-process client flag and a "local" Spark master;
-   *              otherwise sets 512m of driver memory and copies this JVM's "java.class.path"
-   *              into the driver and executor extra class paths.
-   * @return the populated properties; the LIVY_JARS entry is always set to the empty string.
-   */
-  private Properties createConf(boolean local) {
-    Properties props = new Properties();
-    props.put(LIVY_JARS.key(), "");
-
-    if (!local) {
-      String testClasspath = System.getProperty("java.class.path");
-      props.put("spark.app.name", "SparkClientSuite Remote App");
-      props.put(SparkLauncher.DRIVER_MEMORY, "512m");
-      props.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, testClasspath);
-      props.put(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH, testClasspath);
-      return props;
-    }
-
-    props.put(CLIENT_IN_PROCESS.key(), "true");
-    props.put(SparkLauncher.SPARK_MASTER, "local");
-    props.put("spark.app.name", "SparkClientSuite Local App");
-    return props;
-  }
-
-  /**
-   * Submits an Echo job on a local context and checks the echoed value, that the handle refuses
-   * to go back to the SENT state, and that the listener saw the start and success callbacks.
-   */
-  @Test
-  public void testJobSubmission() throws Exception {
-    TestFunction scenario = new TestFunction() {
-      @Override
-      public void call(LivyClient client) throws Exception {
-        JobHandle.Listener<String> callbacks = newListener();
-        JobHandle<String> echoHandle = client.submit(new Echo<>("hello"));
-        echoHandle.addListener(callbacks);
-
-        String echoed = echoHandle.get(TIMEOUT, TimeUnit.SECONDS);
-        assertEquals("hello", echoed);
-
-        // Attempt an invalid state transition. The handle serializes its state changes
-        // internally, so a rejected transition shows that the change we care about has
-        // already taken place.
-        JobHandleImpl<String> impl = (JobHandleImpl<String>) echoHandle;
-        boolean movedBackToSent = impl.changeState(JobHandle.State.SENT);
-        assertFalse(movedBackToSent);
-
-        verify(callbacks).onJobStarted(echoHandle);
-        verify(callbacks).onJobSucceeded(same(echoHandle), eq(echoHandle.get()));
-      }
-    };
-    runTest(true, scenario);
-  }
-
-  /** Runs a SmallCount(5) job on a local context and expects the value 5 back. */
-  @Test
-  public void testSimpleSparkJob() throws Exception {
-    TestFunction scenario = new TestFunction() {
-      @Override
-      public void call(LivyClient client) throws Exception {
-        Long expected = Long.valueOf(5L);
-        Long counted = client.submit(new SmallCount(5)).get(TIMEOUT, TimeUnit.SECONDS);
-        assertEquals(expected, counted);
-      }
-    };
-    runTest(true, scenario);
-  }
-
-  /**
-   * Submits a job that fails and checks that the failure surfaces as an ExecutionException
-   * naming Failure.JobFailureException, and that the listener saw the start and failure
-   * callbacks.
-   */
-  @Test
-  public void testJobFailure() throws Exception {
-    TestFunction scenario = new TestFunction() {
-      @Override
-      public void call(LivyClient client) throws Exception {
-        JobHandle.Listener<Void> callbacks = newListener();
-        JobHandle<Void> failing = client.submit(new Failure());
-        failing.addListener(callbacks);
-
-        try {
-          failing.get(TIMEOUT, TimeUnit.SECONDS);
-          fail("Should have thrown an exception.");
-        } catch (ExecutionException ee) {
-          String reported = ee.getCause().getMessage();
-          String expectedName = Failure.JobFailureException.class.getName();
-          assertTrue(reported.contains(expectedName));
-        }
-
-        // Attempt an invalid state transition. The handle serializes its state changes
-        // internally, so a rejected transition shows that the change we care about has
-        // already taken place.
-        JobHandleImpl<Void> impl = (JobHandleImpl<Void>) failing;
-        boolean movedBackToSent = impl.changeState(JobHandle.State.SENT);
-        assertFalse(movedBackToSent);
-
-        verify(callbacks).onJobStarted(failing);
-        verify(callbacks).onJobFailed(same(failing), any(Throwable.class));
-      }
-    };
-    runTest(true, scenario);
-  }
-
-  /** Uses LivyClient.run() to execute an Echo job and checks the echoed value. */
-  @Test
-  public void testSyncRpc() throws Exception {
-    TestFunction scenario = new TestFunction() {
-      @Override
-      public void call(LivyClient client) throws Exception {
-        Future<String> pending = client.run(new Echo<>("Hello"));
-        String echoed = pending.get(TIMEOUT, TimeUnit.SECONDS);
-        assertEquals("Hello", echoed);
-      }
-    };
-    runTest(true, scenario);
-  }
-
-  /** Runs a SmallCount(5) job with the non-local configuration and expects the value 5 back. */
-  @Test
-  public void testRemoteClient() throws Exception {
-    TestFunction scenario = new TestFunction() {
-      @Override
-      public void call(LivyClient client) throws Exception {
-        Long expected = Long.valueOf(5L);
-        Long counted = client.submit(new SmallCount(5)).get(TIMEOUT, TimeUnit.SECONDS);
-        assertEquals(expected, counted);
-      }
-    };
-    runTest(false, scenario);
-  }
-
-  /**
-   * Checks that a jar added to a running context becomes visible as a class path resource to
-   * jobs, and that a plain file added to the context can be read by a job. Both temporary
-   * files are deleted when the test finishes, whether it passes or fails.
-   */
-  @Test
-  public void testAddJarsAndFiles() throws Exception {
-    runTest(true, new TestFunction() {
-      @Override
-      public void call(LivyClient client) throws Exception {
-        File jar = null;
-        File file = null;
-
-        try {
-          // Test that adding a jar to the remote context makes it show up in the classpath.
-          jar = File.createTempFile("test", ".jar");
-
-          // try-with-resources closes the streams even when writing the entry fails.
-          try (FileOutputStream jarBytes = new FileOutputStream(jar);
-               JarOutputStream jarFile = new JarOutputStream(jarBytes)) {
-            jarFile.putNextEntry(new ZipEntry("test.resource"));
-            jarFile.write("test resource".getBytes("UTF-8"));
-            jarFile.closeEntry();
-          }
-
-          client.addJar(new URI("file:" + jar.getAbsolutePath()))
-            .get(TIMEOUT, TimeUnit.SECONDS);
-
-          // Need to run a Spark job to make sure the jar is added to the class loader. Monitoring
-          // SparkContext#addJar() doesn't mean much, we can only be sure jars have been distributed
-          // when we run a task after the jar has been added.
-          String result = client.submit(new FileReader("test.resource", true))
-            .get(TIMEOUT, TimeUnit.SECONDS);
-          assertEquals("test resource", result);
-
-          // Test that adding a file to the remote context makes it available to executors.
-          file = File.createTempFile("test", ".file");
-
-          try (FileOutputStream fileStream = new FileOutputStream(file)) {
-            fileStream.write("test file".getBytes("UTF-8"));
-          }
-
-          // A plain file goes through addFile(), which is the call this half of the test is
-          // meant to cover; it used to be registered with addJar() by mistake.
-          client.addFile(new URI("file:" + file.getAbsolutePath()))
-            .get(TIMEOUT, TimeUnit.SECONDS);
-
-          // The same applies to files added with "addFile". They're only guaranteed to be available
-          // to tasks started after the addFile() call completes.
-          result = client.submit(new FileReader(file.getName(), false))
-            .get(TIMEOUT, TimeUnit.SECONDS);
-          assertEquals("test file", result);
-        } finally {
-          if (jar != null) {
-            jar.delete();
-          }
-          if (file != null) {
-            file.delete();
-          }
-        }
-      }
-    });
-  }
-
-  /** Runs SQLGetTweets(false) on a local context and checks the single row it returns. */
-  @Test
-  public void testSparkSQLJob() throws Exception {
-    TestFunction scenario = new TestFunction() {
-      @Override
-      void call(LivyClient client) throws Exception {
-        List<String> rows = client.submit(new SQLGetTweets(false))
-          .get(TIMEOUT, TimeUnit.SECONDS);
-        assertEquals(1, rows.size());
-
-        String firstRow = rows.get(0);
-        assertEquals("[Adventures With Coffee, Code, and Writing.,0]", firstRow);
-      }
-    };
-    runTest(true, scenario);
-  }
-
-  /** Runs SQLGetTweets(true) on a local context and checks the single row it returns. */
-  @Test
-  public void testHiveJob() throws Exception {
-    TestFunction scenario = new TestFunction() {
-      @Override
-      void call(LivyClient client) throws Exception {
-        List<String> rows = client.submit(new SQLGetTweets(true))
-          .get(TIMEOUT, TimeUnit.SECONDS);
-        assertEquals(1, rows.size());
-
-        String firstRow = rows.get(0);
-        assertEquals("[Adventures With Coffee, Code, and Writing.,0]", firstRow);
-      }
-    };
-    runTest(true, scenario);
-  }
-
-  /** Runs SparkStreamingJob on a local context and expects it to report true. */
-  @Test
-  public void testStreamingContext() throws Exception {
-    TestFunction scenario = new TestFunction() {
-      @Override
-      void call(LivyClient client) throws Exception {
-        Boolean created = client.submit(new SparkStreamingJob())
-          .get(TIMEOUT, TimeUnit.SECONDS);
-        assertEquals(Boolean.TRUE, created);
-      }
-    };
-    runTest(true, scenario);
-  }
-
-  /**
-   * Sets the PROXY_USER configuration entry and checks that a GetCurrentUser job run with the
-   * non-local configuration reports that same user name.
-   */
-  @Test
-  public void testImpersonation() throws Exception {
-    final String proxyUser = "__proxy__";
-
-    TestFunction scenario = new TestFunction() {
-      @Override
-      void call(LivyClient client) throws Exception {
-        String reportedUser = client.submit(new GetCurrentUser())
-          .get(TIMEOUT, TimeUnit.SECONDS);
-        assertEquals(proxyUser, reportedUser);
-      }
-
-      @Override
-      void config(Properties conf) {
-        conf.put(RSCConf.Entry.PROXY_USER.key(), proxyUser);
-      }
-    };
-    runTest(false, scenario);
-  }
-
-  /**
-   * Disconnects from a context started with the non-local configuration, then attaches a
-   * second client using only the context URI and checks that it can run an Echo job.
-   */
-  @Test
-  public void testConnectToRunningContext() throws Exception {
-    TestFunction scenario = new TestFunction() {
-      @Override
-      void call(LivyClient client) throws Exception {
-        URI contextUri = disconnectClient(client);
-
-        // The builder gets nothing but the URI. Should it try to launch a fresh context it
-        // would fail, since none of the settings from createConf() are supplied.
-        LivyClient reconnected = new LivyClientBuilder()
-          .setURI(contextUri)
-          .build();
-
-        try {
-          String echoed = reconnected.submit(new Echo<>("hello"))
-            .get(TIMEOUT, TimeUnit.SECONDS);
-          assertEquals("hello", echoed);
-        } finally {
-          reconnected.stop(true);
-        }
-      }
-    };
-    runTest(false, scenario);
-  }
-
-  /**
-   * Configures a 1s server idle timeout, disconnects, sleeps for 2 seconds and then expects
-   * that a new client built from the old context URI cannot get a job to complete.
-   */
-  @Test
-  public void testServerIdleTimeout() throws Exception {
-    TestFunction scenario = new TestFunction() {
-      @Override
-      void config(Properties conf) {
-        conf.setProperty(SERVER_IDLE_TIMEOUT.key(), "1s");
-      }
-
-      @Override
-      void call(LivyClient client) throws Exception {
-        // Drop the original connection, then give the idle timeout time to fire.
-        URI staleUri = disconnectClient(client);
-        TimeUnit.SECONDS.sleep(2);
-
-        // There is no API for observing the connection state. Instead, reconnect and queue a
-        // long-running job; an Exception from either step is the expected outcome.
-        try {
-          LivyClient lateClient = new LivyClientBuilder()
-            .setURI(staleUri)
-            .build();
-          long napMillis = TimeUnit.SECONDS.toMillis(TIMEOUT);
-
-          try {
-            lateClient.submit(new Sleeper(napMillis))
-              .get(TIMEOUT, TimeUnit.SECONDS);
-          } catch (TimeoutException te) {
-            // Not supposed to happen; stop the client before the failure below is raised.
-            lateClient.stop(true);
-          }
-          fail("Should have failed to contact RSC after idle timeout.");
-        } catch (Exception e) {
-          // Expected.
-        }
-      }
-    };
-    runTest(true, scenario);
-  }
-
-  /**
-   * Installs a mocked spark-submit Process whose waitFor() blocks, builds a client and stops
-   * it right away, then checks that destroy() is invoked on the mocked process within 5 seconds.
-   */
-  @Test
-  public void testKillServerWhileSparkSubmitIsRunning() throws Exception {
-    Properties conf = createConf(true);
-    LivyClient client = null;
-    PipedInputStream stubStream = new PipedInputStream(new PipedOutputStream());
-    try {
-      Process fakeSubmit = mock(Process.class);
-      when(fakeSubmit.getInputStream()).thenReturn(stubStream);
-      when(fakeSubmit.getErrorStream()).thenReturn(stubStream);
-
-      // waitFor() stays blocked until this latch is released at the end of the test.
-      final CountDownLatch releaseWaitFor = new CountDownLatch(1);
-      Answer<Integer> blockThenExitZero = new Answer<Integer>() {
-        @Override
-        public Integer answer(InvocationOnMock invocation) throws Throwable {
-          releaseWaitFor.await();
-          return 0;
-        }
-      };
-      when(fakeSubmit.waitFor()).thenAnswer(blockThenExitZero);
-
-      // Record the destroy() call so the test can assert that it happened.
-      final CountDownLatch destroyInvoked = new CountDownLatch(1);
-      Answer<Void> recordDestroy = new Answer<Void>() {
-        @Override
-        public Void answer(InvocationOnMock invocation) throws Throwable {
-          destroyInvoked.countDown();
-          return null;
-        }
-      };
-      doAnswer(recordDestroy).when(fakeSubmit).destroy();
-
-      ContextLauncher.mockSparkSubmit = fakeSubmit;
-
-      client = new LivyClientBuilder(false).setURI(new URI("rsc:/"))
-        .setAll(conf)
-        .build();
-
-      client.stop(true);
-
-      boolean destroyed = destroyInvoked.await(5, TimeUnit.SECONDS);
-      assertTrue(destroyed);
-      releaseWaitFor.countDown();
-    } catch (Exception e) {
-      // The backtraces in JUnit's summary report do not show the source line that threw, so
-      // log the exception here before rethrowing it.
-      LOG.error("Test threw exception.", e);
-      throw e;
-    } finally {
-      ContextLauncher.mockSparkSubmit = null;
-      stubStream.close();
-      if (client != null) {
-        client.stop(true);
-      }
-    }
-  }
-
-  /** Runs the bypass scenario with the sync flag set to false. */
-  @Test
-  public void testBypass() throws Exception {
-    final boolean sync = false;
-    runBypassTest(sync);
-  }
-
-  /** Runs the bypass scenario with the sync flag set to true. */
-  @Test
-  public void testBypassSync() throws Exception {
-    final boolean sync = true;
-    runBypassTest(sync);
-  }
-
-  /**
-   * Submits a pre-serialized Echo job through RSCClient.bypass(), polls its status until it
-   * succeeds, checks the deserialized result, and finally checks that asking for the status
-   * again fails because the job is no longer tracked.
-   *
-   * @param sync value passed through as the second argument of RSCClient.bypass().
-   */
-  private void runBypassTest(final boolean sync) throws Exception {
-    runTest(true, new TestFunction() {
-      @Override
-      public void call(LivyClient client) throws Exception {
-        Serializer s = new Serializer();
-        RSCClient lclient = (RSCClient) client;
-        ByteBuffer job = s.serialize(new Echo<>("hello"));
-        String jobId = lclient.bypass(job, sync);
-
-        // Try to fetch the result, trying several times until the timeout runs out, and
-        // backing off as attempts fail.
-        long deadline = System.nanoTime() + TimeUnit.NANOSECONDS.convert(TIMEOUT, TimeUnit.SECONDS);
-        long sleep = 100;
-        BypassJobStatus status = null;
-        while (System.nanoTime() < deadline) {
-          BypassJobStatus currStatus = lclient.getBypassJobStatus(jobId).get(TIMEOUT,
-            TimeUnit.SECONDS);
-          assertNotEquals(JobHandle.State.CANCELLED, currStatus.state);
-          assertNotEquals(JobHandle.State.FAILED, currStatus.state);
-          if (currStatus.state.equals(JobHandle.State.SUCCEEDED)) {
-            status = currStatus;
-            break;
-          }
-
-          // "sleep" is in milliseconds while the deadline is in nanoseconds, so convert before
-          // comparing. Cap the nap at the time left so the loop never sleeps past the
-          // deadline, and stop polling once no time is left.
-          long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
-          if (remainingMs <= 0) {
-            break;
-          }
-          Thread.sleep(Math.min(sleep, remainingMs));
-          sleep *= 2;
-        }
-        assertNotNull("Failed to fetch bypass job status.", status);
-        assertEquals(JobHandle.State.SUCCEEDED, status.state);
-
-        String resultVal = (String) s.deserialize(ByteBuffer.wrap(status.result));
-        assertEquals("hello", resultVal);
-
-        // After the result is retrieved, the driver should stop tracking the job and release
-        // resources associated with it.
-        try {
-          lclient.getBypassJobStatus(jobId).get(TIMEOUT, TimeUnit.SECONDS);
-          fail("Should have failed to retrieve status of released job.");
-        } catch (ExecutionException ee) {
-          assertTrue(ee.getCause() instanceof RpcException);
-          assertTrue(ee.getCause().getMessage().contains(
-            "java.util.NoSuchElementException: " + jobId));
-        }
-      }
-    });
-  }
-
-  /**
-   * Creates a Mockito mock of JobHandle.Listener typed for the caller's result type.
-   *
-   * @param <T> result type of the job handle the listener will be attached to.
-   * @return a fresh mock listener.
-   */
-  @SuppressWarnings("unchecked")
-  private <T> JobHandle.Listener<T> newListener() {
-    // The mock is created from the raw class literal, hence the unchecked cast.
-    return (JobHandle.Listener<T>) mock(JobHandle.Listener.class);
-  }
-
-  /**
-   * Captures the connection details of the client's context as an "rsc://" URI and then stops
-   * the client with stop(false). This method itself does not wait for anything.
-   *
-   * @param client the client to disconnect; must be an RSCClient.
-   * @return a URI of the form rsc://clientId:secret@remoteAddress:remotePort.
-   */
-  private URI disconnectClient(LivyClient client) throws Exception {
-    RSCClient rsc = (RSCClient) client;
-    ContextInfo info = rsc.getContextInfo();
-    String address = String.format("rsc://%s:%s@%s:%d", info.clientId, info.secret,
-      info.remoteAddress, info.remotePort);
-    URI contextUri = new URI(address);
-
-    // NOTE(review): stop(false) presumably leaves the remote context running - confirm.
-    client.stop(false);
-    return contextUri;
-  }
-
-  /**
-   * Common harness: builds the configuration, lets the test adjust it, creates a client, waits
-   * for a PingJob to complete, runs the test body and always stops the client afterwards.
-   *
-   * @param local passed to createConf() to select the local or non-local configuration.
-   * @param test  supplies the optional configuration tweaks and the test body.
-   * @throws Exception whatever the configuration, client creation or test body throws; it is
-   *                   logged before being rethrown.
-   */
-  private void runTest(boolean local, TestFunction test) throws Exception {
-    LivyClient client = null;
-    Properties conf = createConf(local);
-    try {
-      test.config(conf);
-      client = new LivyClientBuilder(false).setURI(new URI("rsc:/"))
-        .setAll(conf)
-        .build();
-
-      // Block until the context answers a ping so the test body starts against a live context.
-      Object pong = client.submit(new PingJob()).get(TIMEOUT, TimeUnit.SECONDS);
-      assertNull(pong);
-
-      test.call(client);
-    } catch (Exception e) {
-      // The backtraces in JUnit's summary report do not show the source line that threw, so
-      // log the exception here before rethrowing it.
-      LOG.error("Test threw exception.", e);
-      throw e;
-    } finally {
-      if (client != null) {
-        client.stop(true);
-      }
-    }
-  }
-
-  /*
-   * Since it's hard to test a streaming context, test that a
-   * streaming context has been created. Also checks that an improper
-   * sequence of streaming context calls (i.e. create, stop, retrieve)
-   * results in a failure.
-   *
-   * Returns true when the context obtained after the final create call is non-null.
-   */
-  private static class SparkStreamingJob implements Job<Boolean> {
-    @Override
-    public Boolean call(JobContext jc) throws Exception {
-      try {
-        jc.streamingctx();
-        fail("Access before creation: Should throw IllegalStateException");
-      } catch (IllegalStateException ex) {
-        // Expected.
-      }
-      try {
-        jc.stopStreamingCtx();
-        fail("Stop before creation: Should throw IllegalStateException");
-      } catch (IllegalStateException ex) {
-        // Expected.
-      }
-      try {
-        jc.createStreamingContext(1000L);
-        // Still call the accessor while the context exists; the returned value is not needed.
-        jc.streamingctx();
-        jc.stopStreamingCtx();
-        jc.streamingctx();
-        fail("Access after stop: Should throw IllegalStateException");
-      } catch (IllegalStateException ex) {
-        // Expected.
-      }
-
-      jc.createStreamingContext(1000L);
-      JavaStreamingContext streamingContext = jc.streamingctx();
-      jc.stopStreamingCtx();
-      return streamingContext != null;
-    }
-  }
-
-  /** A test scenario handed to runTest(): an optional configuration hook plus the test body. */
-  private abstract static class TestFunction {
-    /** Adjusts the configuration before the client is built; does nothing by default. */
-    void config(Properties conf) {
-    }
-
-    /** Runs the test body against the client created by runTest(). */
-    abstract void call(LivyClient client) throws Exception;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/test/java/com/cloudera/livy/rsc/rpc/TestKryoMessageCodec.java
----------------------------------------------------------------------
diff --git a/rsc/src/test/java/com/cloudera/livy/rsc/rpc/TestKryoMessageCodec.java b/rsc/src/test/java/com/cloudera/livy/rsc/rpc/TestKryoMessageCodec.java
deleted file mode 100644
index a8ede98..0000000
--- a/rsc/src/test/java/com/cloudera/livy/rsc/rpc/TestKryoMessageCodec.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.rsc.rpc;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.UnpooledByteBufAllocator;
-import io.netty.channel.embedded.EmbeddedChannel;
-import io.netty.handler.logging.LoggingHandler;
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-public class TestKryoMessageCodec {
-
-  private static final String MESSAGE = "Hello World!";
-
-  /** Round-trips MESSAGE through the codec without an encryption handler. */
-  @Test
-  public void testKryoCodec() throws Exception {
-    List<Object> decoded = encodeAndDecode(MESSAGE, null);
-    int decodedCount = decoded.size();
-    assertEquals(1, decodedCount);
-
-    Object first = decoded.get(0);
-    assertEquals(MESSAGE, first);
-  }
-
-  /**
-   * Encodes two messages into one buffer and checks that decoding a slice one byte short of
-   * the first message yields nothing, while a slice one byte past it yields exactly one object.
-   */
-  @Test
-  public void testFragmentation() throws Exception {
-    KryoMessageCodec codec = new KryoMessageCodec(0);
-    ByteBuf encoded = newBuffer();
-    Object[] payloads = { "msg1", "msg2" };
-
-    // endOffsets[i] is the writer index right after payload i has been encoded.
-    int[] endOffsets = new int[payloads.length];
-    int slot = 0;
-    for (Object payload : payloads) {
-      codec.encode(null, payload, encoded);
-      endOffsets[slot++] = encoded.writerIndex();
-    }
-
-    List<Object> decoded = new ArrayList<>();
-    int firstEnd = endOffsets[0];
-
-    // One byte short of the first message: nothing can be decoded yet.
-    ByteBuf tooShort = encoded.slice(0, firstEnd - 1);
-    codec.decode(null, tooShort, decoded);
-    assertEquals(0, decoded.size());
-
-    // The whole first message plus one extra byte: only the first message is decoded.
-    ByteBuf firstPlusOne = encoded.slice(0, firstEnd + 1);
-    codec.decode(null, firstPlusOne, decoded);
-    assertEquals(1, decoded.size());
-  }
-
-  /**
-   * Sends MESSAGE through an EmbeddedChannel holding a LoggingHandler and the codec, checks
-   * that the outbound object is not a String, then feeds it back in and expects MESSAGE again.
-   */
-  @Test
-  public void testEmbeddedChannel() throws Exception {
-    LoggingHandler logging = new LoggingHandler(getClass());
-    KryoMessageCodec codec = new KryoMessageCodec(0);
-    EmbeddedChannel channel = new EmbeddedChannel(logging, codec);
-
-    channel.writeAndFlush(MESSAGE);
-    assertEquals(1, channel.outboundMessages().size());
-
-    // What went out must be of a different class than the String that was written.
-    Class<?> wireType = channel.outboundMessages().peek().getClass();
-    assertFalse(MESSAGE.getClass().equals(wireType));
-
-    channel.writeInbound(channel.readOutbound());
-    assertEquals(1, channel.inboundMessages().size());
-    assertEquals(MESSAGE, channel.readInbound());
-    channel.close();
-  }
-
-  /** Builds a codec with TestMessage passed to its constructor and round-trips an instance. */
-  @Test
-  public void testAutoRegistration() throws Exception {
-    KryoMessageCodec codec = new KryoMessageCodec(0, TestMessage.class);
-    ByteBuf wire = newBuffer();
-    TestMessage original = new TestMessage();
-    codec.encode(null, original, wire);
-
-    List<Object> decoded = new ArrayList<>();
-    codec.decode(null, wire, decoded);
-
-    assertEquals(1, decoded.size());
-    Object first = decoded.get(0);
-    assertTrue(first instanceof TestMessage);
-  }
-
-  /**
-   * Checks the size limit of a codec created with a maximum of 1024: a 512-byte payload
-   * encodes, a 1025-byte payload is rejected on encode, and the same payload encoded by an
-   * unlimited codec (limit 0) is rejected when the limited codec decodes it.
-   */
-  @Test
-  public void testMaxMessageSize() throws Exception {
-    KryoMessageCodec codec = new KryoMessageCodec(1024);
-    ByteBuf buf = newBuffer();
-    codec.encode(null, new TestMessage(new byte[512]), buf);
-
-    try {
-      codec.encode(null, new TestMessage(new byte[1025]), buf);
-      fail("Should have failed to encode large message.");
-    } catch (IllegalArgumentException e) {
-      // contains() also accepts a message that starts with the phrase (index 0).
-      assertTrue(e.getMessage().contains("maximum allowed size"));
-    }
-
-    KryoMessageCodec unlimited = new KryoMessageCodec(0);
-    buf = newBuffer();
-    unlimited.encode(null, new TestMessage(new byte[1025]), buf);
-
-    try {
-      List<Object> out = new ArrayList<>();
-      codec.decode(null, buf, out);
-      fail("Should have failed to decode large message.");
-    } catch (IllegalArgumentException e) {
-      assertTrue(e.getMessage().contains("maximum allowed size"));
-    }
-  }
-
-  /** Writes the int -1 into a buffer and expects the decoder to reject it. */
-  @Test
-  public void testNegativeMessageSize() throws Exception {
-    KryoMessageCodec codec = new KryoMessageCodec(1024);
-    ByteBuf buf = newBuffer();
-    buf.writeInt(-1);
-
-    try {
-      List<Object> out = new ArrayList<>();
-      codec.decode(null, buf, out);
-      fail("Should have failed to decode message with negative size.");
-    } catch (IllegalArgumentException e) {
-      // contains() also accepts a message that starts with the phrase (index 0).
-      assertTrue(e.getMessage().contains("must be positive"));
-    }
-  }
-
-  /**
-   * Encodes with a handler that encrypts but does not decrypt. Decoding may throw, which is
-   * accepted; if it does produce objects, none of them may equal the original message.
-   */
-  @Test
-  public void testEncryptionOnly() throws Exception {
-    List<Object> objects = Collections.<Object>emptyList();
-    try {
-      objects = encodeAndDecode(MESSAGE, new TestEncryptionHandler(true, false));
-    } catch (Exception ignored) {
-      // Pass: failing to decode the still-encrypted data is an acceptable outcome.
-    }
-    // Do this check in case the ciphertext actually makes sense in some way.
-    for (Object msg : objects) {
-      // Check each decoded object; the loop used to re-check element 0 on every pass.
-      assertFalse(MESSAGE.equals(msg));
-    }
-  }
-
-  /**
-   * Encodes with a handler that decrypts but does not encrypt. Decoding may throw, which is
-   * accepted; if it does produce objects, none of them may equal the original message.
-   */
-  @Test
-  public void testDecryptionOnly() throws Exception {
-    List<Object> objects = Collections.<Object>emptyList();
-    try {
-      objects = encodeAndDecode(MESSAGE, new TestEncryptionHandler(false, true));
-    } catch (Exception ignored) {
-      // Pass: failing to decode the wrongly "decrypted" data is an acceptable outcome.
-    }
-    // Do this check in case the decrypted plaintext actually makes sense in some way.
-    for (Object msg : objects) {
-      // Check each decoded object; the loop used to re-check element 0 on every pass.
-      assertFalse(MESSAGE.equals(msg));
-    }
-  }
-
-  /** Round-trips MESSAGE through a handler that both encrypts and decrypts. */
-  @Test
-  public void testEncryptDecrypt() throws Exception {
-    TestEncryptionHandler symmetric = new TestEncryptionHandler(true, true);
-    List<Object> decoded = encodeAndDecode(MESSAGE, symmetric);
-
-    assertEquals(1, decoded.size());
-    Object first = decoded.get(0);
-    assertEquals(MESSAGE, first);
-  }
-
-  /**
-   * Encodes a message into a fresh buffer and decodes it again using one codec instance.
-   *
-   * @param message the object to encode.
-   * @param eh      the encryption handler to install on the codec; callers also pass null.
-   * @return the objects produced by decoding the buffer.
-   */
-  private List<Object> encodeAndDecode(Object message, KryoMessageCodec.EncryptionHandler eh)
-      throws Exception {
-    KryoMessageCodec codec = new KryoMessageCodec(0);
-    codec.setEncryptionHandler(eh);
-
-    ByteBuf wire = newBuffer();
-    codec.encode(null, message, wire);
-
-    List<Object> decoded = new ArrayList<>();
-    codec.decode(null, wire, decoded);
-    return decoded;
-  }
-
-  /** Obtains a buffer from the default unpooled allocator, requesting 1024 as the size. */
-  private ByteBuf newBuffer() {
-    final int requestedSize = 1024;
-    return UnpooledByteBufAllocator.DEFAULT.buffer(requestedSize);
-  }
-
-  /** Minimal message type for the codec tests: a holder for an optional byte array. */
-  private static class TestMessage {
-    // Payload; stays null when the no-argument constructor is used.
-    byte[] data;
-
-    /** Creates a message with the given payload. */
-    TestMessage(byte[] data) {
-      this.data = data;
-    }
-
-    /** Creates a message without a payload. */
-    TestMessage() {
-      this(null);
-    }
-  }
-
-  /**
-   * Test handler that XORs every byte with a fixed key. Wrapping and unwrapping can be switched
-   * on independently, so the tests can simulate a side that only encrypts or only decrypts.
-   */
-  private static class TestEncryptionHandler implements KryoMessageCodec.EncryptionHandler {
-
-    private static final byte KEY = 0x42;
-
-    private final boolean encrypt;
-    private final boolean decrypt;
-
-    /**
-     * @param encrypt whether wrap() transforms the data or returns it as is.
-     * @param decrypt whether unwrap() transforms the data or returns it as is.
-     */
-    TestEncryptionHandler(boolean encrypt, boolean decrypt) {
-      this.encrypt = encrypt;
-      this.decrypt = decrypt;
-    }
-
-    /** Returns the XOR-ed copy of the given range, or the input array when encryption is off. */
-    public byte[] wrap(byte[] data, int offset, int len) throws IOException {
-      if (!encrypt) {
-        return data;
-      }
-      return transform(data, offset, len);
-    }
-
-    /** Returns the XOR-ed copy of the given range, or the input array when decryption is off. */
-    public byte[] unwrap(byte[] data, int offset, int len) throws IOException {
-      if (!decrypt) {
-        return data;
-      }
-      return transform(data, offset, len);
-    }
-
-    /** Nothing to release. */
-    public void dispose() throws IOException {
-
-    }
-
-    /** Copies len bytes starting at offset into a new array, XOR-ing each one with KEY. */
-    private byte[] transform(byte[] data, int offset, int len) {
-      byte[] dest = new byte[len];
-      int src = offset;
-      for (int dst = 0; dst < len; dst++, src++) {
-        dest[dst] = (byte) (data[src] ^ KEY);
-      }
-      return dest;
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/test/java/com/cloudera/livy/rsc/rpc/TestRpc.java
----------------------------------------------------------------------
diff --git a/rsc/src/test/java/com/cloudera/livy/rsc/rpc/TestRpc.java b/rsc/src/test/java/com/cloudera/livy/rsc/rpc/TestRpc.java
deleted file mode 100644
index 48abe94..0000000
--- a/rsc/src/test/java/com/cloudera/livy/rsc/rpc/TestRpc.java
+++ /dev/null
@@ -1,337 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.rsc.rpc;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.SocketException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.security.sasl.SaslException;
-
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.embedded.EmbeddedChannel;
-import io.netty.util.concurrent.Future;
-import org.apache.commons.io.IOUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-import com.cloudera.livy.rsc.FutureListener;
-import com.cloudera.livy.rsc.RSCConf;
-import com.cloudera.livy.rsc.Utils;
-import static com.cloudera.livy.rsc.RSCConf.Entry.*;
-
-public class TestRpc {
-
-  private static final Logger LOG = LoggerFactory.getLogger(TestRpc.class);
-
-  private Collection<Closeable> closeables;
-  private RSCConf emptyConfig;
-
-  @Before
-  public void setUp() {
-    closeables = new ArrayList<>();
-    emptyConfig = new RSCConf(null);
-  }
-
-  @After
-  public void cleanUp() throws Exception {
-    for (Closeable c : closeables) {
-      IOUtils.closeQuietly(c);
-    }
-  }
-
-  private <T extends Closeable> T autoClose(T closeable) {
-    closeables.add(closeable);
-    return closeable;
-  }
-
-  @Test
-  public void testRpcDispatcher() throws Exception {
-    Rpc serverRpc = autoClose(Rpc.createEmbedded(new TestDispatcher()));
-    Rpc clientRpc = autoClose(Rpc.createEmbedded(new TestDispatcher()));
-
-    TestMessage outbound = new TestMessage("Hello World!");
-    Future<TestMessage> call = clientRpc.call(outbound, TestMessage.class);
-
-    LOG.debug("Transferring messages...");
-    transfer(serverRpc, clientRpc);
-
-    TestMessage reply = call.get(10, TimeUnit.SECONDS);
-    assertEquals(outbound.message, reply.message);
-  }
-
-  @Test
-  public void testClientServer() throws Exception {
-    RpcServer server = autoClose(new RpcServer(emptyConfig));
-    Rpc[] rpcs = createRpcConnection(server);
-    Rpc serverRpc = rpcs[0];
-    Rpc client = rpcs[1];
-
-    TestMessage outbound = new TestMessage("Hello World!");
-    Future<TestMessage> call = client.call(outbound, TestMessage.class);
-    TestMessage reply = call.get(10, TimeUnit.SECONDS);
-    assertEquals(outbound.message, reply.message);
-
-    TestMessage another = new TestMessage("Hello again!");
-    Future<TestMessage> anotherCall = client.call(another, TestMessage.class);
-    TestMessage anotherReply = anotherCall.get(10, TimeUnit.SECONDS);
-    assertEquals(another.message, anotherReply.message);
-
-    String errorMsg = "This is an error.";
-    try {
-      client.call(new ErrorCall(errorMsg)).get(10, TimeUnit.SECONDS);
-    } catch (ExecutionException ee) {
-      assertTrue(ee.getCause() instanceof RpcException);
-      assertTrue(ee.getCause().getMessage().indexOf(errorMsg) >= 0);
-    }
-
-    // Test from server to client too.
-    TestMessage serverMsg = new TestMessage("Hello from the server!");
-    Future<TestMessage> serverCall = serverRpc.call(serverMsg, TestMessage.class);
-    TestMessage serverReply = serverCall.get(10, TimeUnit.SECONDS);
-    assertEquals(serverMsg.message, serverReply.message);
-  }
-
-  @Test
-  public void testBadHello() throws Exception {
-    RpcServer server = autoClose(new RpcServer(emptyConfig));
-    RpcServer.ClientCallback callback = mock(RpcServer.ClientCallback.class);
-
-    server.registerClient("client", "newClient", callback);
-    Future<Rpc> clientRpcFuture = Rpc.createClient(emptyConfig, server.getEventLoopGroup(),
-        "localhost", server.getPort(), "client", "wrongClient", new TestDispatcher());
-
-    try {
-      autoClose(clientRpcFuture.get(10, TimeUnit.SECONDS));
-      fail("Should have failed to create client with wrong secret.");
-    } catch (ExecutionException ee) {
-      // On failure, the SASL handler will throw an exception indicating that the SASL
-      // negotiation failed.
-      assertTrue("Unexpected exception: " + ee.getCause(),
-        ee.getCause() instanceof SaslException);
-    }
-
-    verify(callback, never()).onNewClient(any(Rpc.class));
-  }
-
-  @Test
-  public void testCloseListener() throws Exception {
-    RpcServer server = autoClose(new RpcServer(emptyConfig));
-    Rpc[] rpcs = createRpcConnection(server);
-    Rpc client = rpcs[1];
-
-    final AtomicInteger closeCount = new AtomicInteger();
-    Utils.addListener(client.getChannel().closeFuture(), new FutureListener<Void>() {
-      @Override
-      public void onSuccess(Void unused) {
-        closeCount.incrementAndGet();
-      }
-    });
-
-    client.close();
-    client.close();
-    assertEquals(1, closeCount.get());
-  }
-
-  @Test
-  public void testNotDeserializableRpc() throws Exception {
-    RpcServer server = autoClose(new RpcServer(emptyConfig));
-    Rpc[] rpcs = createRpcConnection(server);
-    Rpc client = rpcs[1];
-
-    try {
-      client.call(new NotDeserializable(42)).get(10, TimeUnit.SECONDS);
-    } catch (ExecutionException ee) {
-      assertTrue(ee.getCause() instanceof RpcException);
-      assertTrue(ee.getCause().getMessage().indexOf("KryoException") >= 0);
-    }
-  }
-
-  @Test
-  public void testEncryption() throws Exception {
-    RSCConf eConf = new RSCConf(null)
-      .setAll(emptyConfig)
-      .set(SASL_QOP, Rpc.SASL_AUTH_CONF);
-    RpcServer server = autoClose(new RpcServer(eConf));
-    Rpc[] rpcs = createRpcConnection(server, eConf);
-    Rpc client = rpcs[1];
-
-    TestMessage outbound = new TestMessage("Hello World!");
-    Future<TestMessage> call = client.call(outbound, TestMessage.class);
-    TestMessage reply = call.get(10, TimeUnit.SECONDS);
-    assertEquals(outbound.message, reply.message);
-  }
-
-  @Test
-  public void testPortRange() throws Exception {
-    String portRange = "a~b";
-    emptyConfig.set(LAUNCHER_PORT_RANGE, portRange);
-    try {
-      autoClose(new RpcServer(emptyConfig));
-    } catch (Exception ee) {
-      assertTrue(ee instanceof NumberFormatException);
-    }
-    portRange = "11000";
-    emptyConfig.set(LAUNCHER_PORT_RANGE, portRange);
-    try {
-      autoClose(new RpcServer(emptyConfig));
-    } catch (Exception ee) {
-      assertTrue(ee instanceof ArrayIndexOutOfBoundsException);
-    }
-    portRange = "11000~11110";
-    emptyConfig.set(LAUNCHER_PORT_RANGE, portRange);
-    String [] portRangeData = portRange.split("~");
-    int startPort = Integer.parseInt(portRangeData[0]);
-    int endPort = Integer.parseInt(portRangeData[1]);
-    RpcServer server = autoClose(new RpcServer(emptyConfig));
-    assertTrue(startPort <= server.getPort() && server.getPort() <= endPort);
-  }
-
-  private void transfer(Rpc serverRpc, Rpc clientRpc) {
-    EmbeddedChannel client = (EmbeddedChannel) clientRpc.getChannel();
-    EmbeddedChannel server = (EmbeddedChannel) serverRpc.getChannel();
-
-    server.runPendingTasks();
-    client.runPendingTasks();
-
-    int count = 0;
-    while (!client.outboundMessages().isEmpty()) {
-      server.writeInbound(client.readOutbound());
-      count++;
-    }
-    server.flush();
-    LOG.debug("Transferred {} outbound client messages.", count);
-
-    count = 0;
-    while (!server.outboundMessages().isEmpty()) {
-      client.writeInbound(server.readOutbound());
-      count++;
-    }
-    client.flush();
-    LOG.debug("Transferred {} outbound server messages.", count);
-  }
-
-  /**
-   * Creates a client connection between the server and a client.
-   *
-   * @return two-tuple (server rpc, client rpc)
-   */
-  private Rpc[] createRpcConnection(RpcServer server) throws Exception {
-    return createRpcConnection(server, emptyConfig);
-  }
-
-  private Rpc[] createRpcConnection(RpcServer server, RSCConf clientConf)
-      throws Exception {
-    String secret = server.createSecret();
-    ServerRpcCallback callback = new ServerRpcCallback();
-    server.registerClient("client", secret, callback);
-
-    Future<Rpc> clientRpcFuture = Rpc.createClient(clientConf, server.getEventLoopGroup(),
-        "localhost", server.getPort(), "client", secret, new TestDispatcher());
-
-    assertTrue("onNewClient() wasn't called.",
-      callback.onNewClientCalled.await(10, TimeUnit.SECONDS));
-    assertTrue("onSaslComplete() wasn't called.",
-      callback.onSaslCompleteCalled.await(10, TimeUnit.SECONDS));
-    assertNotNull(callback.client);
-    Rpc serverRpc = autoClose(callback.client);
-    Rpc clientRpc = autoClose(clientRpcFuture.get(10, TimeUnit.SECONDS));
-    return new Rpc[] { serverRpc, clientRpc };
-  }
-
-  private static class ServerRpcCallback implements RpcServer.ClientCallback {
-    final CountDownLatch onNewClientCalled = new CountDownLatch(1);
-    final CountDownLatch onSaslCompleteCalled = new CountDownLatch(1);
-    Rpc client;
-
-    @Override
-    public RpcDispatcher onNewClient(Rpc client) {
-      this.client = client;
-      onNewClientCalled.countDown();
-      return new TestDispatcher();
-    }
-
-    @Override
-    public void onSaslComplete(Rpc client) {
-      onSaslCompleteCalled.countDown();
-    }
-
-  }
-
-  private static class TestMessage {
-
-    final String message;
-
-    public TestMessage() {
-      this(null);
-    }
-
-    public TestMessage(String message) {
-      this.message = message;
-    }
-
-  }
-
-  private static class ErrorCall {
-
-    final String error;
-
-    public ErrorCall() {
-      this(null);
-    }
-
-    public ErrorCall(String error) {
-      this.error = error;
-    }
-
-  }
-
-  private static class NotDeserializable {
-
-    NotDeserializable(int unused) {
-
-    }
-
-  }
-
-  private static class TestDispatcher extends RpcDispatcher {
-    protected TestMessage handle(ChannelHandlerContext ctx, TestMessage msg) {
-      return msg;
-    }
-
-    protected void handle(ChannelHandlerContext ctx, ErrorCall msg) {
-      throw new IllegalArgumentException(msg.error);
-    }
-
-    protected void handle(ChannelHandlerContext ctx, NotDeserializable msg) {
-      // No op. Shouldn't actually be called, if it is, the test will fail.
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/test/java/org/apache/livy/rsc/TestJobHandle.java
----------------------------------------------------------------------
diff --git a/rsc/src/test/java/org/apache/livy/rsc/TestJobHandle.java b/rsc/src/test/java/org/apache/livy/rsc/TestJobHandle.java
new file mode 100644
index 0000000..e6161ed
--- /dev/null
+++ b/rsc/src/test/java/org/apache/livy/rsc/TestJobHandle.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.rsc;
+
+import io.netty.util.concurrent.Promise;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import org.apache.livy.JobHandle;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestJobHandle {
+
+  @Mock private RSCClient client;
+  @Mock private Promise<Object> promise;
+  @Mock private JobHandle.Listener<Object> listener;
+  @Mock private JobHandle.Listener<Object> listener2;
+
+  @Test
+  public void testStateChanges() throws Exception {
+    JobHandleImpl<Object> handle = new JobHandleImpl<Object>(client, promise, "job");
+    handle.addListener(listener);
+
+    assertTrue(handle.changeState(JobHandle.State.QUEUED));
+    verify(listener).onJobQueued(handle);
+
+    assertTrue(handle.changeState(JobHandle.State.STARTED));
+    verify(listener).onJobStarted(handle);
+
+    assertTrue(handle.changeState(JobHandle.State.CANCELLED));
+    verify(listener).onJobCancelled(handle);
+
+    assertFalse(handle.changeState(JobHandle.State.STARTED));
+    assertFalse(handle.changeState(JobHandle.State.FAILED));
+    assertFalse(handle.changeState(JobHandle.State.SUCCEEDED));
+  }
+
+  @Test
+  public void testFailedJob() throws Exception {
+    JobHandleImpl<Object> handle = new JobHandleImpl<Object>(client, promise, "job");
+    handle.addListener(listener);
+
+    Throwable cause = new Exception();
+    when(promise.cause()).thenReturn(cause);
+
+    assertTrue(handle.changeState(JobHandle.State.FAILED));
+    verify(promise).cause();
+    verify(listener).onJobFailed(handle, cause);
+  }
+
+  @Test
+  public void testSucceededJob() throws Exception {
+    JobHandleImpl<Object> handle = new JobHandleImpl<Object>(client, promise, "job");
+    handle.addListener(listener);
+
+    Object result = new Exception();
+    when(promise.getNow()).thenReturn(result);
+
+    assertTrue(handle.changeState(JobHandle.State.SUCCEEDED));
+    verify(promise).getNow();
+    verify(listener).onJobSucceeded(handle, result);
+  }
+
+  @Test
+  public void testImmediateCallback() throws Exception {
+    JobHandleImpl<Object> handle = new JobHandleImpl<Object>(client, promise, "job");
+    assertTrue(handle.changeState(JobHandle.State.QUEUED));
+    handle.addListener(listener);
+    verify(listener).onJobQueued(handle);
+
+    handle.changeState(JobHandle.State.STARTED);
+    handle.changeState(JobHandle.State.CANCELLED);
+
+    handle.addListener(listener2);
+    verify(listener2).onJobCancelled(same(handle));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/test/java/org/apache/livy/rsc/TestSparkClient.java
----------------------------------------------------------------------
diff --git a/rsc/src/test/java/org/apache/livy/rsc/TestSparkClient.java b/rsc/src/test/java/org/apache/livy/rsc/TestSparkClient.java
new file mode 100644
index 0000000..0663822
--- /dev/null
+++ b/rsc/src/test/java/org/apache/livy/rsc/TestSparkClient.java
@@ -0,0 +1,533 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.rsc;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.jar.JarOutputStream;
+import java.util.zip.ZipEntry;
+
+import org.apache.spark.launcher.SparkLauncher;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import org.apache.livy.Job;
+import org.apache.livy.JobContext;
+import org.apache.livy.JobHandle;
+import org.apache.livy.LivyClient;
+import org.apache.livy.LivyClientBuilder;
+import org.apache.livy.client.common.Serializer;
+import org.apache.livy.rsc.rpc.RpcException;
+import org.apache.livy.test.jobs.Echo;
+import org.apache.livy.test.jobs.Failure;
+import org.apache.livy.test.jobs.FileReader;
+import org.apache.livy.test.jobs.GetCurrentUser;
+import org.apache.livy.test.jobs.SQLGetTweets;
+import org.apache.livy.test.jobs.Sleeper;
+import org.apache.livy.test.jobs.SmallCount;
+import static org.apache.livy.rsc.RSCConf.Entry.*;
+
+public class TestSparkClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestSparkClient.class);
+
+  // Timeouts are bad... mmmkay.
+  private static final long TIMEOUT = 100;
+
+  /**
+   * Builds the client configuration used by the tests.
+   *
+   * @param local when true, sets CLIENT_IN_PROCESS and a "local" Spark master; otherwise passes
+   *              the test JVM's classpath to the driver and executors.
+   * @return the populated properties; LIVY_JARS is always set to an empty string.
+   */
+  private Properties createConf(boolean local) {
+    Properties props = new Properties();
+    if (!local) {
+      String testClasspath = System.getProperty("java.class.path");
+      props.put("spark.app.name", "SparkClientSuite Remote App");
+      props.put(SparkLauncher.DRIVER_MEMORY, "512m");
+      props.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, testClasspath);
+      props.put(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH, testClasspath);
+    } else {
+      props.put(CLIENT_IN_PROCESS.key(), "true");
+      props.put(SparkLauncher.SPARK_MASTER, "local");
+      props.put("spark.app.name", "SparkClientSuite Local App");
+    }
+
+    props.put(LIVY_JARS.key(), "");
+    return props;
+  }
+
+  @Test
+  public void testJobSubmission() throws Exception {
+    runTest(true, new TestFunction() {
+      @Override
+      public void call(LivyClient client) throws Exception {
+        JobHandle.Listener<String> listener = newListener();
+        JobHandle<String> handle = client.submit(new Echo<>("hello"));
+        handle.addListener(listener);
+        assertEquals("hello", handle.get(TIMEOUT, TimeUnit.SECONDS));
+
+        // Try an invalid state transition on the handle. This ensures that the actual state
+        // change we're interested in actually happened, since internally the handle serializes
+        // state changes.
+        assertFalse(((JobHandleImpl<String>)handle).changeState(JobHandle.State.SENT));
+
+        verify(listener).onJobStarted(handle);
+        verify(listener).onJobSucceeded(same(handle), eq(handle.get()));
+      }
+    });
+  }
+
+  @Test
+  public void testSimpleSparkJob() throws Exception {
+    runTest(true, new TestFunction() {
+      @Override
+      public void call(LivyClient client) throws Exception {
+        JobHandle<Long> handle = client.submit(new SmallCount(5));
+        assertEquals(Long.valueOf(5L), handle.get(TIMEOUT, TimeUnit.SECONDS));
+      }
+    });
+  }
+
+  @Test
+  public void testJobFailure() throws Exception {
+    runTest(true, new TestFunction() {
+      @Override
+      public void call(LivyClient client) throws Exception {
+        JobHandle.Listener<Void> listener = newListener();
+        JobHandle<Void> handle = client.submit(new Failure());
+        handle.addListener(listener);
+        try {
+          handle.get(TIMEOUT, TimeUnit.SECONDS);
+          fail("Should have thrown an exception.");
+        } catch (ExecutionException ee) {
+          assertTrue(ee.getCause().getMessage().contains(
+            Failure.JobFailureException.class.getName()));
+        }
+
+        // Try an invalid state transition on the handle. This ensures that the actual state
+        // change we're interested in actually happened, since internally the handle serializes
+        // state changes.
+        assertFalse(((JobHandleImpl<Void>)handle).changeState(JobHandle.State.SENT));
+
+        verify(listener).onJobStarted(handle);
+        verify(listener).onJobFailed(same(handle), any(Throwable.class));
+      }
+    });
+  }
+
+  @Test
+  public void testSyncRpc() throws Exception {
+    runTest(true, new TestFunction() {
+      @Override
+      public void call(LivyClient client) throws Exception {
+        Future<String> result = client.run(new Echo<>("Hello"));
+        assertEquals("Hello", result.get(TIMEOUT, TimeUnit.SECONDS));
+      }
+    });
+  }
+
+  @Test
+  public void testRemoteClient() throws Exception {
+    runTest(false, new TestFunction() {
+      @Override
+      public void call(LivyClient client) throws Exception {
+        JobHandle<Long> handle = client.submit(new SmallCount(5));
+        assertEquals(Long.valueOf(5L), handle.get(TIMEOUT, TimeUnit.SECONDS));
+      }
+    });
+  }
+
+  /**
+   * Checks that a jar added with addJar() and a plain file added with addFile() are both
+   * readable by a job submitted afterwards. Temporary files are deleted when the test ends.
+   */
+  @Test
+  public void testAddJarsAndFiles() throws Exception {
+    runTest(true, new TestFunction() {
+      @Override
+      public void call(LivyClient client) throws Exception {
+        File jar = null;
+        File file = null;
+
+        try {
+          // Test that adding a jar to the remote context makes it show up in the classpath.
+          jar = File.createTempFile("test", ".jar");
+
+          // try-with-resources closes the stream even if writing the entry fails.
+          try (JarOutputStream jarFile = new JarOutputStream(new FileOutputStream(jar))) {
+            jarFile.putNextEntry(new ZipEntry("test.resource"));
+            jarFile.write("test resource".getBytes("UTF-8"));
+            jarFile.closeEntry();
+          }
+
+          client.addJar(new URI("file:" + jar.getAbsolutePath()))
+            .get(TIMEOUT, TimeUnit.SECONDS);
+
+          // Need to run a Spark job to make sure the jar is added to the class loader. Monitoring
+          // SparkContext#addJar() doesn't mean much, we can only be sure jars have been distributed
+          // when we run a task after the jar has been added.
+          String result = client.submit(new FileReader("test.resource", true))
+            .get(TIMEOUT, TimeUnit.SECONDS);
+          assertEquals("test resource", result);
+
+          // Test that adding a file to the remote context makes it available to executors.
+          file = File.createTempFile("test", ".file");
+
+          try (FileOutputStream fileStream = new FileOutputStream(file)) {
+            fileStream.write("test file".getBytes("UTF-8"));
+          }
+
+          // This half covers plain files, so it must go through addFile() rather than addJar().
+          client.addFile(new URI("file:" + file.getAbsolutePath()))
+            .get(TIMEOUT, TimeUnit.SECONDS);
+
+          // The same applies to files added with "addFile". They're only guaranteed to be available
+          // to tasks started after the addFile() call completes.
+          result = client.submit(new FileReader(file.getName(), false))
+            .get(TIMEOUT, TimeUnit.SECONDS);
+          assertEquals("test file", result);
+        } finally {
+          if (jar != null) {
+            jar.delete();
+          }
+          if (file != null) {
+            file.delete();
+          }
+        }
+      }
+    });
+  }
+
+  @Test
+  public void testSparkSQLJob() throws Exception {
+    runTest(true, new TestFunction() {
+      @Override
+      void call(LivyClient client) throws Exception {
+        JobHandle<List<String>> handle = client.submit(new SQLGetTweets(false));
+        List<String> topTweets = handle.get(TIMEOUT, TimeUnit.SECONDS);
+        assertEquals(1, topTweets.size());
+        assertEquals("[Adventures With Coffee, Code, and Writing.,0]",
+                topTweets.get(0));
+      }
+    });
+  }
+
+  @Test
+  public void testHiveJob() throws Exception {
+    runTest(true, new TestFunction() {
+      @Override
+      void call(LivyClient client) throws Exception {
+        JobHandle<List<String>> handle = client.submit(new SQLGetTweets(true));
+        List<String> topTweets = handle.get(TIMEOUT, TimeUnit.SECONDS);
+        assertEquals(1, topTweets.size());
+        assertEquals("[Adventures With Coffee, Code, and Writing.,0]",
+                topTweets.get(0));
+      }
+    });
+  }
+
+  /** Runs SparkStreamingJob with the local configuration and expects it to return true. */
+  @Test
+  public void testStreamingContext() throws Exception {
+    TestFunction scenario = new TestFunction() {
+      @Override
+      void call(LivyClient client) throws Exception {
+        JobHandle<Boolean> streamingJob = client.submit(new SparkStreamingJob());
+        Boolean created = streamingJob.get(TIMEOUT, TimeUnit.SECONDS);
+        assertEquals(Boolean.TRUE, created);
+      }
+    };
+    runTest(true, scenario);
+  }
+
+  /**
+   * Sets PROXY_USER in the client configuration and checks that GetCurrentUser, run with the
+   * non-local configuration, reports that same user name.
+   */
+  @Test
+  public void testImpersonation() throws Exception {
+    final String proxyUser = "__proxy__";
+
+    TestFunction scenario = new TestFunction() {
+      @Override
+      void call(LivyClient client) throws Exception {
+        JobHandle<String> whoAmI = client.submit(new GetCurrentUser());
+        String reportedUser = whoAmI.get(TIMEOUT, TimeUnit.SECONDS);
+        assertEquals(proxyUser, reportedUser);
+      }
+
+      @Override
+      void config(Properties conf) {
+        conf.put(RSCConf.Entry.PROXY_USER.key(), proxyUser);
+      }
+    };
+    runTest(false, scenario);
+  }
+
+  /**
+   * Disconnects the first client, then builds a second client from the context URI alone and
+   * checks that it can still run a job.
+   */
+  @Test
+  public void testConnectToRunningContext() throws Exception {
+    TestFunction scenario = new TestFunction() {
+      @Override
+      void call(LivyClient client) throws Exception {
+        URI contextUri = disconnectClient(client);
+
+        // Only the URI is supplied here. Without the settings from createConf() a brand new
+        // context could not be created, so success means the existing context was reused.
+        LivyClient reconnected = new LivyClientBuilder()
+          .setURI(contextUri)
+          .build();
+
+        try {
+          JobHandle<String> echoJob = reconnected.submit(new Echo<>("hello"));
+          assertEquals("hello", echoJob.get(TIMEOUT, TimeUnit.SECONDS));
+        } finally {
+          reconnected.stop(true);
+        }
+      }
+    };
+    runTest(false, scenario);
+  }
+
+  @Test
+  public void testServerIdleTimeout() throws Exception {
+    runTest(true, new TestFunction() {
+      @Override
+      void call(LivyClient client) throws Exception {
+        // Close the old client and wait a couple of seconds for the timeout to trigger.
+        URI uri = disconnectClient(client);
+        TimeUnit.SECONDS.sleep(2);
+
+        // Try to connect back with a new client, it should fail. Since there's no API to monitor
+        // the connection state, we try to enqueue a long-running job and make sure that it fails,
+        // in case the connection actually goes through.
+        try {
+          LivyClient newClient = new LivyClientBuilder()
+            .setURI(uri)
+            .build();
+
+          try {
+            newClient.submit(new Sleeper(TimeUnit.SECONDS.toMillis(TIMEOUT)))
+              .get(TIMEOUT, TimeUnit.SECONDS);
+          } catch (TimeoutException te) {
+            // Shouldn't have gotten here, but catch this so that we stop the client.
+            newClient.stop(true);
+          }
+          fail("Should have failed to contact RSC after idle timeout.");
+        } catch (Exception e) {
+          // Expected.
+        }
+      }
+
+      @Override
+      void config(Properties conf) {
+        conf.setProperty(SERVER_IDLE_TIMEOUT.key(), "1s");
+      }
+    });
+  }
+
+  /**
+   * Installs a mocked spark-submit process whose waitFor() blocks, stops the client, and
+   * checks that destroy() is called on the mocked process within five seconds.
+   */
+  @Test
+  public void testKillServerWhileSparkSubmitIsRunning() throws Exception {
+    Properties conf = createConf(true);
+    LivyClient client = null;
+    PipedInputStream stubStream = new PipedInputStream(new PipedOutputStream());
+    // Keeps the mocked waitFor() blocked. It is released in the finally block so that a caller
+    // stuck in waitFor() is freed even when an assertion fails or an exception is thrown.
+    final CountDownLatch waitForCalled = new CountDownLatch(1);
+    try {
+      Process mockSparkSubmit = mock(Process.class);
+      when(mockSparkSubmit.getInputStream()).thenReturn(stubStream);
+      when(mockSparkSubmit.getErrorStream()).thenReturn(stubStream);
+
+      // Block waitFor until the test releases the latch.
+      when(mockSparkSubmit.waitFor()).thenAnswer(new Answer<Integer>() {
+        @Override
+        public Integer answer(InvocationOnMock invocation) throws Throwable {
+          waitForCalled.await();
+          return 0;
+        }
+      });
+
+      // Verify process.destroy() is called.
+      final CountDownLatch destroyCalled = new CountDownLatch(1);
+      doAnswer(new Answer<Void>() {
+        @Override
+        public Void answer(InvocationOnMock invocation) throws Throwable {
+          destroyCalled.countDown();
+          return null;
+        }
+      }).when(mockSparkSubmit).destroy();
+
+      ContextLauncher.mockSparkSubmit = mockSparkSubmit;
+
+      client = new LivyClientBuilder(false).setURI(new URI("rsc:/"))
+        .setAll(conf)
+        .build();
+
+      client.stop(true);
+
+      assertTrue(destroyCalled.await(5, TimeUnit.SECONDS));
+    } catch (Exception e) {
+      // JUnit prints not so useful backtraces in test summary reports, and we don't see the
+      // actual source line of the exception, so print the exception to the logs.
+      LOG.error("Test threw exception.", e);
+      throw e;
+    } finally {
+      waitForCalled.countDown();
+      ContextLauncher.mockSparkSubmit = null;
+      stubStream.close();
+      if (client != null) {
+        client.stop(true);
+      }
+    }
+  }
+
+  /** Runs the bypass scenario with the sync flag off. */
+  @Test
+  public void testBypass() throws Exception {
+    final boolean sync = false;
+    runBypassTest(sync);
+  }
+
+  /** Runs the bypass scenario with the sync flag on. */
+  @Test
+  public void testBypassSync() throws Exception {
+    final boolean sync = true;
+    runBypassTest(sync);
+  }
+
+  /**
+   * Submits a serialized Echo job through RSCClient#bypass, polls its status with exponential
+   * back-off until it succeeds or TIMEOUT seconds elapse, checks the deserialized result, and
+   * finally checks that asking for the status again fails.
+   *
+   * @param sync forwarded unchanged as the second argument of RSCClient#bypass.
+   */
+  private void runBypassTest(final boolean sync) throws Exception {
+    runTest(true, new TestFunction() {
+      @Override
+      public void call(LivyClient client) throws Exception {
+        Serializer s = new Serializer();
+        RSCClient lclient = (RSCClient) client;
+        ByteBuffer job = s.serialize(new Echo<>("hello"));
+        String jobId = lclient.bypass(job, sync);
+
+        // Try to fetch the result, trying several times until the timeout runs out, and
+        // backing off as attempts fail.
+        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(TIMEOUT);
+        long sleepMs = 100;
+        BypassJobStatus status = null;
+        while (System.nanoTime() < deadline) {
+          BypassJobStatus currStatus = lclient.getBypassJobStatus(jobId).get(TIMEOUT,
+            TimeUnit.SECONDS);
+          assertNotEquals(JobHandle.State.CANCELLED, currStatus.state);
+          assertNotEquals(JobHandle.State.FAILED, currStatus.state);
+          if (currStatus.state.equals(JobHandle.State.SUCCEEDED)) {
+            status = currStatus;
+            break;
+          }
+
+          // The deadline is in nanoseconds and the back-off in milliseconds, so convert to a
+          // common unit first. Capping the sleep at the remaining time keeps the loop from
+          // sleeping past the deadline.
+          long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
+          if (remainingMs > 0) {
+            Thread.sleep(Math.min(sleepMs, remainingMs));
+            sleepMs *= 2;
+          }
+        }
+        assertNotNull("Failed to fetch bypass job status.", status);
+        assertEquals(JobHandle.State.SUCCEEDED, status.state);
+
+        String resultVal = (String) s.deserialize(ByteBuffer.wrap(status.result));
+        assertEquals("hello", resultVal);
+
+        // After the result is retrieved, the driver should stop tracking the job and release
+        // resources associated with it.
+        try {
+          lclient.getBypassJobStatus(jobId).get(TIMEOUT, TimeUnit.SECONDS);
+          fail("Should have failed to retrieve status of released job.");
+        } catch (ExecutionException ee) {
+          assertTrue(ee.getCause() instanceof RpcException);
+          assertTrue(ee.getCause().getMessage().contains(
+            "java.util.NoSuchElementException: " + jobId));
+        }
+      }
+    });
+  }
+
+  /** Creates a Mockito mock of JobHandle.Listener typed for the caller's result type. */
+  @SuppressWarnings("unchecked")
+  private <T> JobHandle.Listener<T> newListener() {
+    // mock() is given the raw Listener class, hence the unchecked cast to Listener<T>.
+    return (JobHandle.Listener<T>) mock(JobHandle.Listener.class);
+  }
+
+  /**
+   * Stops the given client with stop(false) and returns a URI for reaching its context again.
+   *
+   * @param client the client to stop; it must be an RSCClient.
+   * @return an rsc:// URI built from the client id, secret, remote address and remote port
+   *         held in the client's ContextInfo.
+   */
+  private URI disconnectClient(LivyClient client) throws Exception {
+    ContextInfo info = ((RSCClient) client).getContextInfo();
+    String address = String.format("rsc://%s:%s@%s:%d", info.clientId, info.secret,
+      info.remoteAddress, info.remotePort);
+    URI contextUri = new URI(address);
+
+    // Presumably stop(false) leaves the remote context running, since callers try to
+    // reconnect to it afterwards. This method does not wait; callers sleep if they need to.
+    client.stop(false);
+    return contextUri;
+  }
+
+  /**
+   * Runs a test body against a newly built client and always stops that client afterwards.
+   *
+   * @param local passed to createConf() to select the local or non-local configuration.
+   * @param test  supplies the configuration hook and the body to run.
+   * @throws Exception whatever the hook, client setup or body throws, after logging it.
+   */
+  private void runTest(boolean local, TestFunction test) throws Exception {
+    LivyClient livy = null;
+    Properties settings = createConf(local);
+    try {
+      test.config(settings);
+
+      URI endpoint = new URI("rsc:/");
+      livy = new LivyClientBuilder(false).setURI(endpoint)
+        .setAll(settings)
+        .build();
+
+      // A PingJob round trip confirms the context is up before the real test starts.
+      Object pong = livy.submit(new PingJob()).get(TIMEOUT, TimeUnit.SECONDS);
+      assertNull(pong);
+
+      test.call(livy);
+    } catch (Exception e) {
+      // JUnit's summary reports hide the source line that threw, so the exception is also
+      // written to the log before being rethrown.
+      LOG.error("Test threw exception.", e);
+      throw e;
+    } finally {
+      if (livy != null) {
+        livy.stop(true);
+      }
+    }
+  }
+
+  /**
+   * Job used to test streaming context handling. Since it's hard to test a streaming context
+   * itself, the job only checks that one can be created. It also checks that calls made in an
+   * improper order (access or stop before creation, access after stop) throw
+   * IllegalStateException.
+   *
+   * Returns true when the streaming context obtained after the final creation is non-null.
+   */
+  private static class SparkStreamingJob implements Job<Boolean> {
+    @Override
+    public Boolean call(JobContext jc) throws Exception {
+      try {
+        jc.streamingctx();
+        fail("Access before creation: Should throw IllegalStateException");
+      } catch (IllegalStateException ex) {
+        // Expected.
+      }
+      try {
+        jc.stopStreamingCtx();
+        fail("Stop before creation: Should throw IllegalStateException");
+      } catch (IllegalStateException ex) {
+        // Expected.
+      }
+      try {
+        jc.createStreamingContext(1000L);
+        // Fetch once while the context exists; only the fetch after the stop is expected to
+        // throw.
+        jc.streamingctx();
+        jc.stopStreamingCtx();
+        jc.streamingctx();
+        fail("Access after stop: Should throw IllegalStateException");
+      } catch (IllegalStateException ex) {
+        // Expected.
+      }
+
+      jc.createStreamingContext(1000L);
+      JavaStreamingContext streamingContext = jc.streamingctx();
+      jc.stopStreamingCtx();
+      return streamingContext != null;
+    }
+  }
+
+  private abstract static class TestFunction {
+    abstract void call(LivyClient client) throws Exception;
+    void config(Properties conf) { }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/test/java/org/apache/livy/rsc/rpc/TestKryoMessageCodec.java
----------------------------------------------------------------------
diff --git a/rsc/src/test/java/org/apache/livy/rsc/rpc/TestKryoMessageCodec.java b/rsc/src/test/java/org/apache/livy/rsc/rpc/TestKryoMessageCodec.java
new file mode 100644
index 0000000..a09ac43
--- /dev/null
+++ b/rsc/src/test/java/org/apache/livy/rsc/rpc/TestKryoMessageCodec.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.rsc.rpc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.handler.logging.LoggingHandler;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/** Unit tests for {@link KryoMessageCodec}: framing, size limits and the encryption hook. */
+public class TestKryoMessageCodec {
+
+  private static final String MESSAGE = "Hello World!";
+
+  @Test // A String must survive an encode/decode round trip when no encryption is set.
+  public void testKryoCodec() throws Exception {
+    List<Object> objects = encodeAndDecode(MESSAGE, null);
+    assertEquals(1, objects.size());
+    assertEquals(MESSAGE, objects.get(0));
+  }
+
+  @Test // A truncated frame must decode to nothing; a complete frame to exactly one message.
+  public void testFragmentation() throws Exception {
+    ByteBuf buf = newBuffer();
+    Object[] messages = { "msg1", "msg2" };
+    int[] indices = new int[messages.length];
+
+    KryoMessageCodec codec = new KryoMessageCodec(0);
+
+    for (int i = 0; i < messages.length; i++) {
+      codec.encode(null, messages[i], buf);
+      indices[i] = buf.writerIndex(); // Offset in buf just past encoded message i.
+    }
+
+    List<Object> objects = new ArrayList<>();
+
+    // Don't read enough data for the first message to be decoded.
+    codec.decode(null, buf.slice(0, indices[0] - 1), objects);
+    assertEquals(0, objects.size());
+
+    // Read enough data for just the first message to be decoded.
+    codec.decode(null, buf.slice(0, indices[0] + 1), objects);
+    assertEquals(1, objects.size());
+  }
+
+  @Test // The same round trip, driven through a Netty EmbeddedChannel pipeline.
+  public void testEmbeddedChannel() throws Exception {
+    EmbeddedChannel c = new EmbeddedChannel(
+      new LoggingHandler(getClass()),
+      new KryoMessageCodec(0));
+    c.writeAndFlush(MESSAGE);
+    assertEquals(1, c.outboundMessages().size());
+    assertFalse(MESSAGE.getClass().equals(c.outboundMessages().peek().getClass()));
+    c.writeInbound(c.readOutbound());
+    assertEquals(1, c.inboundMessages().size());
+    assertEquals(MESSAGE, c.readInbound());
+    c.close();
+  }
+
+  @Test // A class handed to the codec constructor must round-trip without further setup.
+  public void testAutoRegistration() throws Exception {
+    KryoMessageCodec codec = new KryoMessageCodec(0, TestMessage.class);
+    ByteBuf buf = newBuffer();
+    codec.encode(null, new TestMessage(), buf);
+
+    List<Object> out = new ArrayList<>();
+    codec.decode(null, buf, out);
+
+    assertEquals(1, out.size());
+    assertTrue(out.get(0) instanceof TestMessage);
+  }
+
+  @Test // A codec built with limit 1024 must reject oversized messages on encode and decode.
+  public void testMaxMessageSize() throws Exception {
+    KryoMessageCodec codec = new KryoMessageCodec(1024);
+    ByteBuf buf = newBuffer();
+    codec.encode(null, new TestMessage(new byte[512]), buf);
+
+    try {
+      codec.encode(null, new TestMessage(new byte[1025]), buf);
+      fail("Should have failed to encode large message.");
+    } catch (IllegalArgumentException e) {
+      assertTrue(e.getMessage().contains("maximum allowed size"));
+    }
+
+    KryoMessageCodec unlimited = new KryoMessageCodec(0);
+    buf = newBuffer();
+    unlimited.encode(null, new TestMessage(new byte[1025]), buf);
+
+    try {
+      List<Object> out = new ArrayList<>();
+      codec.decode(null, buf, out);
+      fail("Should have failed to decode large message.");
+    } catch (IllegalArgumentException e) {
+      assertTrue(e.getMessage().contains("maximum allowed size"));
+    }
+  }
+
+  @Test // A frame whose length prefix is negative must be rejected by decode.
+  public void testNegativeMessageSize() throws Exception {
+    KryoMessageCodec codec = new KryoMessageCodec(1024);
+    ByteBuf buf = newBuffer();
+    buf.writeInt(-1);
+
+    try {
+      List<Object> out = new ArrayList<>();
+      codec.decode(null, buf, out);
+      fail("Should have failed to decode message with negative size.");
+    } catch (IllegalArgumentException e) {
+      assertTrue(e.getMessage().contains("must be positive"));
+    }
+  }
+
+  @Test // Encrypting without decrypting must never hand back the original plaintext.
+  public void testEncryptionOnly() throws Exception {
+    List<Object> objects = Collections.<Object>emptyList();
+    try {
+      objects = encodeAndDecode(MESSAGE, new TestEncryptionHandler(true, false));
+    } catch (Exception e) {
+      // Pass: failing to decode the still-encrypted payload is an acceptable outcome.
+    }
+    // Do this check in case the ciphertext actually makes sense in some way.
+    for (Object msg : objects) {
+      assertFalse(MESSAGE.equals(msg));
+    }
+  }
+
+  @Test // Decrypting data that was never encrypted must never hand back the plaintext.
+  public void testDecryptionOnly() throws Exception {
+    List<Object> objects = Collections.<Object>emptyList();
+    try {
+      objects = encodeAndDecode(MESSAGE, new TestEncryptionHandler(false, true));
+    } catch (Exception e) {
+      // Pass: failing to decode the scrambled payload is an acceptable outcome.
+    }
+    // Do this check in case the decrypted plaintext actually makes sense in some way.
+    for (Object msg : objects) {
+      assertFalse(MESSAGE.equals(msg));
+    }
+  }
+
+  @Test // With both directions enabled the message must round-trip unchanged.
+  public void testEncryptDecrypt() throws Exception {
+    List<Object> objects = encodeAndDecode(MESSAGE, new TestEncryptionHandler(true, true));
+    assertEquals(1, objects.size());
+    assertEquals(MESSAGE, objects.get(0));
+  }
+
+  private List<Object> encodeAndDecode(Object message, KryoMessageCodec.EncryptionHandler eh)
+      throws Exception {
+    ByteBuf buf = newBuffer();
+    KryoMessageCodec codec = new KryoMessageCodec(0);
+    codec.setEncryptionHandler(eh); // eh may be null, as testKryoCodec() does.
+    codec.encode(null, message, buf);
+
+    List<Object> objects = new ArrayList<>();
+    codec.decode(null, buf, objects);
+    return objects;
+  }
+
+  private ByteBuf newBuffer() {
+    return UnpooledByteBufAllocator.DEFAULT.buffer(1024);
+  }
+
+  private static class TestMessage { // Payload type for the registration and size tests.
+    byte[] data;
+
+    TestMessage() {
+      this(null);
+    }
+
+    TestMessage(byte[] data) {
+      this.data = data;
+    }
+  }
+
+  /** XOR-based stand-in cipher whose wrap and unwrap sides can be enabled independently. */
+  private static class TestEncryptionHandler implements KryoMessageCodec.EncryptionHandler {
+    private static final byte KEY = 0x42;
+
+    private final boolean encrypt;
+    private final boolean decrypt;
+
+    TestEncryptionHandler(boolean encrypt, boolean decrypt) {
+      this.encrypt = encrypt;
+      this.decrypt = decrypt;
+    }
+
+    @Override
+    public byte[] wrap(byte[] data, int offset, int len) throws IOException {
+      return encrypt ? transform(data, offset, len) : data;
+    }
+
+    @Override
+    public byte[] unwrap(byte[] data, int offset, int len) throws IOException {
+      return decrypt ? transform(data, offset, len) : data;
+    }
+
+    @Override
+    public void dispose() throws IOException { } // Holds no resources to release.
+
+    private byte[] transform(byte[] data, int offset, int len) {
+      byte[] dest = new byte[len];
+      for (int i = 0; i < len; i++) {
+        dest[i] = (byte) (data[offset + i] ^ KEY); // XOR is its own inverse.
+      }
+      return dest;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/test/java/org/apache/livy/rsc/rpc/TestRpc.java
----------------------------------------------------------------------
diff --git a/rsc/src/test/java/org/apache/livy/rsc/rpc/TestRpc.java b/rsc/src/test/java/org/apache/livy/rsc/rpc/TestRpc.java
new file mode 100644
index 0000000..8967906
--- /dev/null
+++ b/rsc/src/test/java/org/apache/livy/rsc/rpc/TestRpc.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.rsc.rpc;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.security.sasl.SaslException;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.util.concurrent.Future;
+import org.apache.commons.io.IOUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import org.apache.livy.rsc.FutureListener;
+import org.apache.livy.rsc.RSCConf;
+import org.apache.livy.rsc.Utils;
+import static org.apache.livy.rsc.RSCConf.Entry.*;
+
+/** Tests {@link Rpc} over embedded channels and over a real {@link RpcServer} connection. */
+public class TestRpc {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestRpc.class);
+
+  private Collection<Closeable> closeables;
+  private RSCConf emptyConfig;
+
+  @Before
+  public void setUp() {
+    closeables = new ArrayList<>();
+    emptyConfig = new RSCConf(null);
+  }
+
+  @After
+  public void cleanUp() throws Exception {
+    for (Closeable c : closeables) {
+      IOUtils.closeQuietly(c);
+    }
+  }
+
+  private <T extends Closeable> T autoClose(T closeable) {
+    closeables.add(closeable); // Closed by cleanUp() after each test.
+    return closeable;
+  }
+
+  @Test // Echo call between two embedded endpoints; transfer() shuttles the messages.
+  public void testRpcDispatcher() throws Exception {
+    Rpc serverRpc = autoClose(Rpc.createEmbedded(new TestDispatcher()));
+    Rpc clientRpc = autoClose(Rpc.createEmbedded(new TestDispatcher()));
+
+    TestMessage outbound = new TestMessage("Hello World!");
+    Future<TestMessage> call = clientRpc.call(outbound, TestMessage.class);
+
+    LOG.debug("Transferring messages...");
+    transfer(serverRpc, clientRpc);
+
+    TestMessage reply = call.get(10, TimeUnit.SECONDS);
+    assertEquals(outbound.message, reply.message);
+  }
+
+  @Test // Echo and error calls over a real connection, in both directions.
+  public void testClientServer() throws Exception {
+    RpcServer server = autoClose(new RpcServer(emptyConfig));
+    Rpc[] rpcs = createRpcConnection(server);
+    Rpc serverRpc = rpcs[0];
+    Rpc client = rpcs[1];
+
+    TestMessage outbound = new TestMessage("Hello World!");
+    Future<TestMessage> call = client.call(outbound, TestMessage.class);
+    TestMessage reply = call.get(10, TimeUnit.SECONDS);
+    assertEquals(outbound.message, reply.message);
+
+    TestMessage another = new TestMessage("Hello again!");
+    Future<TestMessage> anotherCall = client.call(another, TestMessage.class);
+    TestMessage anotherReply = anotherCall.get(10, TimeUnit.SECONDS);
+    assertEquals(another.message, anotherReply.message);
+
+    String errorMsg = "This is an error.";
+    try {
+      client.call(new ErrorCall(errorMsg)).get(10, TimeUnit.SECONDS);
+      fail("Should have failed: the remote handler throws for ErrorCall.");
+    } catch (ExecutionException ee) {
+      assertTrue(ee.getCause() instanceof RpcException);
+      assertTrue(ee.getCause().getMessage().contains(errorMsg));
+    }
+
+    // Test from server to client too.
+    TestMessage serverMsg = new TestMessage("Hello from the server!");
+    Future<TestMessage> serverCall = serverRpc.call(serverMsg, TestMessage.class);
+    TestMessage serverReply = serverCall.get(10, TimeUnit.SECONDS);
+    assertEquals(serverMsg.message, serverReply.message);
+  }
+
+  @Test // A client presenting the wrong secret must fail and never reach the callback.
+  public void testBadHello() throws Exception {
+    RpcServer server = autoClose(new RpcServer(emptyConfig));
+    RpcServer.ClientCallback callback = mock(RpcServer.ClientCallback.class);
+
+    server.registerClient("client", "newClient", callback);
+    Future<Rpc> clientRpcFuture = Rpc.createClient(emptyConfig, server.getEventLoopGroup(),
+        "localhost", server.getPort(), "client", "wrongClient", new TestDispatcher());
+
+    try {
+      autoClose(clientRpcFuture.get(10, TimeUnit.SECONDS));
+      fail("Should have failed to create client with wrong secret.");
+    } catch (ExecutionException ee) {
+      // On failure, the SASL handler will throw an exception indicating that the SASL
+      // negotiation failed.
+      assertTrue("Unexpected exception: " + ee.getCause(),
+        ee.getCause() instanceof SaslException);
+    }
+
+    verify(callback, never()).onNewClient(any(Rpc.class));
+  }
+
+  @Test // Closing twice must notify the channel's close listener exactly once.
+  public void testCloseListener() throws Exception {
+    RpcServer server = autoClose(new RpcServer(emptyConfig));
+    Rpc[] rpcs = createRpcConnection(server);
+    Rpc client = rpcs[1];
+
+    final AtomicInteger closeCount = new AtomicInteger();
+    Utils.addListener(client.getChannel().closeFuture(), new FutureListener<Void>() {
+      @Override
+      public void onSuccess(Void unused) {
+        closeCount.incrementAndGet();
+      }
+    });
+
+    client.close();
+    client.close();
+    assertEquals(1, closeCount.get());
+  }
+
+  @Test // A message the remote side cannot deserialize must surface as an RpcException.
+  public void testNotDeserializableRpc() throws Exception {
+    RpcServer server = autoClose(new RpcServer(emptyConfig));
+    Rpc[] rpcs = createRpcConnection(server);
+    Rpc client = rpcs[1];
+
+    try {
+      client.call(new NotDeserializable(42)).get(10, TimeUnit.SECONDS);
+      fail("Should have failed: NotDeserializable cannot be decoded by the remote end.");
+    } catch (ExecutionException ee) {
+      assertTrue(ee.getCause() instanceof RpcException);
+      assertTrue(ee.getCause().getMessage().contains("KryoException"));
+    }
+  }
+
+  @Test // Echo call with SASL_QOP set to Rpc.SASL_AUTH_CONF on both server and client.
+  public void testEncryption() throws Exception {
+    RSCConf eConf = new RSCConf(null)
+      .setAll(emptyConfig)
+      .set(SASL_QOP, Rpc.SASL_AUTH_CONF);
+    RpcServer server = autoClose(new RpcServer(eConf));
+    Rpc[] rpcs = createRpcConnection(server, eConf);
+    Rpc client = rpcs[1];
+
+    TestMessage outbound = new TestMessage("Hello World!");
+    Future<TestMessage> call = client.call(outbound, TestMessage.class);
+    TestMessage reply = call.get(10, TimeUnit.SECONDS);
+    assertEquals(outbound.message, reply.message);
+  }
+
+  @Test // Malformed LAUNCHER_PORT_RANGE values must be rejected; a valid range honored.
+  public void testPortRange() throws Exception {
+    String portRange = "a~b";
+    emptyConfig.set(LAUNCHER_PORT_RANGE, portRange);
+    try {
+      autoClose(new RpcServer(emptyConfig));
+      fail("Should have rejected non-numeric port range " + portRange);
+    } catch (NumberFormatException expected) {
+      // Expected: neither bound is a number.
+    }
+    portRange = "11000";
+    emptyConfig.set(LAUNCHER_PORT_RANGE, portRange);
+    try {
+      autoClose(new RpcServer(emptyConfig));
+      fail("Should have rejected port range without an end port " + portRange);
+    } catch (ArrayIndexOutOfBoundsException expected) {
+      // Expected: the value has no "~end" part.
+    }
+    portRange = "11000~11110";
+    emptyConfig.set(LAUNCHER_PORT_RANGE, portRange);
+    String [] portRangeData = portRange.split("~");
+    int startPort = Integer.parseInt(portRangeData[0]);
+    int endPort = Integer.parseInt(portRangeData[1]);
+    RpcServer server = autoClose(new RpcServer(emptyConfig));
+    assertTrue(startPort <= server.getPort() && server.getPort() <= endPort);
+  }
+
+  private void transfer(Rpc serverRpc, Rpc clientRpc) { // Pump client->server, then back.
+    EmbeddedChannel client = (EmbeddedChannel) clientRpc.getChannel();
+    EmbeddedChannel server = (EmbeddedChannel) serverRpc.getChannel();
+
+    server.runPendingTasks();
+    client.runPendingTasks();
+
+    int count = 0;
+    while (!client.outboundMessages().isEmpty()) {
+      server.writeInbound(client.readOutbound());
+      count++;
+    }
+    server.flush();
+    LOG.debug("Transferred {} outbound client messages.", count);
+
+    count = 0;
+    while (!server.outboundMessages().isEmpty()) {
+      client.writeInbound(server.readOutbound());
+      count++;
+    }
+    client.flush();
+    LOG.debug("Transferred {} outbound server messages.", count);
+  }
+
+  /**
+   * Creates a client connection between the server and a client.
+   *
+   * @return two-tuple (server rpc, client rpc)
+   */
+  private Rpc[] createRpcConnection(RpcServer server) throws Exception {
+    return createRpcConnection(server, emptyConfig);
+  }
+
+  private Rpc[] createRpcConnection(RpcServer server, RSCConf clientConf)
+      throws Exception {
+    String secret = server.createSecret();
+    ServerRpcCallback callback = new ServerRpcCallback();
+    server.registerClient("client", secret, callback);
+
+    Future<Rpc> clientRpcFuture = Rpc.createClient(clientConf, server.getEventLoopGroup(),
+        "localhost", server.getPort(), "client", secret, new TestDispatcher());
+
+    assertTrue("onNewClient() wasn't called.",
+      callback.onNewClientCalled.await(10, TimeUnit.SECONDS));
+    assertTrue("onSaslComplete() wasn't called.",
+      callback.onSaslCompleteCalled.await(10, TimeUnit.SECONDS));
+    assertNotNull(callback.client);
+    Rpc serverRpc = autoClose(callback.client);
+    Rpc clientRpc = autoClose(clientRpcFuture.get(10, TimeUnit.SECONDS));
+    return new Rpc[] { serverRpc, clientRpc };
+  }
+
+  private static class ServerRpcCallback implements RpcServer.ClientCallback {
+    final CountDownLatch onNewClientCalled = new CountDownLatch(1);
+    final CountDownLatch onSaslCompleteCalled = new CountDownLatch(1);
+    Rpc client; // Set in onNewClient(); the test reads it after awaiting the latches.
+
+    @Override
+    public RpcDispatcher onNewClient(Rpc client) {
+      this.client = client;
+      onNewClientCalled.countDown();
+      return new TestDispatcher();
+    }
+
+    @Override
+    public void onSaslComplete(Rpc client) {
+      onSaslCompleteCalled.countDown();
+    }
+  }
+
+  /** Message that TestDispatcher echoes back unchanged. */
+  private static class TestMessage {
+    final String message;
+
+    public TestMessage() {
+      this(null);
+    }
+
+    public TestMessage(String message) {
+      this.message = message;
+    }
+  }
+
+  /** Message for which TestDispatcher throws, using {@code error} as the exception text. */
+  private static class ErrorCall {
+    final String error;
+
+    public ErrorCall() {
+      this(null);
+    }
+
+    public ErrorCall(String error) {
+      this.error = error;
+    }
+  }
+
+  /** Unlike the other messages, declares no no-arg constructor. */
+  private static class NotDeserializable {
+    NotDeserializable(int unused) {
+      // Nothing to store; presumably only the constructor shape matters to the test.
+    }
+  }
+
+  private static class TestDispatcher extends RpcDispatcher {
+    protected TestMessage handle(ChannelHandlerContext ctx, TestMessage msg) {
+      return msg;
+    }
+
+    protected void handle(ChannelHandlerContext ctx, ErrorCall msg) {
+      throw new IllegalArgumentException(msg.error);
+    }
+
+    protected void handle(ChannelHandlerContext ctx, NotDeserializable msg) {
+      // No op. Must never run: testNotDeserializableRpc() fails if the call succeeds.
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/scala-api/pom.xml
----------------------------------------------------------------------
diff --git a/scala-api/pom.xml b/scala-api/pom.xml
index e3e63b0..7e4a4b2 100644
--- a/scala-api/pom.xml
+++ b/scala-api/pom.xml
@@ -21,37 +21,37 @@
   <modelVersion>4.0.0</modelVersion>
 
   <parent>
-    <groupId>com.cloudera.livy</groupId>
+    <groupId>org.apache.livy</groupId>
     <artifactId>multi-scala-project-root</artifactId>
-    <version>0.4.0-SNAPSHOT</version>
+    <version>0.4.0-incubating-SNAPSHOT</version>
     <relativePath>../scala/pom.xml</relativePath>
   </parent>
 
   <artifactId>livy-scala-api-parent</artifactId>
-  <version>0.4.0-SNAPSHOT</version>
+  <version>0.4.0-incubating-SNAPSHOT</version>
   <packaging>pom</packaging>
 
   <dependencies>
     <dependency>
-      <groupId>com.cloudera.livy</groupId>
+      <groupId>org.apache.livy</groupId>
       <artifactId>livy-api</artifactId>
       <version>${project.version}</version>
       <scope>provided</scope>
     </dependency>
     <dependency>
-      <groupId>com.cloudera.livy</groupId>
+      <groupId>org.apache.livy</groupId>
       <artifactId>livy-rsc</artifactId>
       <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>com.cloudera.livy</groupId>
+      <groupId>org.apache.livy</groupId>
       <artifactId>livy-core_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>com.cloudera.livy</groupId>
+      <groupId>org.apache.livy</groupId>
       <artifactId>livy-core_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
       <type>test-jar</type>

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/scala-api/scala-2.10/pom.xml
----------------------------------------------------------------------
diff --git a/scala-api/scala-2.10/pom.xml b/scala-api/scala-2.10/pom.xml
index e509fe4..96f6bf8 100644
--- a/scala-api/scala-2.10/pom.xml
+++ b/scala-api/scala-2.10/pom.xml
@@ -17,15 +17,15 @@
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
-  <groupId>com.cloudera.livy</groupId>
+  <groupId>org.apache.livy</groupId>
   <artifactId>livy-scala-api_2.10</artifactId>
-  <version>0.4.0-SNAPSHOT</version>
+  <version>0.4.0-incubating-SNAPSHOT</version>
   <packaging>jar</packaging>
 
   <parent>
-    <groupId>com.cloudera.livy</groupId>
+    <groupId>org.apache.livy</groupId>
     <artifactId>livy-scala-api-parent</artifactId>
-    <version>0.4.0-SNAPSHOT</version>
+    <version>0.4.0-incubating-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/scala-api/scala-2.11/pom.xml
----------------------------------------------------------------------
diff --git a/scala-api/scala-2.11/pom.xml b/scala-api/scala-2.11/pom.xml
index 1b0509a..3690347 100644
--- a/scala-api/scala-2.11/pom.xml
+++ b/scala-api/scala-2.11/pom.xml
@@ -17,15 +17,15 @@
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
-  <groupId>com.cloudera.livy</groupId>
+  <groupId>org.apache.livy</groupId>
   <artifactId>livy-scala-api_2.11</artifactId>
-  <version>0.4.0-SNAPSHOT</version>
+  <version>0.4.0-incubating-SNAPSHOT</version>
   <packaging>jar</packaging>
 
   <parent>
-    <groupId>com.cloudera.livy</groupId>
+    <groupId>org.apache.livy</groupId>
     <artifactId>livy-scala-api-parent</artifactId>
-    <version>0.4.0-SNAPSHOT</version>
+    <version>0.4.0-incubating-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>