You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@livy.apache.org by js...@apache.org on 2017/07/05 03:07:01 UTC
[32/33] incubator-livy git commit: LIVY-375. Change Livy code package
name to org.apache.livy
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/client-common/src/main/java/com/cloudera/livy/client/common/AbstractJobHandle.java
----------------------------------------------------------------------
diff --git a/client-common/src/main/java/com/cloudera/livy/client/common/AbstractJobHandle.java b/client-common/src/main/java/com/cloudera/livy/client/common/AbstractJobHandle.java
deleted file mode 100644
index cb1b4e8..0000000
--- a/client-common/src/main/java/com/cloudera/livy/client/common/AbstractJobHandle.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.client.common;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-import com.cloudera.livy.JobHandle;
-import com.cloudera.livy.annotations.Private;
-
-@Private
-public abstract class AbstractJobHandle<T> implements JobHandle<T> {
-
- protected final List<Listener<T>> listeners;
- protected volatile State state;
-
- protected AbstractJobHandle() {
- this.listeners = new LinkedList<>();
- this.state = State.SENT;
- }
-
- @Override
- public State getState() {
- return state;
- }
-
- @Override
- public void addListener(Listener<T> l) {
- synchronized (listeners) {
- listeners.add(l);
- fireStateChange(state, l);
- }
- }
-
- /**
- * Changes the state of this job handle, making sure that illegal state transitions are ignored.
- * Fires events appropriately.
- *
- * As a rule, state transitions can only occur if the current state is "higher" than the current
- * state (i.e., has a higher ordinal number) and is not a "final" state. "Final" states are
- * CANCELLED, FAILED and SUCCEEDED, defined here in the code as having an ordinal number higher
- * than the CANCELLED enum constant.
- */
- public boolean changeState(State newState) {
- synchronized (listeners) {
- if (newState.ordinal() > state.ordinal() && state.ordinal() < State.CANCELLED.ordinal()) {
- state = newState;
- for (Listener<T> l : listeners) {
- fireStateChange(newState, l);
- }
- return true;
- }
- return false;
- }
- }
-
- protected abstract T result();
- protected abstract Throwable error();
-
- private void fireStateChange(State s, Listener<T> l) {
- switch (s) {
- case SENT:
- break;
- case QUEUED:
- l.onJobQueued(this);
- break;
- case STARTED:
- l.onJobStarted(this);
- break;
- case CANCELLED:
- l.onJobCancelled(this);
- break;
- case FAILED:
- l.onJobFailed(this, error());
- break;
- case SUCCEEDED:
- try {
- l.onJobSucceeded(this, result());
- } catch (Exception e) {
- // Shouldn't really happen.
- throw new IllegalStateException(e);
- }
- break;
- default:
- throw new IllegalStateException();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/client-common/src/main/java/com/cloudera/livy/client/common/BufferUtils.java
----------------------------------------------------------------------
diff --git a/client-common/src/main/java/com/cloudera/livy/client/common/BufferUtils.java b/client-common/src/main/java/com/cloudera/livy/client/common/BufferUtils.java
deleted file mode 100644
index 3084ae5..0000000
--- a/client-common/src/main/java/com/cloudera/livy/client/common/BufferUtils.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.client.common;
-
-import java.nio.ByteBuffer;
-
-import com.cloudera.livy.annotations.Private;
-
-/**
- * Utility methods for dealing with byte buffers and byte arrays.
- */
-@Private
-public class BufferUtils {
-
- public static byte[] toByteArray(ByteBuffer buf) {
- byte[] bytes;
- if (buf.hasArray() && buf.arrayOffset() == 0 &&
- buf.remaining() == buf.array().length) {
- bytes = buf.array();
- } else {
- bytes = new byte[buf.remaining()];
- buf.get(bytes);
- }
- return bytes;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/client-common/src/main/java/com/cloudera/livy/client/common/ClientConf.java
----------------------------------------------------------------------
diff --git a/client-common/src/main/java/com/cloudera/livy/client/common/ClientConf.java b/client-common/src/main/java/com/cloudera/livy/client/common/ClientConf.java
deleted file mode 100644
index 035c75e..0000000
--- a/client-common/src/main/java/com/cloudera/livy/client/common/ClientConf.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.client.common;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.livy.annotations.Private;
-
-/**
- * Base class with common functionality for type-safe configuration objects.
- */
-@Private
-public abstract class ClientConf<T extends ClientConf>
- implements Iterable<Map.Entry<String, String>> {
-
- protected Logger LOG = LoggerFactory.getLogger(getClass());
-
- public static interface ConfEntry {
-
- /** The key in the configuration file. */
- String key();
-
- /**
- * The default value, which also defines the type of the config. Supported types:
- * Boolean, Integer, Long, String. <code>null</code> maps to String.
- */
- Object dflt();
-
- }
-
- private static final Map<String, TimeUnit> TIME_SUFFIXES;
-
- public static final boolean TEST_MODE = Boolean.parseBoolean(System.getenv("LIVY_TEST"));
-
- static {
- TIME_SUFFIXES = new HashMap<>();
- TIME_SUFFIXES.put("us", TimeUnit.MICROSECONDS);
- TIME_SUFFIXES.put("ms", TimeUnit.MILLISECONDS);
- TIME_SUFFIXES.put("s", TimeUnit.SECONDS);
- TIME_SUFFIXES.put("m", TimeUnit.MINUTES);
- TIME_SUFFIXES.put("min", TimeUnit.MINUTES);
- TIME_SUFFIXES.put("h", TimeUnit.HOURS);
- TIME_SUFFIXES.put("d", TimeUnit.DAYS);
- }
-
- protected final ConcurrentMap<String, String> config;
-
- protected ClientConf(Properties config) {
- this.config = new ConcurrentHashMap<>();
- if (config != null) {
- for (String key : config.stringPropertyNames()) {
- logDeprecationWarning(key);
- this.config.put(key, config.getProperty(key));
- }
- }
- }
-
- public String get(String key) {
- String val = config.get(key);
- if (val != null) {
- return val;
- }
- DeprecatedConf depConf = getConfigsWithAlternatives().get(key);
- if (depConf != null) {
- return config.get(depConf.key());
- } else {
- return val;
- }
- }
-
- @SuppressWarnings("unchecked")
- public T set(String key, String value) {
- logDeprecationWarning(key);
- config.put(key, value);
- return (T) this;
- }
-
- @SuppressWarnings("unchecked")
- public T setIfMissing(String key, String value) {
- if (config.putIfAbsent(key, value) == null) {
- logDeprecationWarning(key);
- }
- return (T) this;
- }
-
- @SuppressWarnings("unchecked")
- public T setAll(ClientConf<?> other) {
- for (Map.Entry<String, String> e : other) {
- set(e.getKey(), e.getValue());
- }
- return (T) this;
- }
-
- public String get(ConfEntry e) {
- Object value = get(e, String.class);
- return (String) (value != null ? value : e.dflt());
- }
-
- public boolean getBoolean(ConfEntry e) {
- String val = get(e, Boolean.class);
- if (val != null) {
- return Boolean.parseBoolean(val);
- } else {
- return (Boolean) e.dflt();
- }
- }
-
- public int getInt(ConfEntry e) {
- String val = get(e, Integer.class);
- if (val != null) {
- return Integer.parseInt(val);
- } else {
- return (Integer) e.dflt();
- }
- }
-
- public long getLong(ConfEntry e) {
- String val = get(e, Long.class);
- if (val != null) {
- return Long.parseLong(val);
- } else {
- return (Long) e.dflt();
- }
- }
-
- public long getTimeAsMs(ConfEntry e) {
- String time = get(e, String.class);
- if (time == null) {
- check(e.dflt() != null,
- "ConfEntry %s doesn't have a default value, cannot convert to time value.", e.key());
- time = (String) e.dflt();
- }
-
- Matcher m = Pattern.compile("(-?[0-9]+)([a-z]+)?").matcher(time.toLowerCase());
- if (!m.matches()) {
- throw new IllegalArgumentException("Invalid time string: " + time);
- }
-
- long val = Long.parseLong(m.group(1));
- String suffix = m.group(2);
-
- if (suffix != null && !TIME_SUFFIXES.containsKey(suffix)) {
- throw new IllegalArgumentException("Invalid suffix: \"" + suffix + "\"");
- }
-
- return TimeUnit.MILLISECONDS.convert(val,
- suffix != null ? TIME_SUFFIXES.get(suffix) : TimeUnit.MILLISECONDS);
- }
-
- @SuppressWarnings("unchecked")
- public T set(ConfEntry e, Object value) {
- check(typesMatch(value, e.dflt()), "Value doesn't match configuration entry type for %s.",
- e.key());
- if (value == null) {
- config.remove(e.key());
- } else {
- logDeprecationWarning(e.key());
- config.put(e.key(), value.toString());
- }
- return (T) this;
- }
-
- @Override
- public Iterator<Map.Entry<String, String>> iterator() {
- return config.entrySet().iterator();
- }
-
- private String get(ConfEntry e, Class<?> requestedType) {
- check(getType(e.dflt()).equals(requestedType), "Invalid type conversion requested for %s.",
- e.key());
- return this.get(e.key());
- }
-
- private boolean typesMatch(Object test, Object expected) {
- return test == null || getType(test).equals(getType(expected));
- }
-
- private Class<?> getType(Object o) {
- return (o != null) ? o.getClass() : String.class;
- }
-
- private void check(boolean test, String message, Object... args) {
- if (!test) {
- throw new IllegalArgumentException(String.format(message, args));
- }
- }
-
- /** Logs a warning message if the given config key is deprecated. */
- private void logDeprecationWarning(String key) {
- DeprecatedConf altConfs = getConfigsWithAlternatives().get(key);
- if (altConfs != null) {
- LOG.warn("The configuration key " + altConfs.key() + " has been deprecated as of Livy "
- + altConfs.version() + " and may be removed in the future. Please use the new key "
- + key + " instead.");
- return;
- }
-
- DeprecatedConf depConfs = getDeprecatedConfigs().get(key);
- if (depConfs != null) {
- LOG.warn("The configuration key " + depConfs.key() + " has been deprecated as of Livy "
- + depConfs.version() + " and may be removed in the future. "
- + depConfs.deprecationMessage());
- }
- }
-
- /** Maps valid key to DeprecatedConf with the deprecated key. */
- protected abstract Map<String, DeprecatedConf> getConfigsWithAlternatives();
-
- /** Maps deprecated key to DeprecatedConf with the same key. */
- protected abstract Map<String, DeprecatedConf> getDeprecatedConfigs();
-
- public static interface DeprecatedConf {
-
- /** The key in the configuration file. */
- String key();
-
- /** The Livy version in which the key was deprecated. */
- String version();
-
- /** Message to include in the deprecation warning for configs without alternatives */
- String deprecationMessage();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/client-common/src/main/java/com/cloudera/livy/client/common/HttpMessages.java
----------------------------------------------------------------------
diff --git a/client-common/src/main/java/com/cloudera/livy/client/common/HttpMessages.java b/client-common/src/main/java/com/cloudera/livy/client/common/HttpMessages.java
deleted file mode 100644
index 28a86bf..0000000
--- a/client-common/src/main/java/com/cloudera/livy/client/common/HttpMessages.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.client.common;
-
-import java.util.List;
-import java.util.Map;
-
-import com.cloudera.livy.JobHandle.State;
-import com.cloudera.livy.annotations.Private;
-
-/**
- * There are the Java representations of the JSON messages used by the client protocol.
- *
- * Note that Jackson requires an empty constructor (or annotations) to be able to instantiate
- * types, so the extra noise is necessary here.
- */
-@Private
-public class HttpMessages {
-
- public static interface ClientMessage {
-
- }
-
- public static class CreateClientRequest implements ClientMessage {
-
- public final Map<String, String> conf;
-
- public CreateClientRequest(Map<String, String> conf) {
- this.conf = conf;
- }
-
- private CreateClientRequest() {
- this(null);
- }
-
- }
-
- public static class SessionInfo implements ClientMessage {
-
- public final int id;
- public final String appId;
- public final String owner;
- public final String proxyUser;
- public final String state;
- public final String kind;
- public final Map<String, String> appInfo;
- public final List<String> log;
-
- public SessionInfo(int id, String appId, String owner, String proxyUser, String state,
- String kind, Map<String, String> appInfo, List<String> log) {
- this.id = id;
- this.appId = appId;
- this.owner = owner;
- this.proxyUser = proxyUser;
- this.state = state;
- this.kind = kind;
- this.appInfo = appInfo;
- this.log = log;
- }
-
- private SessionInfo() {
- this(-1, null, null, null, null, null, null, null);
- }
-
- }
-
- public static class SerializedJob implements ClientMessage {
-
- public final byte[] job;
-
- public SerializedJob(byte[] job) {
- this.job = job;
- }
-
- private SerializedJob() {
- this(null);
- }
-
- }
-
- public static class AddResource implements ClientMessage {
-
- public final String uri;
-
- public AddResource(String uri) {
- this.uri = uri;
- }
-
- private AddResource() {
- this(null);
- }
-
- }
-
- public static class JobStatus implements ClientMessage {
-
- public final long id;
- public final State state;
- public final byte[] result;
- public final String error;
-
- public JobStatus(long id, State state, byte[] result, String error) {
- this.id = id;
- this.state = state;
- this.error = error;
-
- // json4s, at least, seems confused about whether a "null" in the JSON payload should
- // become a null array or a byte array with length 0. Since there shouldn't be any
- // valid serialized object in a byte array of size 0, translate that to null.
- this.result = (result != null && result.length > 0) ? result : null;
-
- if (this.result != null && state != State.SUCCEEDED) {
- throw new IllegalArgumentException("Result cannot be set unless job succeeded.");
- }
- // The check for "result" is not completely correct, but is here to make the unit tests work.
- if (this.result == null && error != null && state != State.FAILED) {
- throw new IllegalArgumentException("Error cannot be set unless job failed.");
- }
- }
-
- private JobStatus() {
- this(-1, null, null, null);
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/client-common/src/main/java/com/cloudera/livy/client/common/Serializer.java
----------------------------------------------------------------------
diff --git a/client-common/src/main/java/com/cloudera/livy/client/common/Serializer.java b/client-common/src/main/java/com/cloudera/livy/client/common/Serializer.java
deleted file mode 100644
index 29d82d4..0000000
--- a/client-common/src/main/java/com/cloudera/livy/client/common/Serializer.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.client.common;
-
-import java.io.ByteArrayOutputStream;
-import java.nio.ByteBuffer;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.esotericsoftware.shaded.org.objenesis.strategy.StdInstantiatorStrategy;
-
-import com.cloudera.livy.annotations.Private;
-
-/**
- * Utility class to serialize user data using Kryo.
- */
-@Private
-public class Serializer {
-
- // Kryo docs say 0-8 are taken. Strange things happen if you don't set an ID when registering
- // classes.
- private static final int REG_ID_BASE = 16;
-
- private final ThreadLocal<Kryo> kryos;
-
- public Serializer(final Class<?>... klasses) {
- this.kryos = new ThreadLocal<Kryo>() {
- @Override
- protected Kryo initialValue() {
- Kryo kryo = new Kryo();
- int count = 0;
- for (Class<?> klass : klasses) {
- kryo.register(klass, REG_ID_BASE + count);
- count++;
- }
- kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
- kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
- return kryo;
- }
- };
- }
-
- public Object deserialize(ByteBuffer data) {
- byte[] b = new byte[data.remaining()];
- data.get(b);
- Input kryoIn = new Input(b);
- return kryos.get().readClassAndObject(kryoIn);
- }
-
- public ByteBuffer serialize(Object data) {
- ByteBufferOutputStream out = new ByteBufferOutputStream();
- Output kryoOut = new Output(out);
- kryos.get().writeClassAndObject(kryoOut, data);
- kryoOut.flush();
- return out.getBuffer();
- }
-
- private static class ByteBufferOutputStream extends ByteArrayOutputStream {
-
- public ByteBuffer getBuffer() {
- ByteBuffer result = ByteBuffer.wrap(buf, 0, count);
- buf = null;
- reset();
- return result;
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/client-common/src/main/java/com/cloudera/livy/client/common/TestUtils.java
----------------------------------------------------------------------
diff --git a/client-common/src/main/java/com/cloudera/livy/client/common/TestUtils.java b/client-common/src/main/java/com/cloudera/livy/client/common/TestUtils.java
deleted file mode 100644
index 18bb13a..0000000
--- a/client-common/src/main/java/com/cloudera/livy/client/common/TestUtils.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.client.common;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.StandardOpenOption;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import com.cloudera.livy.annotations.Private;
-
-/**
- * Utility methods used by Livy tests.
- */
-@Private
-public class TestUtils {
-
- /**
- * Returns JVM arguments that enable jacoco on a process to be run. The returned arguments
- * create a new, unique output file in the same directory referenced by the "jacoco.args"
- * system property.
- *
- * @return JVM arguments, or null.
- */
- public static String getJacocoArgs() {
- String jacocoArgs = System.getProperty("jacoco.args");
- if (jacocoArgs == null) {
- return null;
- }
-
- Pattern p = Pattern.compile("(.+?destfile=)(.+?)(,.+)?");
- Matcher m = p.matcher(jacocoArgs);
- if (!m.matches()) {
- return null;
- }
-
- String fileName = new File(m.group(2)).getName();
- File outputDir = new File(m.group(2)).getParentFile();
-
- File newFile;
- while (true) {
- int newId = outputDir.list().length;
- newFile = new File(outputDir, "jacoco-" + newId + ".exec");
- try {
- Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW).close();
- break;
- } catch (IOException ioe) {
- // Try again.
- }
- }
-
- StringBuilder newArgs = new StringBuilder();
- newArgs.append(m.group(1));
- newArgs.append(newFile.getAbsolutePath());
- if (m.group(3) != null) {
- newArgs.append(m.group(3));
- }
-
- return newArgs.toString();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/client-common/src/main/java/org/apache/livy/client/common/AbstractJobHandle.java
----------------------------------------------------------------------
diff --git a/client-common/src/main/java/org/apache/livy/client/common/AbstractJobHandle.java b/client-common/src/main/java/org/apache/livy/client/common/AbstractJobHandle.java
new file mode 100644
index 0000000..8d3c307
--- /dev/null
+++ b/client-common/src/main/java/org/apache/livy/client/common/AbstractJobHandle.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.client.common;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.livy.JobHandle;
+import org.apache.livy.annotations.Private;
+
+@Private
+public abstract class AbstractJobHandle<T> implements JobHandle<T> {
+
+ protected final List<Listener<T>> listeners;
+ protected volatile State state;
+
+ protected AbstractJobHandle() {
+ this.listeners = new LinkedList<>();
+ this.state = State.SENT;
+ }
+
+ @Override
+ public State getState() {
+ return state;
+ }
+
+ @Override
+ public void addListener(Listener<T> l) {
+ synchronized (listeners) {
+ listeners.add(l);
+ fireStateChange(state, l);
+ }
+ }
+
+ /**
+ * Changes the state of this job handle, making sure that illegal state transitions are ignored.
+ * Fires events appropriately.
+ *
+ * As a rule, state transitions can only occur if the current state is "higher" than the current
+ * state (i.e., has a higher ordinal number) and is not a "final" state. "Final" states are
+ * CANCELLED, FAILED and SUCCEEDED, defined here in the code as having an ordinal number higher
+ * than the CANCELLED enum constant.
+ */
+ public boolean changeState(State newState) {
+ synchronized (listeners) {
+ if (newState.ordinal() > state.ordinal() && state.ordinal() < State.CANCELLED.ordinal()) {
+ state = newState;
+ for (Listener<T> l : listeners) {
+ fireStateChange(newState, l);
+ }
+ return true;
+ }
+ return false;
+ }
+ }
+
+ protected abstract T result();
+ protected abstract Throwable error();
+
+ private void fireStateChange(State s, Listener<T> l) {
+ switch (s) {
+ case SENT:
+ break;
+ case QUEUED:
+ l.onJobQueued(this);
+ break;
+ case STARTED:
+ l.onJobStarted(this);
+ break;
+ case CANCELLED:
+ l.onJobCancelled(this);
+ break;
+ case FAILED:
+ l.onJobFailed(this, error());
+ break;
+ case SUCCEEDED:
+ try {
+ l.onJobSucceeded(this, result());
+ } catch (Exception e) {
+ // Shouldn't really happen.
+ throw new IllegalStateException(e);
+ }
+ break;
+ default:
+ throw new IllegalStateException();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/client-common/src/main/java/org/apache/livy/client/common/BufferUtils.java
----------------------------------------------------------------------
diff --git a/client-common/src/main/java/org/apache/livy/client/common/BufferUtils.java b/client-common/src/main/java/org/apache/livy/client/common/BufferUtils.java
new file mode 100644
index 0000000..55f434d
--- /dev/null
+++ b/client-common/src/main/java/org/apache/livy/client/common/BufferUtils.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.client.common;
+
+import java.nio.ByteBuffer;
+
+import org.apache.livy.annotations.Private;
+
+/**
+ * Utility methods for dealing with byte buffers and byte arrays.
+ */
+@Private
+public class BufferUtils {
+
+ public static byte[] toByteArray(ByteBuffer buf) {
+ byte[] bytes;
+ if (buf.hasArray() && buf.arrayOffset() == 0 &&
+ buf.remaining() == buf.array().length) {
+ bytes = buf.array();
+ } else {
+ bytes = new byte[buf.remaining()];
+ buf.get(bytes);
+ }
+ return bytes;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/client-common/src/main/java/org/apache/livy/client/common/ClientConf.java
----------------------------------------------------------------------
diff --git a/client-common/src/main/java/org/apache/livy/client/common/ClientConf.java b/client-common/src/main/java/org/apache/livy/client/common/ClientConf.java
new file mode 100644
index 0000000..52fcb6b
--- /dev/null
+++ b/client-common/src/main/java/org/apache/livy/client/common/ClientConf.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.client.common;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.livy.annotations.Private;
+
+/**
+ * Base class with common functionality for type-safe configuration objects.
+ */
+@Private
+public abstract class ClientConf<T extends ClientConf>
+ implements Iterable<Map.Entry<String, String>> {
+
+ protected Logger LOG = LoggerFactory.getLogger(getClass());
+
+ public static interface ConfEntry {
+
+ /** The key in the configuration file. */
+ String key();
+
+ /**
+ * The default value, which also defines the type of the config. Supported types:
+ * Boolean, Integer, Long, String. <code>null</code> maps to String.
+ */
+ Object dflt();
+
+ }
+
+ private static final Map<String, TimeUnit> TIME_SUFFIXES;
+
+ public static final boolean TEST_MODE = Boolean.parseBoolean(System.getenv("LIVY_TEST"));
+
+ static {
+ TIME_SUFFIXES = new HashMap<>();
+ TIME_SUFFIXES.put("us", TimeUnit.MICROSECONDS);
+ TIME_SUFFIXES.put("ms", TimeUnit.MILLISECONDS);
+ TIME_SUFFIXES.put("s", TimeUnit.SECONDS);
+ TIME_SUFFIXES.put("m", TimeUnit.MINUTES);
+ TIME_SUFFIXES.put("min", TimeUnit.MINUTES);
+ TIME_SUFFIXES.put("h", TimeUnit.HOURS);
+ TIME_SUFFIXES.put("d", TimeUnit.DAYS);
+ }
+
+ protected final ConcurrentMap<String, String> config;
+
+ protected ClientConf(Properties config) {
+ this.config = new ConcurrentHashMap<>();
+ if (config != null) {
+ for (String key : config.stringPropertyNames()) {
+ logDeprecationWarning(key);
+ this.config.put(key, config.getProperty(key));
+ }
+ }
+ }
+
+ public String get(String key) {
+ String val = config.get(key);
+ if (val != null) {
+ return val;
+ }
+ DeprecatedConf depConf = getConfigsWithAlternatives().get(key);
+ if (depConf != null) {
+ return config.get(depConf.key());
+ } else {
+ return val;
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public T set(String key, String value) {
+ logDeprecationWarning(key);
+ config.put(key, value);
+ return (T) this;
+ }
+
+ @SuppressWarnings("unchecked")
+ public T setIfMissing(String key, String value) {
+ if (config.putIfAbsent(key, value) == null) {
+ logDeprecationWarning(key);
+ }
+ return (T) this;
+ }
+
+ @SuppressWarnings("unchecked")
+ public T setAll(ClientConf<?> other) {
+ for (Map.Entry<String, String> e : other) {
+ set(e.getKey(), e.getValue());
+ }
+ return (T) this;
+ }
+
+ public String get(ConfEntry e) {
+ Object value = get(e, String.class);
+ return (String) (value != null ? value : e.dflt());
+ }
+
+ public boolean getBoolean(ConfEntry e) {
+ String val = get(e, Boolean.class);
+ if (val != null) {
+ return Boolean.parseBoolean(val);
+ } else {
+ return (Boolean) e.dflt();
+ }
+ }
+
+ public int getInt(ConfEntry e) {
+ String val = get(e, Integer.class);
+ if (val != null) {
+ return Integer.parseInt(val);
+ } else {
+ return (Integer) e.dflt();
+ }
+ }
+
+ public long getLong(ConfEntry e) {
+ String val = get(e, Long.class);
+ if (val != null) {
+ return Long.parseLong(val);
+ } else {
+ return (Long) e.dflt();
+ }
+ }
+
+ public long getTimeAsMs(ConfEntry e) {
+ String time = get(e, String.class);
+ if (time == null) {
+ check(e.dflt() != null,
+ "ConfEntry %s doesn't have a default value, cannot convert to time value.", e.key());
+ time = (String) e.dflt();
+ }
+
+ Matcher m = Pattern.compile("(-?[0-9]+)([a-z]+)?").matcher(time.toLowerCase());
+ if (!m.matches()) {
+ throw new IllegalArgumentException("Invalid time string: " + time);
+ }
+
+ long val = Long.parseLong(m.group(1));
+ String suffix = m.group(2);
+
+ if (suffix != null && !TIME_SUFFIXES.containsKey(suffix)) {
+ throw new IllegalArgumentException("Invalid suffix: \"" + suffix + "\"");
+ }
+
+ return TimeUnit.MILLISECONDS.convert(val,
+ suffix != null ? TIME_SUFFIXES.get(suffix) : TimeUnit.MILLISECONDS);
+ }
+
+ @SuppressWarnings("unchecked")
+ public T set(ConfEntry e, Object value) {
+ check(typesMatch(value, e.dflt()), "Value doesn't match configuration entry type for %s.",
+ e.key());
+ if (value == null) {
+ config.remove(e.key());
+ } else {
+ logDeprecationWarning(e.key());
+ config.put(e.key(), value.toString());
+ }
+ return (T) this;
+ }
+
+ @Override
+ public Iterator<Map.Entry<String, String>> iterator() {
+ return config.entrySet().iterator();
+ }
+
+ private String get(ConfEntry e, Class<?> requestedType) {
+ check(getType(e.dflt()).equals(requestedType), "Invalid type conversion requested for %s.",
+ e.key());
+ return this.get(e.key());
+ }
+
+ private boolean typesMatch(Object test, Object expected) {
+ return test == null || getType(test).equals(getType(expected));
+ }
+
+ private Class<?> getType(Object o) {
+ return (o != null) ? o.getClass() : String.class;
+ }
+
+ private void check(boolean test, String message, Object... args) {
+ if (!test) {
+ throw new IllegalArgumentException(String.format(message, args));
+ }
+ }
+
+ /** Logs a warning message if the given config key is deprecated. */
+ private void logDeprecationWarning(String key) {
+ DeprecatedConf altConfs = getConfigsWithAlternatives().get(key);
+ if (altConfs != null) {
+ LOG.warn("The configuration key " + altConfs.key() + " has been deprecated as of Livy "
+ + altConfs.version() + " and may be removed in the future. Please use the new key "
+ + key + " instead.");
+ return;
+ }
+
+ DeprecatedConf depConfs = getDeprecatedConfigs().get(key);
+ if (depConfs != null) {
+ LOG.warn("The configuration key " + depConfs.key() + " has been deprecated as of Livy "
+ + depConfs.version() + " and may be removed in the future. "
+ + depConfs.deprecationMessage());
+ }
+ }
+
+ /** Maps valid key to DeprecatedConf with the deprecated key. */
+ protected abstract Map<String, DeprecatedConf> getConfigsWithAlternatives();
+
+ /** Maps deprecated key to DeprecatedConf with the same key. */
+ protected abstract Map<String, DeprecatedConf> getDeprecatedConfigs();
+
+ public static interface DeprecatedConf {
+
+ /** The key in the configuration file. */
+ String key();
+
+ /** The Livy version in which the key was deprecated. */
+ String version();
+
+ /** Message to include in the deprecation warning for configs without alternatives */
+ String deprecationMessage();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java
----------------------------------------------------------------------
diff --git a/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java b/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java
new file mode 100644
index 0000000..99ce900
--- /dev/null
+++ b/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.client.common;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.livy.JobHandle.State;
+import org.apache.livy.annotations.Private;
+
+/**
+ * There are the Java representations of the JSON messages used by the client protocol.
+ *
+ * Note that Jackson requires an empty constructor (or annotations) to be able to instantiate
+ * types, so the extra noise is necessary here.
+ */
+@Private
+public class HttpMessages {
+
+ public static interface ClientMessage {
+
+ }
+
+ public static class CreateClientRequest implements ClientMessage {
+
+ public final Map<String, String> conf;
+
+ public CreateClientRequest(Map<String, String> conf) {
+ this.conf = conf;
+ }
+
+ private CreateClientRequest() {
+ this(null);
+ }
+
+ }
+
+ public static class SessionInfo implements ClientMessage {
+
+ public final int id;
+ public final String appId;
+ public final String owner;
+ public final String proxyUser;
+ public final String state;
+ public final String kind;
+ public final Map<String, String> appInfo;
+ public final List<String> log;
+
+ public SessionInfo(int id, String appId, String owner, String proxyUser, String state,
+ String kind, Map<String, String> appInfo, List<String> log) {
+ this.id = id;
+ this.appId = appId;
+ this.owner = owner;
+ this.proxyUser = proxyUser;
+ this.state = state;
+ this.kind = kind;
+ this.appInfo = appInfo;
+ this.log = log;
+ }
+
+ private SessionInfo() {
+ this(-1, null, null, null, null, null, null, null);
+ }
+
+ }
+
+ public static class SerializedJob implements ClientMessage {
+
+ public final byte[] job;
+
+ public SerializedJob(byte[] job) {
+ this.job = job;
+ }
+
+ private SerializedJob() {
+ this(null);
+ }
+
+ }
+
+ public static class AddResource implements ClientMessage {
+
+ public final String uri;
+
+ public AddResource(String uri) {
+ this.uri = uri;
+ }
+
+ private AddResource() {
+ this(null);
+ }
+
+ }
+
+ public static class JobStatus implements ClientMessage {
+
+ public final long id;
+ public final State state;
+ public final byte[] result;
+ public final String error;
+
+ public JobStatus(long id, State state, byte[] result, String error) {
+ this.id = id;
+ this.state = state;
+ this.error = error;
+
+ // json4s, at least, seems confused about whether a "null" in the JSON payload should
+ // become a null array or a byte array with length 0. Since there shouldn't be any
+ // valid serialized object in a byte array of size 0, translate that to null.
+ this.result = (result != null && result.length > 0) ? result : null;
+
+ if (this.result != null && state != State.SUCCEEDED) {
+ throw new IllegalArgumentException("Result cannot be set unless job succeeded.");
+ }
+ // The check for "result" is not completely correct, but is here to make the unit tests work.
+ if (this.result == null && error != null && state != State.FAILED) {
+ throw new IllegalArgumentException("Error cannot be set unless job failed.");
+ }
+ }
+
+ private JobStatus() {
+ this(-1, null, null, null);
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/client-common/src/main/java/org/apache/livy/client/common/Serializer.java
----------------------------------------------------------------------
diff --git a/client-common/src/main/java/org/apache/livy/client/common/Serializer.java b/client-common/src/main/java/org/apache/livy/client/common/Serializer.java
new file mode 100644
index 0000000..3ea3f56
--- /dev/null
+++ b/client-common/src/main/java/org/apache/livy/client/common/Serializer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.client.common;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.shaded.org.objenesis.strategy.StdInstantiatorStrategy;
+
+import org.apache.livy.annotations.Private;
+
+/**
+ * Utility class to serialize user data using Kryo.
+ */
+@Private
+public class Serializer {
+
+ // Kryo docs say 0-8 are taken. Strange things happen if you don't set an ID when registering
+ // classes.
+ private static final int REG_ID_BASE = 16;
+
+ private final ThreadLocal<Kryo> kryos;
+
+ public Serializer(final Class<?>... klasses) {
+ this.kryos = new ThreadLocal<Kryo>() {
+ @Override
+ protected Kryo initialValue() {
+ Kryo kryo = new Kryo();
+ int count = 0;
+ for (Class<?> klass : klasses) {
+ kryo.register(klass, REG_ID_BASE + count);
+ count++;
+ }
+ kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
+ kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+ return kryo;
+ }
+ };
+ }
+
+ public Object deserialize(ByteBuffer data) {
+ byte[] b = new byte[data.remaining()];
+ data.get(b);
+ Input kryoIn = new Input(b);
+ return kryos.get().readClassAndObject(kryoIn);
+ }
+
+ public ByteBuffer serialize(Object data) {
+ ByteBufferOutputStream out = new ByteBufferOutputStream();
+ Output kryoOut = new Output(out);
+ kryos.get().writeClassAndObject(kryoOut, data);
+ kryoOut.flush();
+ return out.getBuffer();
+ }
+
+ private static class ByteBufferOutputStream extends ByteArrayOutputStream {
+
+ public ByteBuffer getBuffer() {
+ ByteBuffer result = ByteBuffer.wrap(buf, 0, count);
+ buf = null;
+ reset();
+ return result;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/client-common/src/main/java/org/apache/livy/client/common/TestUtils.java
----------------------------------------------------------------------
diff --git a/client-common/src/main/java/org/apache/livy/client/common/TestUtils.java b/client-common/src/main/java/org/apache/livy/client/common/TestUtils.java
new file mode 100644
index 0000000..e1e450c
--- /dev/null
+++ b/client-common/src/main/java/org/apache/livy/client/common/TestUtils.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.client.common;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.livy.annotations.Private;
+
+/**
+ * Utility methods used by Livy tests.
+ */
+@Private
+public class TestUtils {
+
+ /**
+ * Returns JVM arguments that enable jacoco on a process to be run. The returned arguments
+ * create a new, unique output file in the same directory referenced by the "jacoco.args"
+ * system property.
+ *
+ * @return JVM arguments, or null.
+ */
+ public static String getJacocoArgs() {
+ String jacocoArgs = System.getProperty("jacoco.args");
+ if (jacocoArgs == null) {
+ return null;
+ }
+
+ Pattern p = Pattern.compile("(.+?destfile=)(.+?)(,.+)?");
+ Matcher m = p.matcher(jacocoArgs);
+ if (!m.matches()) {
+ return null;
+ }
+
+ String fileName = new File(m.group(2)).getName();
+ File outputDir = new File(m.group(2)).getParentFile();
+
+ File newFile;
+ while (true) {
+ int newId = outputDir.list().length;
+ newFile = new File(outputDir, "jacoco-" + newId + ".exec");
+ try {
+ Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW).close();
+ break;
+ } catch (IOException ioe) {
+ // Try again.
+ }
+ }
+
+ StringBuilder newArgs = new StringBuilder();
+ newArgs.append(m.group(1));
+ newArgs.append(newFile.getAbsolutePath());
+ if (m.group(3) != null) {
+ newArgs.append(m.group(3));
+ }
+
+ return newArgs.toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/client-common/src/test/java/com/cloudera/livy/client/common/TestAbstractJobHandle.java
----------------------------------------------------------------------
diff --git a/client-common/src/test/java/com/cloudera/livy/client/common/TestAbstractJobHandle.java b/client-common/src/test/java/com/cloudera/livy/client/common/TestAbstractJobHandle.java
deleted file mode 100644
index 1b48f07..0000000
--- a/client-common/src/test/java/com/cloudera/livy/client/common/TestAbstractJobHandle.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.client.common;
-
-import java.util.concurrent.TimeUnit;
-
-import org.junit.Test;
-import org.mockito.InOrder;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-import com.cloudera.livy.JobHandle;
-import com.cloudera.livy.JobHandle.State;
-
-public class TestAbstractJobHandle {
-
- @Test
- public void testJobHandle() {
- AbstractJobHandle<Void> handle = new TestJobHandle();
-
- assertTrue(handle.changeState(State.QUEUED));
- assertEquals(State.QUEUED, handle.getState());
-
- @SuppressWarnings("unchecked")
- JobHandle.Listener<Void> l1 = mock(JobHandle.Listener.class);
- handle.addListener(l1);
- verify(l1).onJobQueued(handle);
-
- assertTrue(handle.changeState(State.STARTED));
- verify(l1).onJobStarted(handle);
-
- assertTrue(handle.changeState(State.SUCCEEDED));
- verify(l1).onJobSucceeded(handle, null);
-
- assertFalse(handle.changeState(State.CANCELLED));
- }
-
- private static class TestJobHandle extends AbstractJobHandle<Void> {
-
- @Override
- protected Void result() {
- return null;
- }
-
- @Override
- protected Throwable error() {
- return null;
- }
-
- @Override
- public Void get() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Void get(long l, TimeUnit t) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean isDone() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean isCancelled() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean cancel(boolean b) {
- throw new UnsupportedOperationException();
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/client-common/src/test/java/com/cloudera/livy/client/common/TestBufferUtils.java
----------------------------------------------------------------------
diff --git a/client-common/src/test/java/com/cloudera/livy/client/common/TestBufferUtils.java b/client-common/src/test/java/com/cloudera/livy/client/common/TestBufferUtils.java
deleted file mode 100644
index 725d506..0000000
--- a/client-common/src/test/java/com/cloudera/livy/client/common/TestBufferUtils.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.client.common;
-
-import java.nio.ByteBuffer;
-
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-public class TestBufferUtils {
-
- @Test
- public void testWrappedArray() {
- byte[] array = new byte[] { 0x1b, 0x2b };
- byte[] unwrapped = BufferUtils.toByteArray(ByteBuffer.wrap(array));
- assertSame(array, unwrapped);
- }
-
- @Test
- public void testShortArray() {
- byte[] array = new byte[] { 0x1b, 0x2b };
- byte[] unwrapped = BufferUtils.toByteArray(ByteBuffer.wrap(array, 0, 1));
- assertNotSame(array, unwrapped);
- assertEquals(1, unwrapped.length);
- }
-
- @Test
- public void testOffsetArray() {
- byte[] array = new byte[] { 0x1b, 0x2b };
- byte[] unwrapped = BufferUtils.toByteArray(ByteBuffer.wrap(array, 1, 1));
- assertNotSame(array, unwrapped);
- assertEquals(1, unwrapped.length);
- }
-
- @Test
- public void testDirectBuffer() {
- ByteBuffer direct = ByteBuffer.allocateDirect(1);
- direct.put((byte) 0x1b);
- assertFalse(direct.hasArray());
- direct.flip();
-
- byte[] unwrapped = BufferUtils.toByteArray(direct);
- assertEquals(0x1b, unwrapped[0]);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/client-common/src/test/java/com/cloudera/livy/client/common/TestClientConf.java
----------------------------------------------------------------------
diff --git a/client-common/src/test/java/com/cloudera/livy/client/common/TestClientConf.java b/client-common/src/test/java/com/cloudera/livy/client/common/TestClientConf.java
deleted file mode 100644
index f00c996..0000000
--- a/client-common/src/test/java/com/cloudera/livy/client/common/TestClientConf.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.client.common;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-public class TestClientConf {
-
- @Test
- public void testTypes() {
- TestConf conf = new TestConf(null);
-
- assertNull(conf.get(TestConf.Entry.NULL));
- assertEquals("default", conf.get(TestConf.Entry.STRING));
- assertEquals(false, conf.getBoolean(TestConf.Entry.BOOLEAN));
- assertEquals(42, conf.getInt(TestConf.Entry.INT));
- assertEquals(84L, conf.getLong(TestConf.Entry.LONG));
- assertEquals(168L, conf.getTimeAsMs(TestConf.Entry.TIME));
-
- try {
- conf.get(TestConf.Entry.INT);
- fail("Should have failed to retrieve int as string.");
- } catch (IllegalArgumentException ie) {
- // Expected.
- }
-
- conf.set(TestConf.Entry.INT, 336);
- assertEquals(336, conf.getInt(TestConf.Entry.INT));
-
- try {
- conf.set(TestConf.Entry.INT, "abcde");
- fail("Should have failed to set int as string.");
- } catch (IllegalArgumentException ie) {
- // Expected.
- }
-
- conf.set(TestConf.Entry.STRING, "aString");
- assertEquals("aString", conf.get(TestConf.Entry.STRING));
-
- conf.set(TestConf.Entry.BOOLEAN, true);
- assertEquals(true, conf.getBoolean(TestConf.Entry.BOOLEAN));
-
- conf.set(TestConf.Entry.LONG, 42L);
- assertEquals(42L, conf.getLong(TestConf.Entry.LONG));
-
- conf.set(TestConf.Entry.LONG, null);
- assertEquals(84L, conf.getLong(TestConf.Entry.LONG));
-
- conf.set(TestConf.Entry.TIME_NO_DEFAULT, "100");
- assertEquals(100L, conf.getTimeAsMs(TestConf.Entry.TIME_NO_DEFAULT));
- }
-
- @Test
- public void testRawProperties() {
- Properties dflt = new Properties();
- dflt.put("key1", "val1");
- dflt.put("key2", "val2");
-
- TestConf conf = new TestConf(dflt);
- conf.set("key2", "anotherVal");
-
- assertEquals("val1", conf.get("key1"));
- assertEquals("anotherVal", conf.get("key2"));
-
- conf.setIfMissing("key2", "yetAnotherVal");
- assertEquals("anotherVal", conf.get("key2"));
-
- conf.setIfMissing("key3", "val3");
- assertEquals("val3", conf.get("key3"));
-
- int count = 0;
- for (Map.Entry<String, String> e : conf) {
- count++;
- }
- assertEquals(3, count);
-
- TestConf newProps = new TestConf(null);
- newProps.set("key4", "val4");
- newProps.set("key5", "val5");
- conf.setAll(newProps);
- assertEquals("val4", conf.get("key4"));
- assertEquals("val5", conf.get("key5"));
- }
-
- @Test(expected=IllegalArgumentException.class)
- public void testInvalidTime() {
- TestConf conf = new TestConf(null);
- conf.set(TestConf.Entry.TIME, "invalid");
- conf.getTimeAsMs(TestConf.Entry.TIME);
- }
-
- @Test(expected=IllegalArgumentException.class)
- public void testInvalidTimeSuffix() {
- TestConf conf = new TestConf(null);
- conf.set(TestConf.Entry.TIME, "100foo");
- conf.getTimeAsMs(TestConf.Entry.TIME);
- }
-
- @Test(expected=IllegalArgumentException.class)
- public void testTimeWithoutDefault() {
- TestConf conf = new TestConf(null);
- conf.getTimeAsMs(TestConf.Entry.TIME_NO_DEFAULT);
- }
-
-
- @Test
- public void testDeprecation() {
- TestConf conf = new TestConf(null);
-
- assertNull(conf.get("depKey"));
- assertNull(conf.get("dep_alt"));
- assertNull(conf.get("new-key"));
- assertEquals("value", conf.get(TestConf.Entry.NEW_CONF));
-
- TestConf depProps = new TestConf(null);
- depProps.set("depKey", "dep-val");
- depProps.set("dep_alt", "alt-val");
- conf.setAll(depProps);
- assertEquals("dep-val", conf.get("depKey"));
- assertEquals("alt-val", conf.get("dep_alt"));
- assertEquals("alt-val", conf.get(TestConf.Entry.NEW_CONF));
- assertEquals("alt-val", conf.get("new-key"));
-
- conf.set("new-key", "new-val");
- assertEquals("new-val", conf.get(TestConf.Entry.NEW_CONF));
- assertEquals("alt-val", conf.get("dep_alt"));
- assertEquals("new-val", conf.get("new-key"));
- }
-
- private static class TestConf extends ClientConf<TestConf> {
-
- static enum Entry implements ConfEntry {
- NULL("null", null),
- STRING("string", "default"),
- BOOLEAN("bool", false),
- INT("int", 42),
- LONG("long", 84L),
- TIME("time", "168ms"),
- TIME_NO_DEFAULT("time2", null),
- NEW_CONF("new-key", "value");
-
- private final String key;
- private final Object dflt;
-
- private Entry(String key, Object dflt) {
- this.key = key;
- this.dflt = dflt;
- }
-
- @Override
- public String key() { return key; }
-
- @Override
- public Object dflt() { return dflt; }
-
- }
-
- TestConf(Properties p) {
- super(p);
- }
-
- private static final Map<String, DeprecatedConf> configsWithAlternatives
- = Collections.unmodifiableMap(new HashMap<String, DeprecatedConf>() {{
- put(TestConf.Entry.NEW_CONF.key, DepConf.DEP_WITH_ALT);
- }});
-
- private static final Map<String, DeprecatedConf> deprecatedConfigs
- = Collections.unmodifiableMap(new HashMap<String, DeprecatedConf>() {{
- put(DepConf.DEP_NO_ALT.key, DepConf.DEP_NO_ALT);
- }});
-
- protected Map<String, DeprecatedConf> getConfigsWithAlternatives() {
- return configsWithAlternatives;
- }
-
- protected Map<String, DeprecatedConf> getDeprecatedConfigs() {
- return deprecatedConfigs;
- }
-
- static enum DepConf implements DeprecatedConf {
- DEP_WITH_ALT("dep_alt", "0.4"),
- DEP_NO_ALT("depKey", "1.0");
-
- private final String key;
- private final String version;
- private final String deprecationMessage;
-
- private DepConf(String key, String version) {
- this(key, version, "");
- }
-
- private DepConf(String key, String version, String deprecationMessage) {
- this.key = key;
- this.version = version;
- this.deprecationMessage = deprecationMessage;
- }
-
- @Override
- public String key() { return key; }
-
- @Override
- public String version() { return version; }
-
- @Override
- public String deprecationMessage() { return deprecationMessage; }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/client-common/src/test/java/com/cloudera/livy/client/common/TestHttpMessages.java
----------------------------------------------------------------------
diff --git a/client-common/src/test/java/com/cloudera/livy/client/common/TestHttpMessages.java b/client-common/src/test/java/com/cloudera/livy/client/common/TestHttpMessages.java
deleted file mode 100644
index c6adeca..0000000
--- a/client-common/src/test/java/com/cloudera/livy/client/common/TestHttpMessages.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.client.common;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Field;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-import com.cloudera.livy.JobHandle.State;
-
-public class TestHttpMessages {
-
- /**
- * Tests that all defined messages can be serialized and deserialized using Jackson.
- */
- @Test
- public void testMessageSerialization() throws Exception {
- ObjectMapper mapper = new ObjectMapper();
-
- for (Class<?> msg : HttpMessages.class.getClasses()) {
- if (msg.isInterface()) {
- continue;
- }
-
- String name = msg.getSimpleName();
-
- Constructor c = msg.getConstructors()[0];
- Object[] params = new Object[c.getParameterTypes().length];
- Type[] genericTypes = c.getGenericParameterTypes();
- for (int i = 0; i < params.length; i++) {
- params[i] = dummyValue(c.getParameterTypes()[i], genericTypes[i]);
- }
-
- Object o1 = c.newInstance(params);
- byte[] serialized = mapper.writeValueAsBytes(o1);
- Object o2 = mapper.readValue(serialized, msg);
-
- assertNotNull("could not deserialize " + name, o2);
- for (Field f : msg.getFields()) {
- checkEquals(name, f, o1, o2);
- }
- }
-
- }
-
- @Test(expected=IllegalArgumentException.class)
- public void testJobStatusResultBadState() {
- new HttpMessages.JobStatus(0L, State.QUEUED, new byte[1], null);
- }
-
- @Test(expected=IllegalArgumentException.class)
- public void testJobStatusErrorBadState() {
- new HttpMessages.JobStatus(0L, State.QUEUED, null, "An Error");
- }
-
- private Object dummyValue(Class<?> klass, Type type) {
- switch (klass.getSimpleName()) {
- case "int":
- case "Integer":
- return 42;
- case "long": return 84L;
- case "byte[]": return new byte[] { (byte) 0x42, (byte) 0x84 };
- case "String": return "test";
- case "State": return State.SUCCEEDED;
- case "Map":
- Map<String, String> map = new HashMap<>();
- map.put("dummy1", "dummy2");
- return map;
- case "List":
- Class<?> genericType = getGenericArgType(type);
- return Arrays.asList(dummyValue(genericType, null), dummyValue(genericType, null));
- default: throw new IllegalArgumentException("FIX ME: " + klass.getSimpleName());
- }
- }
-
- private Class<?> getGenericArgType(Type type) {
- assertNotNull("FIX ME: null type argument.", type);
-
- ParameterizedType ptype = (ParameterizedType) type;
- assertEquals("FIX ME: no support for multiple type arguments.",
- 1, ptype.getActualTypeArguments().length);
-
- Type argType = ptype.getActualTypeArguments()[0];
- assertTrue("FIX ME: type argument is not a class.", argType instanceof Class);
-
- return (Class<?>) argType;
- }
-
- private void checkEquals(String name, Field f, Object o1, Object o2) throws Exception {
- Object v1 = f.get(o1);
- Object v2 = f.get(o2);
-
- boolean match;
- if (!f.getType().isArray()) {
- match = v1.equals(v2);
- } else if (v1 instanceof byte[]) {
- match = Arrays.equals((byte[]) v1, (byte[]) v2);
- } else {
- throw new IllegalArgumentException("FIX ME: " + f.getType().getSimpleName());
- }
-
- assertTrue(
- String.format("Field %s of %s does not match after deserialization.", f.getName(), name),
- match);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/client-common/src/test/java/com/cloudera/livy/client/common/TestSerializer.java
----------------------------------------------------------------------
diff --git a/client-common/src/test/java/com/cloudera/livy/client/common/TestSerializer.java b/client-common/src/test/java/com/cloudera/livy/client/common/TestSerializer.java
deleted file mode 100644
index 76fade1..0000000
--- a/client-common/src/test/java/com/cloudera/livy/client/common/TestSerializer.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.client.common;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.List;
-
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-public class TestSerializer {
-
- private static final String MESSAGE = "Hello World!";
-
- @Test
- public void testSerializer() throws Exception {
- Object decoded = doSerDe(MESSAGE);
- assertEquals(MESSAGE, decoded);
- }
-
- @Test
- public void testUnicodeSerializer() throws Exception {
- StringBuilder builder = new StringBuilder();
- for (int x = 0; x < 5000; x++) {
- builder.append("\u263A");
- }
- String testMessage = builder.toString();
- Object decoded = doSerDe(testMessage);
- assertEquals(testMessage, decoded);
- }
-
- @Test
- public void testAutoRegistration() throws Exception {
- Object decoded = doSerDe(new TestMessage(MESSAGE), TestMessage.class);
- assertTrue(decoded instanceof TestMessage);
- assertEquals(MESSAGE, ((TestMessage)decoded).data);
- }
-
- private Object doSerDe(Object data, Class<?>... klasses) {
- Serializer s = new Serializer(klasses);
- ByteBuffer serialized = s.serialize(data);
- return s.deserialize(serialized);
- }
-
- private ByteBuffer newBuffer() {
- return ByteBuffer.allocate(1024);
- }
-
- private static class TestMessage {
- final String data;
-
- TestMessage() {
- this(null);
- }
-
- TestMessage(String data) {
- this.data = data;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/client-common/src/test/java/com/cloudera/livy/client/common/TestTestUtils.java
----------------------------------------------------------------------
diff --git a/client-common/src/test/java/com/cloudera/livy/client/common/TestTestUtils.java b/client-common/src/test/java/com/cloudera/livy/client/common/TestTestUtils.java
deleted file mode 100644
index de1bf93..0000000
--- a/client-common/src/test/java/com/cloudera/livy/client/common/TestTestUtils.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.client.common;
-
-import java.nio.ByteBuffer;
-
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-public class TestTestUtils {
-
- @Test
- public void testJacocoArgs() {
- String args1 = TestUtils.getJacocoArgs();
- String expected1 = System.getProperty("jacoco.args").replace("main.exec", "jacoco-1.exec");
- assertEquals(expected1, args1);
-
- String args2 = TestUtils.getJacocoArgs();
- String expected2 = System.getProperty("jacoco.args").replace("main.exec", "jacoco-2.exec");
- assertEquals(expected2, args2);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/client-common/src/test/java/org/apache/livy/client/common/TestAbstractJobHandle.java
----------------------------------------------------------------------
diff --git a/client-common/src/test/java/org/apache/livy/client/common/TestAbstractJobHandle.java b/client-common/src/test/java/org/apache/livy/client/common/TestAbstractJobHandle.java
new file mode 100644
index 0000000..6703147
--- /dev/null
+++ b/client-common/src/test/java/org/apache/livy/client/common/TestAbstractJobHandle.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.client.common;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+import org.mockito.InOrder;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import org.apache.livy.JobHandle;
+import org.apache.livy.JobHandle.State;
+
+public class TestAbstractJobHandle {
+
+ @Test
+ public void testJobHandle() {
+ AbstractJobHandle<Void> handle = new TestJobHandle();
+
+ assertTrue(handle.changeState(State.QUEUED));
+ assertEquals(State.QUEUED, handle.getState());
+
+ @SuppressWarnings("unchecked")
+ JobHandle.Listener<Void> l1 = mock(JobHandle.Listener.class);
+ handle.addListener(l1);
+ verify(l1).onJobQueued(handle);
+
+ assertTrue(handle.changeState(State.STARTED));
+ verify(l1).onJobStarted(handle);
+
+ assertTrue(handle.changeState(State.SUCCEEDED));
+ verify(l1).onJobSucceeded(handle, null);
+
+ assertFalse(handle.changeState(State.CANCELLED));
+ }
+
+ private static class TestJobHandle extends AbstractJobHandle<Void> {
+
+ @Override
+ protected Void result() {
+ return null;
+ }
+
+ @Override
+ protected Throwable error() {
+ return null;
+ }
+
+ @Override
+ public Void get() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Void get(long l, TimeUnit t) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isDone() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isCancelled() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean cancel(boolean b) {
+ throw new UnsupportedOperationException();
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/client-common/src/test/java/org/apache/livy/client/common/TestBufferUtils.java
----------------------------------------------------------------------
diff --git a/client-common/src/test/java/org/apache/livy/client/common/TestBufferUtils.java b/client-common/src/test/java/org/apache/livy/client/common/TestBufferUtils.java
new file mode 100644
index 0000000..b41111a
--- /dev/null
+++ b/client-common/src/test/java/org/apache/livy/client/common/TestBufferUtils.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.client.common;
+
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestBufferUtils {
+
+ @Test
+ public void testWrappedArray() {
+ byte[] array = new byte[] { 0x1b, 0x2b };
+ byte[] unwrapped = BufferUtils.toByteArray(ByteBuffer.wrap(array));
+ assertSame(array, unwrapped);
+ }
+
+ @Test
+ public void testShortArray() {
+ byte[] array = new byte[] { 0x1b, 0x2b };
+ byte[] unwrapped = BufferUtils.toByteArray(ByteBuffer.wrap(array, 0, 1));
+ assertNotSame(array, unwrapped);
+ assertEquals(1, unwrapped.length);
+ }
+
+ @Test
+ public void testOffsetArray() {
+ byte[] array = new byte[] { 0x1b, 0x2b };
+ byte[] unwrapped = BufferUtils.toByteArray(ByteBuffer.wrap(array, 1, 1));
+ assertNotSame(array, unwrapped);
+ assertEquals(1, unwrapped.length);
+ }
+
+ @Test
+ public void testDirectBuffer() {
+ ByteBuffer direct = ByteBuffer.allocateDirect(1);
+ direct.put((byte) 0x1b);
+ assertFalse(direct.hasArray());
+ direct.flip();
+
+ byte[] unwrapped = BufferUtils.toByteArray(direct);
+ assertEquals(0x1b, unwrapped[0]);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/client-common/src/test/java/org/apache/livy/client/common/TestClientConf.java
----------------------------------------------------------------------
diff --git a/client-common/src/test/java/org/apache/livy/client/common/TestClientConf.java b/client-common/src/test/java/org/apache/livy/client/common/TestClientConf.java
new file mode 100644
index 0000000..02bbaaa
--- /dev/null
+++ b/client-common/src/test/java/org/apache/livy/client/common/TestClientConf.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.client.common;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestClientConf {
+
+ @Test
+ public void testTypes() {
+ TestConf conf = new TestConf(null);
+
+ assertNull(conf.get(TestConf.Entry.NULL));
+ assertEquals("default", conf.get(TestConf.Entry.STRING));
+ assertEquals(false, conf.getBoolean(TestConf.Entry.BOOLEAN));
+ assertEquals(42, conf.getInt(TestConf.Entry.INT));
+ assertEquals(84L, conf.getLong(TestConf.Entry.LONG));
+ assertEquals(168L, conf.getTimeAsMs(TestConf.Entry.TIME));
+
+ try {
+ conf.get(TestConf.Entry.INT);
+ fail("Should have failed to retrieve int as string.");
+ } catch (IllegalArgumentException ie) {
+ // Expected.
+ }
+
+ conf.set(TestConf.Entry.INT, 336);
+ assertEquals(336, conf.getInt(TestConf.Entry.INT));
+
+ try {
+ conf.set(TestConf.Entry.INT, "abcde");
+ fail("Should have failed to set int as string.");
+ } catch (IllegalArgumentException ie) {
+ // Expected.
+ }
+
+ conf.set(TestConf.Entry.STRING, "aString");
+ assertEquals("aString", conf.get(TestConf.Entry.STRING));
+
+ conf.set(TestConf.Entry.BOOLEAN, true);
+ assertEquals(true, conf.getBoolean(TestConf.Entry.BOOLEAN));
+
+ conf.set(TestConf.Entry.LONG, 42L);
+ assertEquals(42L, conf.getLong(TestConf.Entry.LONG));
+
+ conf.set(TestConf.Entry.LONG, null);
+ assertEquals(84L, conf.getLong(TestConf.Entry.LONG));
+
+ conf.set(TestConf.Entry.TIME_NO_DEFAULT, "100");
+ assertEquals(100L, conf.getTimeAsMs(TestConf.Entry.TIME_NO_DEFAULT));
+ }
+
+ @Test
+ public void testRawProperties() {
+ Properties dflt = new Properties();
+ dflt.put("key1", "val1");
+ dflt.put("key2", "val2");
+
+ TestConf conf = new TestConf(dflt);
+ conf.set("key2", "anotherVal");
+
+ assertEquals("val1", conf.get("key1"));
+ assertEquals("anotherVal", conf.get("key2"));
+
+ conf.setIfMissing("key2", "yetAnotherVal");
+ assertEquals("anotherVal", conf.get("key2"));
+
+ conf.setIfMissing("key3", "val3");
+ assertEquals("val3", conf.get("key3"));
+
+ int count = 0;
+ for (Map.Entry<String, String> e : conf) {
+ count++;
+ }
+ assertEquals(3, count);
+
+ TestConf newProps = new TestConf(null);
+ newProps.set("key4", "val4");
+ newProps.set("key5", "val5");
+ conf.setAll(newProps);
+ assertEquals("val4", conf.get("key4"));
+ assertEquals("val5", conf.get("key5"));
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testInvalidTime() {
+ TestConf conf = new TestConf(null);
+ conf.set(TestConf.Entry.TIME, "invalid");
+ conf.getTimeAsMs(TestConf.Entry.TIME);
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testInvalidTimeSuffix() {
+ TestConf conf = new TestConf(null);
+ conf.set(TestConf.Entry.TIME, "100foo");
+ conf.getTimeAsMs(TestConf.Entry.TIME);
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testTimeWithoutDefault() {
+ TestConf conf = new TestConf(null);
+ conf.getTimeAsMs(TestConf.Entry.TIME_NO_DEFAULT);
+ }
+
+
+ @Test
+ public void testDeprecation() {
+ TestConf conf = new TestConf(null);
+
+ assertNull(conf.get("depKey"));
+ assertNull(conf.get("dep_alt"));
+ assertNull(conf.get("new-key"));
+ assertEquals("value", conf.get(TestConf.Entry.NEW_CONF));
+
+ TestConf depProps = new TestConf(null);
+ depProps.set("depKey", "dep-val");
+ depProps.set("dep_alt", "alt-val");
+ conf.setAll(depProps);
+ assertEquals("dep-val", conf.get("depKey"));
+ assertEquals("alt-val", conf.get("dep_alt"));
+ assertEquals("alt-val", conf.get(TestConf.Entry.NEW_CONF));
+ assertEquals("alt-val", conf.get("new-key"));
+
+ conf.set("new-key", "new-val");
+ assertEquals("new-val", conf.get(TestConf.Entry.NEW_CONF));
+ assertEquals("alt-val", conf.get("dep_alt"));
+ assertEquals("new-val", conf.get("new-key"));
+ }
+
+ private static class TestConf extends ClientConf<TestConf> {
+
+ static enum Entry implements ConfEntry {
+ NULL("null", null),
+ STRING("string", "default"),
+ BOOLEAN("bool", false),
+ INT("int", 42),
+ LONG("long", 84L),
+ TIME("time", "168ms"),
+ TIME_NO_DEFAULT("time2", null),
+ NEW_CONF("new-key", "value");
+
+ private final String key;
+ private final Object dflt;
+
+ private Entry(String key, Object dflt) {
+ this.key = key;
+ this.dflt = dflt;
+ }
+
+ @Override
+ public String key() { return key; }
+
+ @Override
+ public Object dflt() { return dflt; }
+
+ }
+
+ TestConf(Properties p) {
+ super(p);
+ }
+
+ private static final Map<String, DeprecatedConf> configsWithAlternatives
+ = Collections.unmodifiableMap(new HashMap<String, DeprecatedConf>() {{
+ put(TestConf.Entry.NEW_CONF.key, DepConf.DEP_WITH_ALT);
+ }});
+
+ private static final Map<String, DeprecatedConf> deprecatedConfigs
+ = Collections.unmodifiableMap(new HashMap<String, DeprecatedConf>() {{
+ put(DepConf.DEP_NO_ALT.key, DepConf.DEP_NO_ALT);
+ }});
+
+ protected Map<String, DeprecatedConf> getConfigsWithAlternatives() {
+ return configsWithAlternatives;
+ }
+
+ protected Map<String, DeprecatedConf> getDeprecatedConfigs() {
+ return deprecatedConfigs;
+ }
+
+ static enum DepConf implements DeprecatedConf {
+ DEP_WITH_ALT("dep_alt", "0.4"),
+ DEP_NO_ALT("depKey", "1.0");
+
+ private final String key;
+ private final String version;
+ private final String deprecationMessage;
+
+ private DepConf(String key, String version) {
+ this(key, version, "");
+ }
+
+ private DepConf(String key, String version, String deprecationMessage) {
+ this.key = key;
+ this.version = version;
+ this.deprecationMessage = deprecationMessage;
+ }
+
+ @Override
+ public String key() { return key; }
+
+ @Override
+ public String version() { return version; }
+
+ @Override
+ public String deprecationMessage() { return deprecationMessage; }
+ }
+ }
+
+}