You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@livy.apache.org by js...@apache.org on 2017/07/05 03:06:49 UTC
[20/33] incubator-livy git commit: LIVY-375. Change Livy code package
name to org.apache.livy
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/test/java/com/cloudera/livy/rsc/TestSparkClient.java
----------------------------------------------------------------------
diff --git a/rsc/src/test/java/com/cloudera/livy/rsc/TestSparkClient.java b/rsc/src/test/java/com/cloudera/livy/rsc/TestSparkClient.java
deleted file mode 100644
index 973d012..0000000
--- a/rsc/src/test/java/com/cloudera/livy/rsc/TestSparkClient.java
+++ /dev/null
@@ -1,533 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.rsc;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.jar.JarOutputStream;
-import java.util.zip.ZipEntry;
-
-import org.apache.spark.launcher.SparkLauncher;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-import com.cloudera.livy.Job;
-import com.cloudera.livy.JobContext;
-import com.cloudera.livy.JobHandle;
-import com.cloudera.livy.LivyClient;
-import com.cloudera.livy.LivyClientBuilder;
-import com.cloudera.livy.client.common.Serializer;
-import com.cloudera.livy.rsc.rpc.RpcException;
-import com.cloudera.livy.test.jobs.Echo;
-import com.cloudera.livy.test.jobs.Failure;
-import com.cloudera.livy.test.jobs.FileReader;
-import com.cloudera.livy.test.jobs.GetCurrentUser;
-import com.cloudera.livy.test.jobs.SQLGetTweets;
-import com.cloudera.livy.test.jobs.Sleeper;
-import com.cloudera.livy.test.jobs.SmallCount;
-import static com.cloudera.livy.rsc.RSCConf.Entry.*;
-
-public class TestSparkClient {
-
- private static final Logger LOG = LoggerFactory.getLogger(TestSparkClient.class);
-
- // Timeouts are bad... mmmkay.
- private static final long TIMEOUT = 100;
-
- private Properties createConf(boolean local) {
- Properties conf = new Properties();
- if (local) {
- conf.put(CLIENT_IN_PROCESS.key(), "true");
- conf.put(SparkLauncher.SPARK_MASTER, "local");
- conf.put("spark.app.name", "SparkClientSuite Local App");
- } else {
- String classpath = System.getProperty("java.class.path");
- conf.put("spark.app.name", "SparkClientSuite Remote App");
- conf.put(SparkLauncher.DRIVER_MEMORY, "512m");
- conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, classpath);
- conf.put(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH, classpath);
- }
-
- conf.put(LIVY_JARS.key(), "");
- return conf;
- }
-
- @Test
- public void testJobSubmission() throws Exception {
- runTest(true, new TestFunction() {
- @Override
- public void call(LivyClient client) throws Exception {
- JobHandle.Listener<String> listener = newListener();
- JobHandle<String> handle = client.submit(new Echo<>("hello"));
- handle.addListener(listener);
- assertEquals("hello", handle.get(TIMEOUT, TimeUnit.SECONDS));
-
- // Try an invalid state transition on the handle. This ensures that the actual state
- // change we're interested in actually happened, since internally the handle serializes
- // state changes.
- assertFalse(((JobHandleImpl<String>)handle).changeState(JobHandle.State.SENT));
-
- verify(listener).onJobStarted(handle);
- verify(listener).onJobSucceeded(same(handle), eq(handle.get()));
- }
- });
- }
-
- @Test
- public void testSimpleSparkJob() throws Exception {
- runTest(true, new TestFunction() {
- @Override
- public void call(LivyClient client) throws Exception {
- JobHandle<Long> handle = client.submit(new SmallCount(5));
- assertEquals(Long.valueOf(5L), handle.get(TIMEOUT, TimeUnit.SECONDS));
- }
- });
- }
-
- @Test
- public void testJobFailure() throws Exception {
- runTest(true, new TestFunction() {
- @Override
- public void call(LivyClient client) throws Exception {
- JobHandle.Listener<Void> listener = newListener();
- JobHandle<Void> handle = client.submit(new Failure());
- handle.addListener(listener);
- try {
- handle.get(TIMEOUT, TimeUnit.SECONDS);
- fail("Should have thrown an exception.");
- } catch (ExecutionException ee) {
- assertTrue(ee.getCause().getMessage().contains(
- Failure.JobFailureException.class.getName()));
- }
-
- // Try an invalid state transition on the handle. This ensures that the actual state
- // change we're interested in actually happened, since internally the handle serializes
- // state changes.
- assertFalse(((JobHandleImpl<Void>)handle).changeState(JobHandle.State.SENT));
-
- verify(listener).onJobStarted(handle);
- verify(listener).onJobFailed(same(handle), any(Throwable.class));
- }
- });
- }
-
- @Test
- public void testSyncRpc() throws Exception {
- runTest(true, new TestFunction() {
- @Override
- public void call(LivyClient client) throws Exception {
- Future<String> result = client.run(new Echo<>("Hello"));
- assertEquals("Hello", result.get(TIMEOUT, TimeUnit.SECONDS));
- }
- });
- }
-
- @Test
- public void testRemoteClient() throws Exception {
- runTest(false, new TestFunction() {
- @Override
- public void call(LivyClient client) throws Exception {
- JobHandle<Long> handle = client.submit(new SmallCount(5));
- assertEquals(Long.valueOf(5L), handle.get(TIMEOUT, TimeUnit.SECONDS));
- }
- });
- }
-
- @Test
- public void testAddJarsAndFiles() throws Exception {
- runTest(true, new TestFunction() {
- @Override
- public void call(LivyClient client) throws Exception {
- File jar = null;
- File file = null;
-
- try {
- // Test that adding a jar to the remote context makes it show up in the classpath.
- jar = File.createTempFile("test", ".jar");
-
- JarOutputStream jarFile = new JarOutputStream(new FileOutputStream(jar));
- jarFile.putNextEntry(new ZipEntry("test.resource"));
- jarFile.write("test resource".getBytes("UTF-8"));
- jarFile.closeEntry();
- jarFile.close();
-
- client.addJar(new URI("file:" + jar.getAbsolutePath()))
- .get(TIMEOUT, TimeUnit.SECONDS);
-
- // Need to run a Spark job to make sure the jar is added to the class loader. Monitoring
- // SparkContext#addJar() doesn't mean much, we can only be sure jars have been distributed
- // when we run a task after the jar has been added.
- String result = client.submit(new FileReader("test.resource", true))
- .get(TIMEOUT, TimeUnit.SECONDS);
- assertEquals("test resource", result);
-
- // Test that adding a file to the remote context makes it available to executors.
- file = File.createTempFile("test", ".file");
-
- FileOutputStream fileStream = new FileOutputStream(file);
- fileStream.write("test file".getBytes("UTF-8"));
- fileStream.close();
-
- client.addJar(new URI("file:" + file.getAbsolutePath()))
- .get(TIMEOUT, TimeUnit.SECONDS);
-
- // The same applies to files added with "addFile". They're only guaranteed to be available
- // to tasks started after the addFile() call completes.
- result = client.submit(new FileReader(file.getName(), false))
- .get(TIMEOUT, TimeUnit.SECONDS);
- assertEquals("test file", result);
- } finally {
- if (jar != null) {
- jar.delete();
- }
- if (file != null) {
- file.delete();
- }
- }
- }
- });
- }
-
- @Test
- public void testSparkSQLJob() throws Exception {
- runTest(true, new TestFunction() {
- @Override
- void call(LivyClient client) throws Exception {
- JobHandle<List<String>> handle = client.submit(new SQLGetTweets(false));
- List<String> topTweets = handle.get(TIMEOUT, TimeUnit.SECONDS);
- assertEquals(1, topTweets.size());
- assertEquals("[Adventures With Coffee, Code, and Writing.,0]",
- topTweets.get(0));
- }
- });
- }
-
- @Test
- public void testHiveJob() throws Exception {
- runTest(true, new TestFunction() {
- @Override
- void call(LivyClient client) throws Exception {
- JobHandle<List<String>> handle = client.submit(new SQLGetTweets(true));
- List<String> topTweets = handle.get(TIMEOUT, TimeUnit.SECONDS);
- assertEquals(1, topTweets.size());
- assertEquals("[Adventures With Coffee, Code, and Writing.,0]",
- topTweets.get(0));
- }
- });
- }
-
- @Test
- public void testStreamingContext() throws Exception {
- runTest(true, new TestFunction() {
- @Override
- void call(LivyClient client) throws Exception {
- JobHandle<Boolean> handle = client.submit(new SparkStreamingJob());
- Boolean streamingContextCreated = handle.get(TIMEOUT, TimeUnit.SECONDS);
- assertEquals(true, streamingContextCreated);
- }
- });
- }
-
- @Test
- public void testImpersonation() throws Exception {
- final String PROXY = "__proxy__";
-
- runTest(false, new TestFunction() {
- @Override
- void config(Properties conf) {
- conf.put(RSCConf.Entry.PROXY_USER.key(), PROXY);
- }
-
- @Override
- void call(LivyClient client) throws Exception {
- JobHandle<String> handle = client.submit(new GetCurrentUser());
- String userName = handle.get(TIMEOUT, TimeUnit.SECONDS);
- assertEquals(PROXY, userName);
- }
- });
- }
-
- @Test
- public void testConnectToRunningContext() throws Exception {
- runTest(false, new TestFunction() {
- @Override
- void call(LivyClient client) throws Exception {
- URI uri = disconnectClient(client);
-
- // If this tries to create a new context, it will fail because it's missing the
- // needed configuration from createConf().
- LivyClient newClient = new LivyClientBuilder()
- .setURI(uri)
- .build();
-
- try {
- JobHandle<String> handle = newClient.submit(new Echo<>("hello"));
- String result = handle.get(TIMEOUT, TimeUnit.SECONDS);
- assertEquals("hello", result);
- } finally {
- newClient.stop(true);
- }
- }
- });
- }
-
- @Test
- public void testServerIdleTimeout() throws Exception {
- runTest(true, new TestFunction() {
- @Override
- void call(LivyClient client) throws Exception {
- // Close the old client and wait a couple of seconds for the timeout to trigger.
- URI uri = disconnectClient(client);
- TimeUnit.SECONDS.sleep(2);
-
- // Try to connect back with a new client, it should fail. Since there's no API to monitor
- // the connection state, we try to enqueue a long-running job and make sure that it fails,
- // in case the connection actually goes through.
- try {
- LivyClient newClient = new LivyClientBuilder()
- .setURI(uri)
- .build();
-
- try {
- newClient.submit(new Sleeper(TimeUnit.SECONDS.toMillis(TIMEOUT)))
- .get(TIMEOUT, TimeUnit.SECONDS);
- } catch (TimeoutException te) {
- // Shouldn't have gotten here, but catch this so that we stop the client.
- newClient.stop(true);
- }
- fail("Should have failed to contact RSC after idle timeout.");
- } catch (Exception e) {
- // Expected.
- }
- }
-
- @Override
- void config(Properties conf) {
- conf.setProperty(SERVER_IDLE_TIMEOUT.key(), "1s");
- }
- });
- }
-
- @Test
- public void testKillServerWhileSparkSubmitIsRunning() throws Exception {
- Properties conf = createConf(true);
- LivyClient client = null;
- PipedInputStream stubStream = new PipedInputStream(new PipedOutputStream());
- try {
- Process mockSparkSubmit = mock(Process.class);
- when(mockSparkSubmit.getInputStream()).thenReturn(stubStream);
- when(mockSparkSubmit.getErrorStream()).thenReturn(stubStream);
-
- // Block waitFor until process.destroy() is called.
- final CountDownLatch waitForCalled = new CountDownLatch(1);
- when(mockSparkSubmit.waitFor()).thenAnswer(new Answer<Integer>() {
- @Override
- public Integer answer(InvocationOnMock invocation) throws Throwable {
- waitForCalled.await();
- return 0;
- }
- });
-
- // Verify process.destroy() is called.
- final CountDownLatch destroyCalled = new CountDownLatch(1);
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) throws Throwable {
- destroyCalled.countDown();
- return null;
- }
- }).when(mockSparkSubmit).destroy();
-
- ContextLauncher.mockSparkSubmit = mockSparkSubmit;
-
- client = new LivyClientBuilder(false).setURI(new URI("rsc:/"))
- .setAll(conf)
- .build();
-
- client.stop(true);
-
- assertTrue(destroyCalled.await(5, TimeUnit.SECONDS));
- waitForCalled.countDown();
- } catch (Exception e) {
- // JUnit prints not so useful backtraces in test summary reports, and we don't see the
- // actual source line of the exception, so print the exception to the logs.
- LOG.error("Test threw exception.", e);
- throw e;
- } finally {
- ContextLauncher.mockSparkSubmit = null;
- stubStream.close();
- if (client != null) {
- client.stop(true);
- }
- }
- }
-
- @Test
- public void testBypass() throws Exception {
- runBypassTest(false);
- }
-
- @Test
- public void testBypassSync() throws Exception {
- runBypassTest(true);
- }
-
- private void runBypassTest(final boolean sync) throws Exception {
- runTest(true, new TestFunction() {
- @Override
- public void call(LivyClient client) throws Exception {
- Serializer s = new Serializer();
- RSCClient lclient = (RSCClient) client;
- ByteBuffer job = s.serialize(new Echo<>("hello"));
- String jobId = lclient.bypass(job, sync);
-
- // Try to fetch the result, trying several times until the timeout runs out, and
- // backing off as attempts fail.
- long deadline = System.nanoTime() + TimeUnit.NANOSECONDS.convert(TIMEOUT, TimeUnit.SECONDS);
- long sleep = 100;
- BypassJobStatus status = null;
- while (System.nanoTime() < deadline) {
- BypassJobStatus currStatus = lclient.getBypassJobStatus(jobId).get(TIMEOUT,
- TimeUnit.SECONDS);
- assertNotEquals(JobHandle.State.CANCELLED, currStatus.state);
- assertNotEquals(JobHandle.State.FAILED, currStatus.state);
- if (currStatus.state.equals(JobHandle.State.SUCCEEDED)) {
- status = currStatus;
- break;
- } else if (deadline - System.nanoTime() > sleep * 2) {
- Thread.sleep(sleep);
- sleep *= 2;
- }
- }
- assertNotNull("Failed to fetch bypass job status.", status);
- assertEquals(JobHandle.State.SUCCEEDED, status.state);
-
- String resultVal = (String) s.deserialize(ByteBuffer.wrap(status.result));
- assertEquals("hello", resultVal);
-
- // After the result is retrieved, the driver should stop tracking the job and release
- // resources associated with it.
- try {
- lclient.getBypassJobStatus(jobId).get(TIMEOUT, TimeUnit.SECONDS);
- fail("Should have failed to retrieve status of released job.");
- } catch (ExecutionException ee) {
- assertTrue(ee.getCause() instanceof RpcException);
- assertTrue(ee.getCause().getMessage().contains(
- "java.util.NoSuchElementException: " + jobId));
- }
- }
- });
- }
-
- private <T> JobHandle.Listener<T> newListener() {
- @SuppressWarnings("unchecked")
- JobHandle.Listener<T> listener =
- (JobHandle.Listener<T>) mock(JobHandle.Listener.class);
- return listener;
- }
-
- private URI disconnectClient(LivyClient client) throws Exception {
- ContextInfo ctx = ((RSCClient) client).getContextInfo();
- URI uri = new URI(String.format("rsc://%s:%s@%s:%d", ctx.clientId, ctx.secret,
- ctx.remoteAddress, ctx.remotePort));
-
- // Close the old client and wait a couple of seconds for the timeout to trigger.
- client.stop(false);
- return uri;
- }
-
- private void runTest(boolean local, TestFunction test) throws Exception {
- Properties conf = createConf(local);
- LivyClient client = null;
- try {
- test.config(conf);
- client = new LivyClientBuilder(false).setURI(new URI("rsc:/"))
- .setAll(conf)
- .build();
-
- // Wait for the context to be up before running the test.
- assertNull(client.submit(new PingJob()).get(TIMEOUT, TimeUnit.SECONDS));
-
- test.call(client);
- } catch (Exception e) {
- // JUnit prints not so useful backtraces in test summary reports, and we don't see the
- // actual source line of the exception, so print the exception to the logs.
- LOG.error("Test threw exception.", e);
- throw e;
- } finally {
- if (client != null) {
- client.stop(true);
- }
- }
- }
-
- /* Since it's hard to test a streaming context, test that a
- * streaming context has been created. Also checks that improper
- * sequence of streaming context calls (i.e create, stop, retrieve)
- * result in a failure.
- */
- private static class SparkStreamingJob implements Job<Boolean> {
- @Override
- public Boolean call(JobContext jc) throws Exception {
- try {
- jc.streamingctx();
- fail("Access before creation: Should throw IllegalStateException");
- } catch (IllegalStateException ex) {
- // Expected.
- }
- try {
- jc.stopStreamingCtx();
- fail("Stop before creation: Should throw IllegalStateException");
- } catch (IllegalStateException ex) {
- // Expected.
- }
- try {
- jc.createStreamingContext(1000L);
- JavaStreamingContext streamingContext = jc.streamingctx();
- jc.stopStreamingCtx();
- jc.streamingctx();
- fail();
- } catch (IllegalStateException ex) {
- // Expected.
- }
-
- jc.createStreamingContext(1000L);
- JavaStreamingContext streamingContext = jc.streamingctx();
- jc.stopStreamingCtx();
- return streamingContext != null;
- }
- }
-
- private abstract static class TestFunction {
- abstract void call(LivyClient client) throws Exception;
- void config(Properties conf) { }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/test/java/com/cloudera/livy/rsc/rpc/TestKryoMessageCodec.java
----------------------------------------------------------------------
diff --git a/rsc/src/test/java/com/cloudera/livy/rsc/rpc/TestKryoMessageCodec.java b/rsc/src/test/java/com/cloudera/livy/rsc/rpc/TestKryoMessageCodec.java
deleted file mode 100644
index a8ede98..0000000
--- a/rsc/src/test/java/com/cloudera/livy/rsc/rpc/TestKryoMessageCodec.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.rsc.rpc;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.UnpooledByteBufAllocator;
-import io.netty.channel.embedded.EmbeddedChannel;
-import io.netty.handler.logging.LoggingHandler;
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-public class TestKryoMessageCodec {
-
- private static final String MESSAGE = "Hello World!";
-
- @Test
- public void testKryoCodec() throws Exception {
- List<Object> objects = encodeAndDecode(MESSAGE, null);
- assertEquals(1, objects.size());
- assertEquals(MESSAGE, objects.get(0));
- }
-
- @Test
- public void testFragmentation() throws Exception {
- ByteBuf buf = newBuffer();
- Object[] messages = { "msg1", "msg2" };
- int[] indices = new int[messages.length];
-
- KryoMessageCodec codec = new KryoMessageCodec(0);
-
- for (int i = 0; i < messages.length; i++) {
- codec.encode(null, messages[i], buf);
- indices[i] = buf.writerIndex();
- }
-
- List<Object> objects = new ArrayList<>();
-
- // Don't read enough data for the first message to be decoded.
- codec.decode(null, buf.slice(0, indices[0] - 1), objects);
- assertEquals(0, objects.size());
-
- // Read enough data for just the first message to be decoded.
- codec.decode(null, buf.slice(0, indices[0] + 1), objects);
- assertEquals(1, objects.size());
- }
-
- @Test
- public void testEmbeddedChannel() throws Exception {
- EmbeddedChannel c = new EmbeddedChannel(
- new LoggingHandler(getClass()),
- new KryoMessageCodec(0));
- c.writeAndFlush(MESSAGE);
- assertEquals(1, c.outboundMessages().size());
- assertFalse(MESSAGE.getClass().equals(c.outboundMessages().peek().getClass()));
- c.writeInbound(c.readOutbound());
- assertEquals(1, c.inboundMessages().size());
- assertEquals(MESSAGE, c.readInbound());
- c.close();
- }
-
- @Test
- public void testAutoRegistration() throws Exception {
- KryoMessageCodec codec = new KryoMessageCodec(0, TestMessage.class);
- ByteBuf buf = newBuffer();
- codec.encode(null, new TestMessage(), buf);
-
- List<Object> out = new ArrayList<>();
- codec.decode(null, buf, out);
-
- assertEquals(1, out.size());
- assertTrue(out.get(0) instanceof TestMessage);
- }
-
- @Test
- public void testMaxMessageSize() throws Exception {
- KryoMessageCodec codec = new KryoMessageCodec(1024);
- ByteBuf buf = newBuffer();
- codec.encode(null, new TestMessage(new byte[512]), buf);
-
- try {
- codec.encode(null, new TestMessage(new byte[1025]), buf);
- fail("Should have failed to encode large message.");
- } catch (IllegalArgumentException e) {
- assertTrue(e.getMessage().indexOf("maximum allowed size") > 0);
- }
-
- KryoMessageCodec unlimited = new KryoMessageCodec(0);
- buf = newBuffer();
- unlimited.encode(null, new TestMessage(new byte[1025]), buf);
-
- try {
- List<Object> out = new ArrayList<>();
- codec.decode(null, buf, out);
- fail("Should have failed to decode large message.");
- } catch (IllegalArgumentException e) {
- assertTrue(e.getMessage().indexOf("maximum allowed size") > 0);
- }
- }
-
- @Test
- public void testNegativeMessageSize() throws Exception {
- KryoMessageCodec codec = new KryoMessageCodec(1024);
- ByteBuf buf = newBuffer();
- buf.writeInt(-1);
-
- try {
- List<Object> out = new ArrayList<>();
- codec.decode(null, buf, out);
- fail("Should have failed to decode message with negative size.");
- } catch (IllegalArgumentException e) {
- assertTrue(e.getMessage().indexOf("must be positive") > 0);
- }
- }
-
- @Test
- public void testEncryptionOnly() throws Exception {
- List<Object> objects = Collections.<Object>emptyList();
- try {
- objects = encodeAndDecode(MESSAGE, new TestEncryptionHandler(true, false));
- } catch (Exception e) {
- // Pass.
- }
- // Do this check in case the ciphertext actually makes sense in some way.
- for (Object msg : objects) {
- assertFalse(MESSAGE.equals(objects.get(0)));
- }
- }
-
- @Test
- public void testDecryptionOnly() throws Exception {
- List<Object> objects = Collections.<Object>emptyList();
- try {
- objects = encodeAndDecode(MESSAGE, new TestEncryptionHandler(false, true));
- } catch (Exception e) {
- // Pass.
- }
- // Do this check in case the decrypted plaintext actually makes sense in some way.
- for (Object msg : objects) {
- assertFalse(MESSAGE.equals(objects.get(0)));
- }
- }
-
- @Test
- public void testEncryptDecrypt() throws Exception {
- List<Object> objects = encodeAndDecode(MESSAGE, new TestEncryptionHandler(true, true));
- assertEquals(1, objects.size());
- assertEquals(MESSAGE, objects.get(0));
- }
-
- private List<Object> encodeAndDecode(Object message, KryoMessageCodec.EncryptionHandler eh)
- throws Exception {
- ByteBuf buf = newBuffer();
- KryoMessageCodec codec = new KryoMessageCodec(0);
- codec.setEncryptionHandler(eh);
- codec.encode(null, message, buf);
-
- List<Object> objects = new ArrayList<>();
- codec.decode(null, buf, objects);
- return objects;
- }
-
- private ByteBuf newBuffer() {
- return UnpooledByteBufAllocator.DEFAULT.buffer(1024);
- }
-
- private static class TestMessage {
- byte[] data;
-
- TestMessage() {
- this(null);
- }
-
- TestMessage(byte[] data) {
- this.data = data;
- }
- }
-
- private static class TestEncryptionHandler implements KryoMessageCodec.EncryptionHandler {
-
- private static final byte KEY = 0x42;
-
- private final boolean encrypt;
- private final boolean decrypt;
-
- TestEncryptionHandler(boolean encrypt, boolean decrypt) {
- this.encrypt = encrypt;
- this.decrypt = decrypt;
- }
-
- public byte[] wrap(byte[] data, int offset, int len) throws IOException {
- return encrypt ? transform(data, offset, len) : data;
- }
-
- public byte[] unwrap(byte[] data, int offset, int len) throws IOException {
- return decrypt ? transform(data, offset, len) : data;
- }
-
- public void dispose() throws IOException {
-
- }
-
- private byte[] transform(byte[] data, int offset, int len) {
- byte[] dest = new byte[len];
- for (int i = 0; i < len; i++) {
- dest[i] = (byte) (data[offset + i] ^ KEY);
- }
- return dest;
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/test/java/com/cloudera/livy/rsc/rpc/TestRpc.java
----------------------------------------------------------------------
diff --git a/rsc/src/test/java/com/cloudera/livy/rsc/rpc/TestRpc.java b/rsc/src/test/java/com/cloudera/livy/rsc/rpc/TestRpc.java
deleted file mode 100644
index 48abe94..0000000
--- a/rsc/src/test/java/com/cloudera/livy/rsc/rpc/TestRpc.java
+++ /dev/null
@@ -1,337 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.rsc.rpc;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.SocketException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.security.sasl.SaslException;
-
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.embedded.EmbeddedChannel;
-import io.netty.util.concurrent.Future;
-import org.apache.commons.io.IOUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-import com.cloudera.livy.rsc.FutureListener;
-import com.cloudera.livy.rsc.RSCConf;
-import com.cloudera.livy.rsc.Utils;
-import static com.cloudera.livy.rsc.RSCConf.Entry.*;
-
-public class TestRpc {
-
- private static final Logger LOG = LoggerFactory.getLogger(TestRpc.class);
-
- private Collection<Closeable> closeables;
- private RSCConf emptyConfig;
-
- @Before
- public void setUp() {
- closeables = new ArrayList<>();
- emptyConfig = new RSCConf(null);
- }
-
- @After
- public void cleanUp() throws Exception {
- for (Closeable c : closeables) {
- IOUtils.closeQuietly(c);
- }
- }
-
- private <T extends Closeable> T autoClose(T closeable) {
- closeables.add(closeable);
- return closeable;
- }
-
- @Test
- public void testRpcDispatcher() throws Exception {
- Rpc serverRpc = autoClose(Rpc.createEmbedded(new TestDispatcher()));
- Rpc clientRpc = autoClose(Rpc.createEmbedded(new TestDispatcher()));
-
- TestMessage outbound = new TestMessage("Hello World!");
- Future<TestMessage> call = clientRpc.call(outbound, TestMessage.class);
-
- LOG.debug("Transferring messages...");
- transfer(serverRpc, clientRpc);
-
- TestMessage reply = call.get(10, TimeUnit.SECONDS);
- assertEquals(outbound.message, reply.message);
- }
-
- @Test
- public void testClientServer() throws Exception {
- RpcServer server = autoClose(new RpcServer(emptyConfig));
- Rpc[] rpcs = createRpcConnection(server);
- Rpc serverRpc = rpcs[0];
- Rpc client = rpcs[1];
-
- TestMessage outbound = new TestMessage("Hello World!");
- Future<TestMessage> call = client.call(outbound, TestMessage.class);
- TestMessage reply = call.get(10, TimeUnit.SECONDS);
- assertEquals(outbound.message, reply.message);
-
- TestMessage another = new TestMessage("Hello again!");
- Future<TestMessage> anotherCall = client.call(another, TestMessage.class);
- TestMessage anotherReply = anotherCall.get(10, TimeUnit.SECONDS);
- assertEquals(another.message, anotherReply.message);
-
- String errorMsg = "This is an error.";
- try {
- client.call(new ErrorCall(errorMsg)).get(10, TimeUnit.SECONDS);
- } catch (ExecutionException ee) {
- assertTrue(ee.getCause() instanceof RpcException);
- assertTrue(ee.getCause().getMessage().indexOf(errorMsg) >= 0);
- }
-
- // Test from server to client too.
- TestMessage serverMsg = new TestMessage("Hello from the server!");
- Future<TestMessage> serverCall = serverRpc.call(serverMsg, TestMessage.class);
- TestMessage serverReply = serverCall.get(10, TimeUnit.SECONDS);
- assertEquals(serverMsg.message, serverReply.message);
- }
-
- @Test
- public void testBadHello() throws Exception {
- RpcServer server = autoClose(new RpcServer(emptyConfig));
- RpcServer.ClientCallback callback = mock(RpcServer.ClientCallback.class);
-
- server.registerClient("client", "newClient", callback);
- Future<Rpc> clientRpcFuture = Rpc.createClient(emptyConfig, server.getEventLoopGroup(),
- "localhost", server.getPort(), "client", "wrongClient", new TestDispatcher());
-
- try {
- autoClose(clientRpcFuture.get(10, TimeUnit.SECONDS));
- fail("Should have failed to create client with wrong secret.");
- } catch (ExecutionException ee) {
- // On failure, the SASL handler will throw an exception indicating that the SASL
- // negotiation failed.
- assertTrue("Unexpected exception: " + ee.getCause(),
- ee.getCause() instanceof SaslException);
- }
-
- verify(callback, never()).onNewClient(any(Rpc.class));
- }
-
- @Test
- public void testCloseListener() throws Exception {
- RpcServer server = autoClose(new RpcServer(emptyConfig));
- Rpc[] rpcs = createRpcConnection(server);
- Rpc client = rpcs[1];
-
- final AtomicInteger closeCount = new AtomicInteger();
- Utils.addListener(client.getChannel().closeFuture(), new FutureListener<Void>() {
- @Override
- public void onSuccess(Void unused) {
- closeCount.incrementAndGet();
- }
- });
-
- client.close();
- client.close();
- assertEquals(1, closeCount.get());
- }
-
- @Test
- public void testNotDeserializableRpc() throws Exception {
- RpcServer server = autoClose(new RpcServer(emptyConfig));
- Rpc[] rpcs = createRpcConnection(server);
- Rpc client = rpcs[1];
-
- try {
- client.call(new NotDeserializable(42)).get(10, TimeUnit.SECONDS);
- } catch (ExecutionException ee) {
- assertTrue(ee.getCause() instanceof RpcException);
- assertTrue(ee.getCause().getMessage().indexOf("KryoException") >= 0);
- }
- }
-
- @Test
- public void testEncryption() throws Exception {
- RSCConf eConf = new RSCConf(null)
- .setAll(emptyConfig)
- .set(SASL_QOP, Rpc.SASL_AUTH_CONF);
- RpcServer server = autoClose(new RpcServer(eConf));
- Rpc[] rpcs = createRpcConnection(server, eConf);
- Rpc client = rpcs[1];
-
- TestMessage outbound = new TestMessage("Hello World!");
- Future<TestMessage> call = client.call(outbound, TestMessage.class);
- TestMessage reply = call.get(10, TimeUnit.SECONDS);
- assertEquals(outbound.message, reply.message);
- }
-
- @Test
- public void testPortRange() throws Exception {
- String portRange = "a~b";
- emptyConfig.set(LAUNCHER_PORT_RANGE, portRange);
- try {
- autoClose(new RpcServer(emptyConfig));
- } catch (Exception ee) {
- assertTrue(ee instanceof NumberFormatException);
- }
- portRange = "11000";
- emptyConfig.set(LAUNCHER_PORT_RANGE, portRange);
- try {
- autoClose(new RpcServer(emptyConfig));
- } catch (Exception ee) {
- assertTrue(ee instanceof ArrayIndexOutOfBoundsException);
- }
- portRange = "11000~11110";
- emptyConfig.set(LAUNCHER_PORT_RANGE, portRange);
- String [] portRangeData = portRange.split("~");
- int startPort = Integer.parseInt(portRangeData[0]);
- int endPort = Integer.parseInt(portRangeData[1]);
- RpcServer server = autoClose(new RpcServer(emptyConfig));
- assertTrue(startPort <= server.getPort() && server.getPort() <= endPort);
- }
-
- private void transfer(Rpc serverRpc, Rpc clientRpc) {
- EmbeddedChannel client = (EmbeddedChannel) clientRpc.getChannel();
- EmbeddedChannel server = (EmbeddedChannel) serverRpc.getChannel();
-
- server.runPendingTasks();
- client.runPendingTasks();
-
- int count = 0;
- while (!client.outboundMessages().isEmpty()) {
- server.writeInbound(client.readOutbound());
- count++;
- }
- server.flush();
- LOG.debug("Transferred {} outbound client messages.", count);
-
- count = 0;
- while (!server.outboundMessages().isEmpty()) {
- client.writeInbound(server.readOutbound());
- count++;
- }
- client.flush();
- LOG.debug("Transferred {} outbound server messages.", count);
- }
-
- /**
- * Creates a client connection between the server and a client.
- *
- * @return two-tuple (server rpc, client rpc)
- */
- private Rpc[] createRpcConnection(RpcServer server) throws Exception {
- return createRpcConnection(server, emptyConfig);
- }
-
- private Rpc[] createRpcConnection(RpcServer server, RSCConf clientConf)
- throws Exception {
- String secret = server.createSecret();
- ServerRpcCallback callback = new ServerRpcCallback();
- server.registerClient("client", secret, callback);
-
- Future<Rpc> clientRpcFuture = Rpc.createClient(clientConf, server.getEventLoopGroup(),
- "localhost", server.getPort(), "client", secret, new TestDispatcher());
-
- assertTrue("onNewClient() wasn't called.",
- callback.onNewClientCalled.await(10, TimeUnit.SECONDS));
- assertTrue("onSaslComplete() wasn't called.",
- callback.onSaslCompleteCalled.await(10, TimeUnit.SECONDS));
- assertNotNull(callback.client);
- Rpc serverRpc = autoClose(callback.client);
- Rpc clientRpc = autoClose(clientRpcFuture.get(10, TimeUnit.SECONDS));
- return new Rpc[] { serverRpc, clientRpc };
- }
-
- private static class ServerRpcCallback implements RpcServer.ClientCallback {
- final CountDownLatch onNewClientCalled = new CountDownLatch(1);
- final CountDownLatch onSaslCompleteCalled = new CountDownLatch(1);
- Rpc client;
-
- @Override
- public RpcDispatcher onNewClient(Rpc client) {
- this.client = client;
- onNewClientCalled.countDown();
- return new TestDispatcher();
- }
-
- @Override
- public void onSaslComplete(Rpc client) {
- onSaslCompleteCalled.countDown();
- }
-
- }
-
- private static class TestMessage {
-
- final String message;
-
- public TestMessage() {
- this(null);
- }
-
- public TestMessage(String message) {
- this.message = message;
- }
-
- }
-
- private static class ErrorCall {
-
- final String error;
-
- public ErrorCall() {
- this(null);
- }
-
- public ErrorCall(String error) {
- this.error = error;
- }
-
- }
-
- private static class NotDeserializable {
-
- NotDeserializable(int unused) {
-
- }
-
- }
-
- private static class TestDispatcher extends RpcDispatcher {
- protected TestMessage handle(ChannelHandlerContext ctx, TestMessage msg) {
- return msg;
- }
-
- protected void handle(ChannelHandlerContext ctx, ErrorCall msg) {
- throw new IllegalArgumentException(msg.error);
- }
-
- protected void handle(ChannelHandlerContext ctx, NotDeserializable msg) {
- // No op. Shouldn't actually be called, if it is, the test will fail.
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/test/java/org/apache/livy/rsc/TestJobHandle.java
----------------------------------------------------------------------
diff --git a/rsc/src/test/java/org/apache/livy/rsc/TestJobHandle.java b/rsc/src/test/java/org/apache/livy/rsc/TestJobHandle.java
new file mode 100644
index 0000000..e6161ed
--- /dev/null
+++ b/rsc/src/test/java/org/apache/livy/rsc/TestJobHandle.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.rsc;
+
+import io.netty.util.concurrent.Promise;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import org.apache.livy.JobHandle;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestJobHandle {
+
+ @Mock private RSCClient client;
+ @Mock private Promise<Object> promise;
+ @Mock private JobHandle.Listener<Object> listener;
+ @Mock private JobHandle.Listener<Object> listener2;
+
+ @Test
+ public void testStateChanges() throws Exception {
+ JobHandleImpl<Object> handle = new JobHandleImpl<Object>(client, promise, "job");
+ handle.addListener(listener);
+
+ assertTrue(handle.changeState(JobHandle.State.QUEUED));
+ verify(listener).onJobQueued(handle);
+
+ assertTrue(handle.changeState(JobHandle.State.STARTED));
+ verify(listener).onJobStarted(handle);
+
+ assertTrue(handle.changeState(JobHandle.State.CANCELLED));
+ verify(listener).onJobCancelled(handle);
+
+ assertFalse(handle.changeState(JobHandle.State.STARTED));
+ assertFalse(handle.changeState(JobHandle.State.FAILED));
+ assertFalse(handle.changeState(JobHandle.State.SUCCEEDED));
+ }
+
+ @Test
+ public void testFailedJob() throws Exception {
+ JobHandleImpl<Object> handle = new JobHandleImpl<Object>(client, promise, "job");
+ handle.addListener(listener);
+
+ Throwable cause = new Exception();
+ when(promise.cause()).thenReturn(cause);
+
+ assertTrue(handle.changeState(JobHandle.State.FAILED));
+ verify(promise).cause();
+ verify(listener).onJobFailed(handle, cause);
+ }
+
+ @Test
+ public void testSucceededJob() throws Exception {
+ JobHandleImpl<Object> handle = new JobHandleImpl<Object>(client, promise, "job");
+ handle.addListener(listener);
+
+ Object result = new Exception();
+ when(promise.getNow()).thenReturn(result);
+
+ assertTrue(handle.changeState(JobHandle.State.SUCCEEDED));
+ verify(promise).getNow();
+ verify(listener).onJobSucceeded(handle, result);
+ }
+
+ @Test
+ public void testImmediateCallback() throws Exception {
+ JobHandleImpl<Object> handle = new JobHandleImpl<Object>(client, promise, "job");
+ assertTrue(handle.changeState(JobHandle.State.QUEUED));
+ handle.addListener(listener);
+ verify(listener).onJobQueued(handle);
+
+ handle.changeState(JobHandle.State.STARTED);
+ handle.changeState(JobHandle.State.CANCELLED);
+
+ handle.addListener(listener2);
+ verify(listener2).onJobCancelled(same(handle));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/test/java/org/apache/livy/rsc/TestSparkClient.java
----------------------------------------------------------------------
diff --git a/rsc/src/test/java/org/apache/livy/rsc/TestSparkClient.java b/rsc/src/test/java/org/apache/livy/rsc/TestSparkClient.java
new file mode 100644
index 0000000..0663822
--- /dev/null
+++ b/rsc/src/test/java/org/apache/livy/rsc/TestSparkClient.java
@@ -0,0 +1,533 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.rsc;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.jar.JarOutputStream;
+import java.util.zip.ZipEntry;
+
+import org.apache.spark.launcher.SparkLauncher;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import org.apache.livy.Job;
+import org.apache.livy.JobContext;
+import org.apache.livy.JobHandle;
+import org.apache.livy.LivyClient;
+import org.apache.livy.LivyClientBuilder;
+import org.apache.livy.client.common.Serializer;
+import org.apache.livy.rsc.rpc.RpcException;
+import org.apache.livy.test.jobs.Echo;
+import org.apache.livy.test.jobs.Failure;
+import org.apache.livy.test.jobs.FileReader;
+import org.apache.livy.test.jobs.GetCurrentUser;
+import org.apache.livy.test.jobs.SQLGetTweets;
+import org.apache.livy.test.jobs.Sleeper;
+import org.apache.livy.test.jobs.SmallCount;
+import static org.apache.livy.rsc.RSCConf.Entry.*;
+
+public class TestSparkClient {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestSparkClient.class);
+
+ // Timeouts are bad... mmmkay.
+ private static final long TIMEOUT = 100;
+
+ private Properties createConf(boolean local) {
+ Properties conf = new Properties();
+ if (local) {
+ conf.put(CLIENT_IN_PROCESS.key(), "true");
+ conf.put(SparkLauncher.SPARK_MASTER, "local");
+ conf.put("spark.app.name", "SparkClientSuite Local App");
+ } else {
+ String classpath = System.getProperty("java.class.path");
+ conf.put("spark.app.name", "SparkClientSuite Remote App");
+ conf.put(SparkLauncher.DRIVER_MEMORY, "512m");
+ conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, classpath);
+ conf.put(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH, classpath);
+ }
+
+ conf.put(LIVY_JARS.key(), "");
+ return conf;
+ }
+
+ @Test
+ public void testJobSubmission() throws Exception {
+ runTest(true, new TestFunction() {
+ @Override
+ public void call(LivyClient client) throws Exception {
+ JobHandle.Listener<String> listener = newListener();
+ JobHandle<String> handle = client.submit(new Echo<>("hello"));
+ handle.addListener(listener);
+ assertEquals("hello", handle.get(TIMEOUT, TimeUnit.SECONDS));
+
+ // Try an invalid state transition on the handle. This ensures that the actual state
+ // change we're interested in actually happened, since internally the handle serializes
+ // state changes.
+ assertFalse(((JobHandleImpl<String>)handle).changeState(JobHandle.State.SENT));
+
+ verify(listener).onJobStarted(handle);
+ verify(listener).onJobSucceeded(same(handle), eq(handle.get()));
+ }
+ });
+ }
+
+ @Test
+ public void testSimpleSparkJob() throws Exception {
+ runTest(true, new TestFunction() {
+ @Override
+ public void call(LivyClient client) throws Exception {
+ JobHandle<Long> handle = client.submit(new SmallCount(5));
+ assertEquals(Long.valueOf(5L), handle.get(TIMEOUT, TimeUnit.SECONDS));
+ }
+ });
+ }
+
+ @Test
+ public void testJobFailure() throws Exception {
+ runTest(true, new TestFunction() {
+ @Override
+ public void call(LivyClient client) throws Exception {
+ JobHandle.Listener<Void> listener = newListener();
+ JobHandle<Void> handle = client.submit(new Failure());
+ handle.addListener(listener);
+ try {
+ handle.get(TIMEOUT, TimeUnit.SECONDS);
+ fail("Should have thrown an exception.");
+ } catch (ExecutionException ee) {
+ assertTrue(ee.getCause().getMessage().contains(
+ Failure.JobFailureException.class.getName()));
+ }
+
+ // Try an invalid state transition on the handle. This ensures that the actual state
+ // change we're interested in actually happened, since internally the handle serializes
+ // state changes.
+ assertFalse(((JobHandleImpl<Void>)handle).changeState(JobHandle.State.SENT));
+
+ verify(listener).onJobStarted(handle);
+ verify(listener).onJobFailed(same(handle), any(Throwable.class));
+ }
+ });
+ }
+
+ @Test
+ public void testSyncRpc() throws Exception {
+ runTest(true, new TestFunction() {
+ @Override
+ public void call(LivyClient client) throws Exception {
+ Future<String> result = client.run(new Echo<>("Hello"));
+ assertEquals("Hello", result.get(TIMEOUT, TimeUnit.SECONDS));
+ }
+ });
+ }
+
+ @Test
+ public void testRemoteClient() throws Exception {
+ runTest(false, new TestFunction() {
+ @Override
+ public void call(LivyClient client) throws Exception {
+ JobHandle<Long> handle = client.submit(new SmallCount(5));
+ assertEquals(Long.valueOf(5L), handle.get(TIMEOUT, TimeUnit.SECONDS));
+ }
+ });
+ }
+
+ @Test
+ public void testAddJarsAndFiles() throws Exception {
+ runTest(true, new TestFunction() {
+ @Override
+ public void call(LivyClient client) throws Exception {
+ File jar = null;
+ File file = null;
+
+ try {
+ // Test that adding a jar to the remote context makes it show up in the classpath.
+ jar = File.createTempFile("test", ".jar");
+
+ try (JarOutputStream jarFile = new JarOutputStream(new FileOutputStream(jar))) {
+ jarFile.putNextEntry(new ZipEntry("test.resource"));
+ jarFile.write("test resource".getBytes("UTF-8"));
+ jarFile.closeEntry();
+ }
+
+ client.addJar(new URI("file:" + jar.getAbsolutePath()))
+ .get(TIMEOUT, TimeUnit.SECONDS);
+
+ // Need to run a Spark job to make sure the jar is added to the class loader. Monitoring
+ // SparkContext#addJar() doesn't mean much, we can only be sure jars have been distributed
+ // when we run a task after the jar has been added.
+ String result = client.submit(new FileReader("test.resource", true))
+ .get(TIMEOUT, TimeUnit.SECONDS);
+ assertEquals("test resource", result);
+
+ // Test that adding a file to the remote context makes it available to executors.
+ file = File.createTempFile("test", ".file");
+
+ try (FileOutputStream fileStream = new FileOutputStream(file)) {
+ fileStream.write("test file".getBytes("UTF-8"));
+ }
+
+ client.addFile(new URI("file:" + file.getAbsolutePath()))
+ .get(TIMEOUT, TimeUnit.SECONDS);
+
+ // The same applies to files added with "addFile". They're only guaranteed to be available
+ // to tasks started after the addFile() call completes.
+ result = client.submit(new FileReader(file.getName(), false))
+ .get(TIMEOUT, TimeUnit.SECONDS);
+ assertEquals("test file", result);
+ } finally {
+ if (jar != null) {
+ jar.delete();
+ }
+ if (file != null) {
+ file.delete();
+ }
+ }
+ }
+ });
+ }
+
+ @Test
+ public void testSparkSQLJob() throws Exception {
+ runTest(true, new TestFunction() {
+ @Override
+ void call(LivyClient client) throws Exception {
+ JobHandle<List<String>> handle = client.submit(new SQLGetTweets(false));
+ List<String> topTweets = handle.get(TIMEOUT, TimeUnit.SECONDS);
+ assertEquals(1, topTweets.size());
+ assertEquals("[Adventures With Coffee, Code, and Writing.,0]",
+ topTweets.get(0));
+ }
+ });
+ }
+
+ @Test
+ public void testHiveJob() throws Exception {
+ runTest(true, new TestFunction() {
+ @Override
+ void call(LivyClient client) throws Exception {
+ JobHandle<List<String>> handle = client.submit(new SQLGetTweets(true));
+ List<String> topTweets = handle.get(TIMEOUT, TimeUnit.SECONDS);
+ assertEquals(1, topTweets.size());
+ assertEquals("[Adventures With Coffee, Code, and Writing.,0]",
+ topTweets.get(0));
+ }
+ });
+ }
+
+ @Test
+ public void testStreamingContext() throws Exception {
+ runTest(true, new TestFunction() {
+ @Override
+ void call(LivyClient client) throws Exception {
+ JobHandle<Boolean> handle = client.submit(new SparkStreamingJob());
+ Boolean streamingContextCreated = handle.get(TIMEOUT, TimeUnit.SECONDS);
+ assertEquals(true, streamingContextCreated);
+ }
+ });
+ }
+
+ @Test
+ public void testImpersonation() throws Exception {
+ final String PROXY = "__proxy__";
+
+ runTest(false, new TestFunction() {
+ @Override
+ void config(Properties conf) {
+ conf.put(RSCConf.Entry.PROXY_USER.key(), PROXY);
+ }
+
+ @Override
+ void call(LivyClient client) throws Exception {
+ JobHandle<String> handle = client.submit(new GetCurrentUser());
+ String userName = handle.get(TIMEOUT, TimeUnit.SECONDS);
+ assertEquals(PROXY, userName);
+ }
+ });
+ }
+
+ @Test
+ public void testConnectToRunningContext() throws Exception {
+ runTest(false, new TestFunction() {
+ @Override
+ void call(LivyClient client) throws Exception {
+ URI uri = disconnectClient(client);
+
+ // If this tries to create a new context, it will fail because it's missing the
+ // needed configuration from createConf().
+ LivyClient newClient = new LivyClientBuilder()
+ .setURI(uri)
+ .build();
+
+ try {
+ JobHandle<String> handle = newClient.submit(new Echo<>("hello"));
+ String result = handle.get(TIMEOUT, TimeUnit.SECONDS);
+ assertEquals("hello", result);
+ } finally {
+ newClient.stop(true);
+ }
+ }
+ });
+ }
+
+ @Test
+ public void testServerIdleTimeout() throws Exception {
+ runTest(true, new TestFunction() {
+ @Override
+ void call(LivyClient client) throws Exception {
+ // Close the old client and wait a couple of seconds for the timeout to trigger.
+ URI uri = disconnectClient(client);
+ TimeUnit.SECONDS.sleep(2);
+
+ // Try to connect back with a new client, it should fail. Since there's no API to monitor
+ // the connection state, we try to enqueue a long-running job and make sure that it fails,
+ // in case the connection actually goes through.
+ try {
+ LivyClient newClient = new LivyClientBuilder()
+ .setURI(uri)
+ .build();
+
+ try {
+ newClient.submit(new Sleeper(TimeUnit.SECONDS.toMillis(TIMEOUT)))
+ .get(TIMEOUT, TimeUnit.SECONDS);
+ } catch (TimeoutException te) {
+ // Shouldn't have gotten here, but catch this so that we stop the client.
+ newClient.stop(true);
+ }
+ fail("Should have failed to contact RSC after idle timeout.");
+ } catch (Exception e) {
+ // Expected.
+ }
+ }
+
+ @Override
+ void config(Properties conf) {
+ conf.setProperty(SERVER_IDLE_TIMEOUT.key(), "1s");
+ }
+ });
+ }
+
+ @Test
+ public void testKillServerWhileSparkSubmitIsRunning() throws Exception {
+ Properties conf = createConf(true);
+ LivyClient client = null;
+ PipedInputStream stubStream = new PipedInputStream(new PipedOutputStream());
+ try {
+ Process mockSparkSubmit = mock(Process.class);
+ when(mockSparkSubmit.getInputStream()).thenReturn(stubStream);
+ when(mockSparkSubmit.getErrorStream()).thenReturn(stubStream);
+
+ // Block waitFor until process.destroy() is called.
+ final CountDownLatch waitForCalled = new CountDownLatch(1);
+ when(mockSparkSubmit.waitFor()).thenAnswer(new Answer<Integer>() {
+ @Override
+ public Integer answer(InvocationOnMock invocation) throws Throwable {
+ waitForCalled.await();
+ return 0;
+ }
+ });
+
+ // Verify process.destroy() is called.
+ final CountDownLatch destroyCalled = new CountDownLatch(1);
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ destroyCalled.countDown();
+ return null;
+ }
+ }).when(mockSparkSubmit).destroy();
+
+ ContextLauncher.mockSparkSubmit = mockSparkSubmit;
+
+ client = new LivyClientBuilder(false).setURI(new URI("rsc:/"))
+ .setAll(conf)
+ .build();
+
+ client.stop(true);
+
+ assertTrue(destroyCalled.await(5, TimeUnit.SECONDS));
+ waitForCalled.countDown();
+ } catch (Exception e) {
+ // JUnit prints not so useful backtraces in test summary reports, and we don't see the
+ // actual source line of the exception, so print the exception to the logs.
+ LOG.error("Test threw exception.", e);
+ throw e;
+ } finally {
+ ContextLauncher.mockSparkSubmit = null;
+ stubStream.close();
+ if (client != null) {
+ client.stop(true);
+ }
+ }
+ }
+
+ @Test
+ public void testBypass() throws Exception {
+ runBypassTest(false);
+ }
+
+ @Test
+ public void testBypassSync() throws Exception {
+ runBypassTest(true);
+ }
+
+ private void runBypassTest(final boolean sync) throws Exception {
+ runTest(true, new TestFunction() {
+ @Override
+ public void call(LivyClient client) throws Exception {
+ Serializer s = new Serializer();
+ RSCClient lclient = (RSCClient) client;
+ ByteBuffer job = s.serialize(new Echo<>("hello"));
+ String jobId = lclient.bypass(job, sync);
+
+ // Poll for the result until the deadline, backing off as attempts fail. Note that
+ // "deadline" is in nanoseconds while "sleep" is in milliseconds (see Thread.sleep below).
+ long deadline = System.nanoTime() + TimeUnit.NANOSECONDS.convert(TIMEOUT, TimeUnit.SECONDS);
+ long sleep = 100;
+ BypassJobStatus status = null;
+ while (System.nanoTime() < deadline) {
+ BypassJobStatus currStatus = lclient.getBypassJobStatus(jobId).get(TIMEOUT,
+ TimeUnit.SECONDS);
+ assertNotEquals(JobHandle.State.CANCELLED, currStatus.state);
+ assertNotEquals(JobHandle.State.FAILED, currStatus.state);
+ if (currStatus.state.equals(JobHandle.State.SUCCEEDED)) {
+ status = currStatus;
+ break;
+ } else if (deadline - System.nanoTime() > TimeUnit.MILLISECONDS.toNanos(sleep * 2)) {
+ Thread.sleep(sleep);
+ sleep *= 2;
+ }
+ }
+ assertNotNull("Failed to fetch bypass job status.", status);
+ assertEquals(JobHandle.State.SUCCEEDED, status.state);
+
+ String resultVal = (String) s.deserialize(ByteBuffer.wrap(status.result));
+ assertEquals("hello", resultVal);
+
+ // After the result is retrieved, the driver should stop tracking the job and release
+ // resources associated with it.
+ try {
+ lclient.getBypassJobStatus(jobId).get(TIMEOUT, TimeUnit.SECONDS);
+ fail("Should have failed to retrieve status of released job.");
+ } catch (ExecutionException ee) {
+ assertTrue(ee.getCause() instanceof RpcException);
+ assertTrue(ee.getCause().getMessage().contains(
+ "java.util.NoSuchElementException: " + jobId));
+ }
+ }
+ });
+ }
+
+ private <T> JobHandle.Listener<T> newListener() {
+ @SuppressWarnings("unchecked")
+ JobHandle.Listener<T> listener =
+ (JobHandle.Listener<T>) mock(JobHandle.Listener.class);
+ return listener;
+ }
+
+ private URI disconnectClient(LivyClient client) throws Exception {
+ ContextInfo ctx = ((RSCClient) client).getContextInfo();
+ URI uri = new URI(String.format("rsc://%s:%s@%s:%d", ctx.clientId, ctx.secret,
+ ctx.remoteAddress, ctx.remotePort));
+
+ // Close the old client and wait a couple of seconds for the timeout to trigger.
+ client.stop(false);
+ return uri;
+ }
+
+ private void runTest(boolean local, TestFunction test) throws Exception {
+ Properties conf = createConf(local);
+ LivyClient client = null;
+ try {
+ test.config(conf);
+ client = new LivyClientBuilder(false).setURI(new URI("rsc:/"))
+ .setAll(conf)
+ .build();
+
+ // Wait for the context to be up before running the test.
+ assertNull(client.submit(new PingJob()).get(TIMEOUT, TimeUnit.SECONDS));
+
+ test.call(client);
+ } catch (Exception e) {
+ // JUnit prints not so useful backtraces in test summary reports, and we don't see the
+ // actual source line of the exception, so print the exception to the logs.
+ LOG.error("Test threw exception.", e);
+ throw e;
+ } finally {
+ if (client != null) {
+ client.stop(true);
+ }
+ }
+ }
+
+ /* Since it's hard to test a streaming context, test that a
+ * streaming context has been created. Also checks that improper
+ * sequence of streaming context calls (i.e create, stop, retrieve)
+ * result in a failure.
+ */
+ private static class SparkStreamingJob implements Job<Boolean> {
+ @Override
+ public Boolean call(JobContext jc) throws Exception {
+ try {
+ jc.streamingctx();
+ fail("Access before creation: Should throw IllegalStateException");
+ } catch (IllegalStateException ex) {
+ // Expected.
+ }
+ try {
+ jc.stopStreamingCtx();
+ fail("Stop before creation: Should throw IllegalStateException");
+ } catch (IllegalStateException ex) {
+ // Expected.
+ }
+ try {
+ jc.createStreamingContext(1000L);
+ JavaStreamingContext streamingContext = jc.streamingctx();
+ jc.stopStreamingCtx();
+ jc.streamingctx();
+ fail();
+ } catch (IllegalStateException ex) {
+ // Expected.
+ }
+
+ jc.createStreamingContext(1000L);
+ JavaStreamingContext streamingContext = jc.streamingctx();
+ jc.stopStreamingCtx();
+ return streamingContext != null;
+ }
+ }
+
+ private abstract static class TestFunction {
+ abstract void call(LivyClient client) throws Exception;
+ void config(Properties conf) { }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/test/java/org/apache/livy/rsc/rpc/TestKryoMessageCodec.java
----------------------------------------------------------------------
diff --git a/rsc/src/test/java/org/apache/livy/rsc/rpc/TestKryoMessageCodec.java b/rsc/src/test/java/org/apache/livy/rsc/rpc/TestKryoMessageCodec.java
new file mode 100644
index 0000000..a09ac43
--- /dev/null
+++ b/rsc/src/test/java/org/apache/livy/rsc/rpc/TestKryoMessageCodec.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.rsc.rpc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.handler.logging.LoggingHandler;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestKryoMessageCodec {
+
+ private static final String MESSAGE = "Hello World!";
+
+ @Test
+ public void testKryoCodec() throws Exception {
+ List<Object> objects = encodeAndDecode(MESSAGE, null);
+ assertEquals(1, objects.size());
+ assertEquals(MESSAGE, objects.get(0));
+ }
+
+ @Test
+ public void testFragmentation() throws Exception {
+ ByteBuf buf = newBuffer();
+ Object[] messages = { "msg1", "msg2" };
+ int[] indices = new int[messages.length];
+
+ KryoMessageCodec codec = new KryoMessageCodec(0);
+
+ for (int i = 0; i < messages.length; i++) {
+ codec.encode(null, messages[i], buf);
+ indices[i] = buf.writerIndex();
+ }
+
+ List<Object> objects = new ArrayList<>();
+
+ // Don't read enough data for the first message to be decoded.
+ codec.decode(null, buf.slice(0, indices[0] - 1), objects);
+ assertEquals(0, objects.size());
+
+ // Read enough data for just the first message to be decoded.
+ codec.decode(null, buf.slice(0, indices[0] + 1), objects);
+ assertEquals(1, objects.size());
+ }
+
+ @Test
+ public void testEmbeddedChannel() throws Exception {
+ EmbeddedChannel c = new EmbeddedChannel(
+ new LoggingHandler(getClass()),
+ new KryoMessageCodec(0));
+ c.writeAndFlush(MESSAGE);
+ assertEquals(1, c.outboundMessages().size());
+ assertFalse(MESSAGE.getClass().equals(c.outboundMessages().peek().getClass()));
+ c.writeInbound(c.readOutbound());
+ assertEquals(1, c.inboundMessages().size());
+ assertEquals(MESSAGE, c.readInbound());
+ c.close();
+ }
+
+ @Test
+ public void testAutoRegistration() throws Exception {
+ KryoMessageCodec codec = new KryoMessageCodec(0, TestMessage.class);
+ ByteBuf buf = newBuffer();
+ codec.encode(null, new TestMessage(), buf);
+
+ List<Object> out = new ArrayList<>();
+ codec.decode(null, buf, out);
+
+ assertEquals(1, out.size());
+ assertTrue(out.get(0) instanceof TestMessage);
+ }
+
+ @Test
+ public void testMaxMessageSize() throws Exception {
+ KryoMessageCodec codec = new KryoMessageCodec(1024);
+ ByteBuf buf = newBuffer();
+ codec.encode(null, new TestMessage(new byte[512]), buf);
+
+ try {
+ codec.encode(null, new TestMessage(new byte[1025]), buf);
+ fail("Should have failed to encode large message.");
+ } catch (IllegalArgumentException e) {
+ assertTrue(e.getMessage().indexOf("maximum allowed size") > 0);
+ }
+
+ KryoMessageCodec unlimited = new KryoMessageCodec(0);
+ buf = newBuffer();
+ unlimited.encode(null, new TestMessage(new byte[1025]), buf);
+
+ try {
+ List<Object> out = new ArrayList<>();
+ codec.decode(null, buf, out);
+ fail("Should have failed to decode large message.");
+ } catch (IllegalArgumentException e) {
+ assertTrue(e.getMessage().indexOf("maximum allowed size") > 0);
+ }
+ }
+
+ @Test
+ public void testNegativeMessageSize() throws Exception {
+ KryoMessageCodec codec = new KryoMessageCodec(1024);
+ ByteBuf buf = newBuffer();
+ buf.writeInt(-1);
+
+ try {
+ List<Object> out = new ArrayList<>();
+ codec.decode(null, buf, out);
+ fail("Should have failed to decode message with negative size.");
+ } catch (IllegalArgumentException e) {
+ assertTrue(e.getMessage().indexOf("must be positive") > 0);
+ }
+ }
+
+ @Test
+ public void testEncryptionOnly() throws Exception {
+ List<Object> objects = Collections.<Object>emptyList();
+ try {
+ objects = encodeAndDecode(MESSAGE, new TestEncryptionHandler(true, false));
+ } catch (Exception e) {
+ // Pass: the payload was transformed on encode but not on decode, so decoding may fail.
+ }
+ // If decoding happened to succeed anyway, no decoded object may equal the plaintext.
+ for (Object msg : objects) {
+ assertFalse(MESSAGE.equals(msg));
+ }
+ }
+
+ @Test
+ public void testDecryptionOnly() throws Exception {
+ List<Object> objects = Collections.<Object>emptyList();
+ try {
+ objects = encodeAndDecode(MESSAGE, new TestEncryptionHandler(false, true));
+ } catch (Exception e) {
+ // Pass: the payload was left as-is on encode but transformed on decode, so it may fail.
+ }
+ // If decoding happened to succeed anyway, no decoded object may equal the plaintext.
+ for (Object msg : objects) {
+ assertFalse(MESSAGE.equals(msg));
+ }
+ }
+
+ @Test
+ public void testEncryptDecrypt() throws Exception {
+ List<Object> objects = encodeAndDecode(MESSAGE, new TestEncryptionHandler(true, true));
+ assertEquals(1, objects.size());
+ assertEquals(MESSAGE, objects.get(0));
+ }
+
+ private List<Object> encodeAndDecode(Object message, KryoMessageCodec.EncryptionHandler eh)
+ throws Exception {
+ ByteBuf buf = newBuffer();
+ KryoMessageCodec codec = new KryoMessageCodec(0);
+ codec.setEncryptionHandler(eh);
+ codec.encode(null, message, buf);
+
+ List<Object> objects = new ArrayList<>();
+ codec.decode(null, buf, objects);
+ return objects;
+ }
+
+ private ByteBuf newBuffer() {
+ return UnpooledByteBufAllocator.DEFAULT.buffer(1024);
+ }
+
+ private static class TestMessage {
+ byte[] data;
+
+ TestMessage() {
+ this(null);
+ }
+
+ TestMessage(byte[] data) {
+ this.data = data;
+ }
+ }
+
+ private static class TestEncryptionHandler implements KryoMessageCodec.EncryptionHandler {
+
+ private static final byte KEY = 0x42;
+
+ private final boolean encrypt;
+ private final boolean decrypt;
+
+ TestEncryptionHandler(boolean encrypt, boolean decrypt) {
+ this.encrypt = encrypt;
+ this.decrypt = decrypt;
+ }
+
+ public byte[] wrap(byte[] data, int offset, int len) throws IOException {
+ return encrypt ? transform(data, offset, len) : data;
+ }
+
+ public byte[] unwrap(byte[] data, int offset, int len) throws IOException {
+ return decrypt ? transform(data, offset, len) : data;
+ }
+
+ public void dispose() throws IOException {
+
+ }
+
+ private byte[] transform(byte[] data, int offset, int len) {
+ byte[] dest = new byte[len];
+ for (int i = 0; i < len; i++) {
+ dest[i] = (byte) (data[offset + i] ^ KEY);
+ }
+ return dest;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/test/java/org/apache/livy/rsc/rpc/TestRpc.java
----------------------------------------------------------------------
diff --git a/rsc/src/test/java/org/apache/livy/rsc/rpc/TestRpc.java b/rsc/src/test/java/org/apache/livy/rsc/rpc/TestRpc.java
new file mode 100644
index 0000000..8967906
--- /dev/null
+++ b/rsc/src/test/java/org/apache/livy/rsc/rpc/TestRpc.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.rsc.rpc;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.security.sasl.SaslException;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.util.concurrent.Future;
+import org.apache.commons.io.IOUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import org.apache.livy.rsc.FutureListener;
+import org.apache.livy.rsc.RSCConf;
+import org.apache.livy.rsc.Utils;
+import static org.apache.livy.rsc.RSCConf.Entry.*;
+
+public class TestRpc {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestRpc.class);
+
+ private Collection<Closeable> closeables;
+ private RSCConf emptyConfig;
+
+ @Before
+ public void setUp() {
+ closeables = new ArrayList<>();
+ emptyConfig = new RSCConf(null);
+ }
+
+ @After
+ public void cleanUp() throws Exception {
+ for (Closeable c : closeables) {
+ IOUtils.closeQuietly(c);
+ }
+ }
+
+ private <T extends Closeable> T autoClose(T closeable) {
+ closeables.add(closeable);
+ return closeable;
+ }
+
+ @Test
+ public void testRpcDispatcher() throws Exception {
+ Rpc serverRpc = autoClose(Rpc.createEmbedded(new TestDispatcher()));
+ Rpc clientRpc = autoClose(Rpc.createEmbedded(new TestDispatcher()));
+
+ TestMessage outbound = new TestMessage("Hello World!");
+ Future<TestMessage> call = clientRpc.call(outbound, TestMessage.class);
+
+ LOG.debug("Transferring messages...");
+ transfer(serverRpc, clientRpc);
+
+ TestMessage reply = call.get(10, TimeUnit.SECONDS);
+ assertEquals(outbound.message, reply.message);
+ }
+
+ @Test
+ public void testClientServer() throws Exception {
+ RpcServer server = autoClose(new RpcServer(emptyConfig));
+ Rpc[] rpcs = createRpcConnection(server);
+ Rpc serverRpc = rpcs[0];
+ Rpc client = rpcs[1];
+
+ TestMessage outbound = new TestMessage("Hello World!");
+ Future<TestMessage> call = client.call(outbound, TestMessage.class);
+ TestMessage reply = call.get(10, TimeUnit.SECONDS);
+ assertEquals(outbound.message, reply.message);
+
+ TestMessage another = new TestMessage("Hello again!");
+ Future<TestMessage> anotherCall = client.call(another, TestMessage.class);
+ TestMessage anotherReply = anotherCall.get(10, TimeUnit.SECONDS);
+ assertEquals(another.message, anotherReply.message);
+
+ String errorMsg = "This is an error.";
+ try {
+ client.call(new ErrorCall(errorMsg)).get(10, TimeUnit.SECONDS);
+ } catch (ExecutionException ee) {
+ assertTrue(ee.getCause() instanceof RpcException);
+ assertTrue(ee.getCause().getMessage().indexOf(errorMsg) >= 0);
+ }
+
+ // Test from server to client too.
+ TestMessage serverMsg = new TestMessage("Hello from the server!");
+ Future<TestMessage> serverCall = serverRpc.call(serverMsg, TestMessage.class);
+ TestMessage serverReply = serverCall.get(10, TimeUnit.SECONDS);
+ assertEquals(serverMsg.message, serverReply.message);
+ }
+
+ @Test
+ public void testBadHello() throws Exception {
+ RpcServer server = autoClose(new RpcServer(emptyConfig));
+ RpcServer.ClientCallback callback = mock(RpcServer.ClientCallback.class);
+
+ server.registerClient("client", "newClient", callback);
+ Future<Rpc> clientRpcFuture = Rpc.createClient(emptyConfig, server.getEventLoopGroup(),
+ "localhost", server.getPort(), "client", "wrongClient", new TestDispatcher());
+
+ try {
+ autoClose(clientRpcFuture.get(10, TimeUnit.SECONDS));
+ fail("Should have failed to create client with wrong secret.");
+ } catch (ExecutionException ee) {
+ // On failure, the SASL handler will throw an exception indicating that the SASL
+ // negotiation failed.
+ assertTrue("Unexpected exception: " + ee.getCause(),
+ ee.getCause() instanceof SaslException);
+ }
+
+ verify(callback, never()).onNewClient(any(Rpc.class));
+ }
+
+ @Test
+ public void testCloseListener() throws Exception {
+ RpcServer server = autoClose(new RpcServer(emptyConfig));
+ Rpc[] rpcs = createRpcConnection(server);
+ Rpc client = rpcs[1];
+
+ final AtomicInteger closeCount = new AtomicInteger();
+ Utils.addListener(client.getChannel().closeFuture(), new FutureListener<Void>() {
+ @Override
+ public void onSuccess(Void unused) {
+ closeCount.incrementAndGet();
+ }
+ });
+
+ client.close();
+ client.close();
+ assertEquals(1, closeCount.get());
+ }
+
+ @Test
+ public void testNotDeserializableRpc() throws Exception {
+ RpcServer server = autoClose(new RpcServer(emptyConfig));
+ Rpc[] rpcs = createRpcConnection(server);
+ Rpc client = rpcs[1];
+
+ try {
+ client.call(new NotDeserializable(42)).get(10, TimeUnit.SECONDS);
+ } catch (ExecutionException ee) {
+ assertTrue(ee.getCause() instanceof RpcException);
+ assertTrue(ee.getCause().getMessage().indexOf("KryoException") >= 0);
+ }
+ }
+
+ @Test
+ public void testEncryption() throws Exception {
+ RSCConf eConf = new RSCConf(null)
+ .setAll(emptyConfig)
+ .set(SASL_QOP, Rpc.SASL_AUTH_CONF);
+ RpcServer server = autoClose(new RpcServer(eConf));
+ Rpc[] rpcs = createRpcConnection(server, eConf);
+ Rpc client = rpcs[1];
+
+ TestMessage outbound = new TestMessage("Hello World!");
+ Future<TestMessage> call = client.call(outbound, TestMessage.class);
+ TestMessage reply = call.get(10, TimeUnit.SECONDS);
+ assertEquals(outbound.message, reply.message);
+ }
+
+ @Test
+ public void testPortRange() throws Exception {
+ String portRange = "a~b";
+ emptyConfig.set(LAUNCHER_PORT_RANGE, portRange);
+ try {
+ autoClose(new RpcServer(emptyConfig));
+ } catch (Exception ee) {
+ assertTrue(ee instanceof NumberFormatException);
+ }
+ portRange = "11000";
+ emptyConfig.set(LAUNCHER_PORT_RANGE, portRange);
+ try {
+ autoClose(new RpcServer(emptyConfig));
+ } catch (Exception ee) {
+ assertTrue(ee instanceof ArrayIndexOutOfBoundsException);
+ }
+ portRange = "11000~11110";
+ emptyConfig.set(LAUNCHER_PORT_RANGE, portRange);
+ String [] portRangeData = portRange.split("~");
+ int startPort = Integer.parseInt(portRangeData[0]);
+ int endPort = Integer.parseInt(portRangeData[1]);
+ RpcServer server = autoClose(new RpcServer(emptyConfig));
+ assertTrue(startPort <= server.getPort() && server.getPort() <= endPort);
+ }
+
+ private void transfer(Rpc serverRpc, Rpc clientRpc) {
+ EmbeddedChannel client = (EmbeddedChannel) clientRpc.getChannel();
+ EmbeddedChannel server = (EmbeddedChannel) serverRpc.getChannel();
+
+ server.runPendingTasks();
+ client.runPendingTasks();
+
+ int count = 0;
+ while (!client.outboundMessages().isEmpty()) {
+ server.writeInbound(client.readOutbound());
+ count++;
+ }
+ server.flush();
+ LOG.debug("Transferred {} outbound client messages.", count);
+
+ count = 0;
+ while (!server.outboundMessages().isEmpty()) {
+ client.writeInbound(server.readOutbound());
+ count++;
+ }
+ client.flush();
+ LOG.debug("Transferred {} outbound server messages.", count);
+ }
+
+ /**
+ * Creates a client connection between the server and a client.
+ *
+ * @return two-tuple (server rpc, client rpc)
+ */
+ private Rpc[] createRpcConnection(RpcServer server) throws Exception {
+ return createRpcConnection(server, emptyConfig);
+ }
+
+ private Rpc[] createRpcConnection(RpcServer server, RSCConf clientConf)
+ throws Exception {
+ String secret = server.createSecret();
+ ServerRpcCallback callback = new ServerRpcCallback();
+ server.registerClient("client", secret, callback);
+
+ Future<Rpc> clientRpcFuture = Rpc.createClient(clientConf, server.getEventLoopGroup(),
+ "localhost", server.getPort(), "client", secret, new TestDispatcher());
+
+ assertTrue("onNewClient() wasn't called.",
+ callback.onNewClientCalled.await(10, TimeUnit.SECONDS));
+ assertTrue("onSaslComplete() wasn't called.",
+ callback.onSaslCompleteCalled.await(10, TimeUnit.SECONDS));
+ assertNotNull(callback.client);
+ Rpc serverRpc = autoClose(callback.client);
+ Rpc clientRpc = autoClose(clientRpcFuture.get(10, TimeUnit.SECONDS));
+ return new Rpc[] { serverRpc, clientRpc };
+ }
+
+ private static class ServerRpcCallback implements RpcServer.ClientCallback {
+ final CountDownLatch onNewClientCalled = new CountDownLatch(1);
+ final CountDownLatch onSaslCompleteCalled = new CountDownLatch(1);
+ Rpc client;
+
+ @Override
+ public RpcDispatcher onNewClient(Rpc client) {
+ this.client = client;
+ onNewClientCalled.countDown();
+ return new TestDispatcher();
+ }
+
+ @Override
+ public void onSaslComplete(Rpc client) {
+ onSaslCompleteCalled.countDown();
+ }
+
+ }
+
+ private static class TestMessage {
+
+ final String message;
+
+ public TestMessage() {
+ this(null);
+ }
+
+ public TestMessage(String message) {
+ this.message = message;
+ }
+
+ }
+
+ private static class ErrorCall {
+
+ final String error;
+
+ public ErrorCall() {
+ this(null);
+ }
+
+ public ErrorCall(String error) {
+ this.error = error;
+ }
+
+ }
+
+ private static class NotDeserializable {
+
+ NotDeserializable(int unused) {
+
+ }
+
+ }
+
+ private static class TestDispatcher extends RpcDispatcher {
+ protected TestMessage handle(ChannelHandlerContext ctx, TestMessage msg) {
+ return msg;
+ }
+
+ protected void handle(ChannelHandlerContext ctx, ErrorCall msg) {
+ throw new IllegalArgumentException(msg.error);
+ }
+
+ protected void handle(ChannelHandlerContext ctx, NotDeserializable msg) {
+ // No op. Shouldn't actually be called, if it is, the test will fail.
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/scala-api/pom.xml
----------------------------------------------------------------------
diff --git a/scala-api/pom.xml b/scala-api/pom.xml
index e3e63b0..7e4a4b2 100644
--- a/scala-api/pom.xml
+++ b/scala-api/pom.xml
@@ -21,37 +21,37 @@
<modelVersion>4.0.0</modelVersion>
<parent>
- <groupId>com.cloudera.livy</groupId>
+ <groupId>org.apache.livy</groupId>
<artifactId>multi-scala-project-root</artifactId>
- <version>0.4.0-SNAPSHOT</version>
+ <version>0.4.0-incubating-SNAPSHOT</version>
<relativePath>../scala/pom.xml</relativePath>
</parent>
<artifactId>livy-scala-api-parent</artifactId>
- <version>0.4.0-SNAPSHOT</version>
+ <version>0.4.0-incubating-SNAPSHOT</version>
<packaging>pom</packaging>
<dependencies>
<dependency>
- <groupId>com.cloudera.livy</groupId>
+ <groupId>org.apache.livy</groupId>
<artifactId>livy-api</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
- <groupId>com.cloudera.livy</groupId>
+ <groupId>org.apache.livy</groupId>
<artifactId>livy-rsc</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
- <groupId>com.cloudera.livy</groupId>
+ <groupId>org.apache.livy</groupId>
<artifactId>livy-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
- <groupId>com.cloudera.livy</groupId>
+ <groupId>org.apache.livy</groupId>
<artifactId>livy-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/scala-api/scala-2.10/pom.xml
----------------------------------------------------------------------
diff --git a/scala-api/scala-2.10/pom.xml b/scala-api/scala-2.10/pom.xml
index e509fe4..96f6bf8 100644
--- a/scala-api/scala-2.10/pom.xml
+++ b/scala-api/scala-2.10/pom.xml
@@ -17,15 +17,15 @@
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
- <groupId>com.cloudera.livy</groupId>
+ <groupId>org.apache.livy</groupId>
<artifactId>livy-scala-api_2.10</artifactId>
- <version>0.4.0-SNAPSHOT</version>
+ <version>0.4.0-incubating-SNAPSHOT</version>
<packaging>jar</packaging>
<parent>
- <groupId>com.cloudera.livy</groupId>
+ <groupId>org.apache.livy</groupId>
<artifactId>livy-scala-api-parent</artifactId>
- <version>0.4.0-SNAPSHOT</version>
+ <version>0.4.0-incubating-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/scala-api/scala-2.11/pom.xml
----------------------------------------------------------------------
diff --git a/scala-api/scala-2.11/pom.xml b/scala-api/scala-2.11/pom.xml
index 1b0509a..3690347 100644
--- a/scala-api/scala-2.11/pom.xml
+++ b/scala-api/scala-2.11/pom.xml
@@ -17,15 +17,15 @@
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
- <groupId>com.cloudera.livy</groupId>
+ <groupId>org.apache.livy</groupId>
<artifactId>livy-scala-api_2.11</artifactId>
- <version>0.4.0-SNAPSHOT</version>
+ <version>0.4.0-incubating-SNAPSHOT</version>
<packaging>jar</packaging>
<parent>
- <groupId>com.cloudera.livy</groupId>
+ <groupId>org.apache.livy</groupId>
<artifactId>livy-scala-api-parent</artifactId>
- <version>0.4.0-SNAPSHOT</version>
+ <version>0.4.0-incubating-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>