You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/12 22:59:58 UTC
[16/28] Making maven site works.
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/state/MessageCallback.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/state/MessageCallback.java b/twill-core/src/main/java/org/apache/twill/internal/state/MessageCallback.java
new file mode 100644
index 0000000..f94eaa3
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/state/MessageCallback.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.state;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * Callback to be invoked when a {@link Message} is received; completion is reported through the returned future.
+ */
+public interface MessageCallback {
+
+ /**
+ * Called when a message is received.
+ * @param messageId Id of the message being received.
+ * @param message Message being received.
+ * @return {@link ListenableFuture} completed once processing is done or failed; on success it should yield messageId.
+ */
+ ListenableFuture<String> onReceived(String messageId, Message message);
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/state/MessageCodec.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/state/MessageCodec.java b/twill-core/src/main/java/org/apache/twill/internal/state/MessageCodec.java
new file mode 100644
index 0000000..176f620
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/state/MessageCodec.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.state;
+
+import org.apache.twill.api.Command;
+import com.google.common.base.Charsets;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+import java.lang.reflect.Type;
+import java.util.Map;
+
+/**
+ * Utility for converting a {@link Message} to and from a UTF-8 encoded JSON byte array using Gson.
+ */
+public final class MessageCodec {
+
+ private static final Type OPTIONS_TYPE = new TypeToken<Map<String, String>>() {}.getType();
+ private static final Gson GSON = new GsonBuilder()
+ .registerTypeAdapter(Message.class, new MessageAdapter())
+ .registerTypeAdapter(Command.class, new CommandAdapter())
+ .create();
+
+ /**
+ * Decodes a {@link Message} from the given byte array, which is read as a UTF-8 encoded JSON string.
+ * @param bytes byte array to be decoded; may be {@code null}
+ * @return Message decoded, or {@code null} if {@code bytes} is {@code null}; decoding errors are not caught here.
+ */
+ public static Message decode(byte[] bytes) {
+ if (bytes == null) {
+ return null;
+ }
+ String content = new String(bytes, Charsets.UTF_8);
+ return GSON.fromJson(content, Message.class);
+ }
+
+ /**
+ * Encodes a {@link Message} into a UTF-8 encoded JSON byte array. Reverse of the {@link #decode(byte[])} method.
+ * @param message Message to be encoded
+ * @return byte array representing the encoded message.
+ */
+ public static byte[] encode(Message message) {
+ return GSON.toJson(message, Message.class).getBytes(Charsets.UTF_8);
+ }
+
+ /**
+ * Gson codec for {@link Message} object; uses the JSON keys type, scope, runnableName and command.
+ */
+ private static final class MessageAdapter implements JsonSerializer<Message>, JsonDeserializer<Message> {
+
+ @Override
+ public Message deserialize(JsonElement json, Type typeOfT,
+ JsonDeserializationContext context) throws JsonParseException {
+ JsonObject jsonObj = json.getAsJsonObject();
+
+ Message.Type type = Message.Type.valueOf(jsonObj.get("type").getAsString());
+ Message.Scope scope = Message.Scope.valueOf(jsonObj.get("scope").getAsString());
+ JsonElement name = jsonObj.get("runnableName");
+ String runnableName = (name == null || name.isJsonNull()) ? null : name.getAsString();
+ Command command = context.deserialize(jsonObj.get("command"), Command.class);
+
+ return new SimpleMessage(type, scope, runnableName, command);
+ }
+
+ @Override
+ public JsonElement serialize(Message message, Type typeOfSrc, JsonSerializationContext context) {
+ JsonObject jsonObj = new JsonObject();
+ jsonObj.addProperty("type", message.getType().name());
+ jsonObj.addProperty("scope", message.getScope().name());
+ jsonObj.addProperty("runnableName", message.getRunnableName());
+ jsonObj.add("command", context.serialize(message.getCommand(), Command.class));
+
+ return jsonObj;
+ }
+ }
+
+ /**
+ * Gson codec for {@link Command} object; uses the JSON keys command and options (a string-to-string map).
+ */
+ private static final class CommandAdapter implements JsonSerializer<Command>, JsonDeserializer<Command> {
+
+ @Override
+ public Command deserialize(JsonElement json, Type typeOfT,
+ JsonDeserializationContext context) throws JsonParseException {
+ JsonObject jsonObj = json.getAsJsonObject();
+ return Command.Builder.of(jsonObj.get("command").getAsString())
+ .addOptions(context.<Map<String, String>>deserialize(jsonObj.get("options"), OPTIONS_TYPE))
+ .build();
+ }
+
+ @Override
+ public JsonElement serialize(Command command, Type typeOfSrc, JsonSerializationContext context) {
+ JsonObject jsonObj = new JsonObject();
+ jsonObj.addProperty("command", command.getCommand());
+ jsonObj.add("options", context.serialize(command.getOptions(), OPTIONS_TYPE));
+ return jsonObj;
+ }
+ }
+
+ private MessageCodec() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/state/Messages.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/state/Messages.java b/twill-core/src/main/java/org/apache/twill/internal/state/Messages.java
new file mode 100644
index 0000000..9783d62
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/state/Messages.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.state;
+
+import org.apache.twill.api.Command;
+
+/**
+ * Factory class for creating instances of {@link Message}.
+ */
+public final class Messages {
+
+ /**
+ * Creates a {@link Message.Type#USER} type {@link Message} that sends the given {@link Command} to a
+ * particular runnable.
+ *
+ * @param runnableName Name of the runnable.
+ * @param command The user command to send.
+ * @return A new instance of {@link Message}.
+ */
+ public static Message createForRunnable(String runnableName, Command command) {
+ return new SimpleMessage(Message.Type.USER, Message.Scope.RUNNABLE, runnableName, command);
+ }
+
+ /**
+ * Creates a {@link Message.Type#USER} type {@link Message} that sends the given {@link Command} to all
+ * runnables; the runnable name of the created message is {@code null}.
+ *
+ * @param command The user command to send.
+ * @return A new instance of {@link Message}.
+ */
+ public static Message createForAll(Command command) {
+ return new SimpleMessage(Message.Type.USER, Message.Scope.ALL_RUNNABLE, null, command);
+ }
+
+ private Messages() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/state/SimpleMessage.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/state/SimpleMessage.java b/twill-core/src/main/java/org/apache/twill/internal/state/SimpleMessage.java
new file mode 100644
index 0000000..e146e56
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/state/SimpleMessage.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.state;
+
+import org.apache.twill.api.Command;
+import com.google.common.base.Objects;
+
+/**
+ * Straightforward {@link Message} implementation holding its type, scope, runnable name and command in final fields.
+ */
+final class SimpleMessage implements Message {
+
+ private final Type type;
+ private final Scope scope;
+ private final String runnableName;
+ private final Command command;
+
+ SimpleMessage(Type type, Scope scope, String runnableName, Command command) {
+ this.type = type;
+ this.scope = scope;
+ this.runnableName = runnableName;
+ this.command = command;
+ }
+
+ @Override
+ public Type getType() {
+ return type;
+ }
+
+ @Override
+ public Scope getScope() {
+ return scope;
+ }
+
+ @Override
+ public String getRunnableName() {
+ return runnableName;
+ }
+
+ @Override
+ public Command getCommand() {
+ return command;
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(Message.class)
+ .add("type", type)
+ .add("scope", scope)
+ .add("runnable", runnableName)
+ .add("command", command)
+ .toString();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(type, scope, runnableName, command);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (!(obj instanceof Message)) {
+ return false;
+ }
+ Message other = (Message) obj;
+ return type == other.getType()
+ && scope == other.getScope()
+ && Objects.equal(runnableName, other.getRunnableName())
+ && Objects.equal(command, other.getCommand());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/state/StateNode.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/state/StateNode.java b/twill-core/src/main/java/org/apache/twill/internal/state/StateNode.java
new file mode 100644
index 0000000..d66f8a2
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/state/StateNode.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.state;
+
+import org.apache.twill.api.ServiceController;
+import com.google.common.util.concurrent.Service;
+
+/**
+ * Holds a {@link ServiceController.State} together with an optional error message and stack trace.
+ */
+public final class StateNode {
+
+ private final ServiceController.State state;
+ private final String errorMessage;
+ private final StackTraceElement[] stackTraces;
+
+ /**
+ * Constructs a StateNode with the given state and with no error message or stack traces.
+ */
+ public StateNode(ServiceController.State state) {
+ this(state, null, null);
+ }
+
+ /**
+ * Constructs a StateNode in the FAILED state, using the message and stack trace of the given error.
+ */
+ public StateNode(Throwable error) {
+ this(Service.State.FAILED, error.getMessage(), error.getStackTrace());
+ }
+
+ /**
+ * Constructs a StateNode with the given state, error message and stack traces.
+ * This constructor should only be used by the StateNodeCodec.
+ */
+ public StateNode(ServiceController.State state, String errorMessage, StackTraceElement[] stackTraces) {
+ this.state = state;
+ this.errorMessage = errorMessage;
+ this.stackTraces = stackTraces;
+ }
+
+ public ServiceController.State getState() {
+ return state;
+ }
+
+ public String getErrorMessage() {
+ return errorMessage;
+ }
+
+ public StackTraceElement[] getStackTraces() {
+ return stackTraces;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder("state=").append(state);
+
+ if (errorMessage != null) {
+ builder.append("\n").append("error=").append(errorMessage);
+ }
+ if (stackTraces != null) {
+ builder.append("\n");
+ for (StackTraceElement stackTrace : stackTraces) {
+ builder.append("\tat ").append(stackTrace.toString()).append("\n");
+ }
+ }
+ return builder.toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/state/SystemMessages.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/state/SystemMessages.java b/twill-core/src/main/java/org/apache/twill/internal/state/SystemMessages.java
new file mode 100644
index 0000000..9877121
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/state/SystemMessages.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.state;
+
+import org.apache.twill.api.Command;
+import com.google.common.base.Preconditions;
+
+/**
+ * Collection of predefined system messages and factory methods for the stop and set-instances messages.
+ */
+public final class SystemMessages {
+
+ public static final Command STOP_COMMAND = Command.Builder.of("stop").build();
+ public static final Message SECURE_STORE_UPDATED = new SimpleMessage(
+ Message.Type.SYSTEM, Message.Scope.APPLICATION, null, Command.Builder.of("secureStoreUpdated").build());
+
+ public static Message stopApplication() {
+ return new SimpleMessage(Message.Type.SYSTEM, Message.Scope.APPLICATION, null, STOP_COMMAND);
+ }
+
+ public static Message stopRunnable(String runnableName) {
+ return new SimpleMessage(Message.Type.SYSTEM, Message.Scope.RUNNABLE, runnableName, STOP_COMMAND);
+ }
+
+ public static Message setInstances(String runnableName, int instances) {
+ Preconditions.checkArgument(instances > 0, "Instances should be > 0.");
+ return new SimpleMessage(Message.Type.SYSTEM, Message.Scope.RUNNABLE, runnableName,
+ Command.Builder.of("instances").addOption("count", Integer.toString(instances)).build());
+ }
+
+ private SystemMessages() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/utils/Dependencies.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/utils/Dependencies.java b/twill-core/src/main/java/org/apache/twill/internal/utils/Dependencies.java
new file mode 100644
index 0000000..015b9f5
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/utils/Dependencies.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.utils;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.io.ByteStreams;
+import org.objectweb.asm.AnnotationVisitor;
+import org.objectweb.asm.ClassReader;
+import org.objectweb.asm.ClassVisitor;
+import org.objectweb.asm.FieldVisitor;
+import org.objectweb.asm.Label;
+import org.objectweb.asm.MethodVisitor;
+import org.objectweb.asm.Opcodes;
+import org.objectweb.asm.Type;
+import org.objectweb.asm.signature.SignatureReader;
+import org.objectweb.asm.signature.SignatureVisitor;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URL;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * Utility class to help find out class dependencies by inspecting class bytecode with ASM.
+ */
+public final class Dependencies {
+
+ /**
+ * Represents a callback for accepting a class during dependency traversal.
+ */
+ public interface ClassAcceptor {
+ /**
+ * Invoked when a class is being found as a dependency.
+ *
+ * @param className Name of the class.
+ * @param classUrl URL for the class resource.
+ * @param classPathUrl URL for the class path resource that contains the class resource.
+ * If the URL protocol is {@code file}, it would be the path to root package.
+ * If the URL protocol is {@code jar}, it would be the jar file.
+ * @return {@code true} to keep finding dependencies of the given class; {@code false} to skip them.
+ */
+ boolean accept(String className, URL classUrl, URL classPathUrl);
+ }
+
+ public static void findClassDependencies(ClassLoader classLoader,
+ ClassAcceptor acceptor,
+ String...classesToResolve) throws IOException {
+ findClassDependencies(classLoader, acceptor, ImmutableList.copyOf(classesToResolve));
+ }
+
+ /**
+ * Finds the class dependencies of the given classes using a breadth-first traversal of their bytecode.
+ * @param classLoader ClassLoader for finding class bytecode.
+ * @param acceptor Callback invoked for a found class; returning {@code false} skips that class's dependencies.
+ * @param classesToResolve Names of the classes to start the dependency lookup from.
+ * @throws IOException Thrown when there is an error loading class bytecode.
+ */
+ public static void findClassDependencies(ClassLoader classLoader,
+ ClassAcceptor acceptor,
+ Iterable<String> classesToResolve) throws IOException {
+
+ final Set<String> seenClasses = Sets.newHashSet(classesToResolve);
+ final Queue<String> classes = Lists.newLinkedList(classesToResolve);
+
+ // Breadth-first-search classes dependencies.
+ while (!classes.isEmpty()) {
+ String className = classes.remove();
+ URL classUrl = getClassURL(className, classLoader);
+ if (classUrl == null) {
+ continue;
+ }
+
+ // Ask the acceptor whether to continue with the current class; if not, its bytecode is not visited.
+ if (!acceptor.accept(className, classUrl, getClassPathURL(className, classUrl))) {
+ continue;
+ }
+
+ InputStream is = classUrl.openStream();
+ try {
+ // Visit the bytecode to look up the classes that the class being visited depends on.
+ new ClassReader(ByteStreams.toByteArray(is)).accept(new DependencyClassVisitor(new DependencyAcceptor() {
+ @Override
+ public void accept(String className) {
+ // Enqueue the class only if it has not been seen before
+ if (seenClasses.add(className)) {
+ classes.add(className);
+ }
+ }
+ }), ClassReader.SKIP_DEBUG + ClassReader.SKIP_FRAMES);
+ } finally {
+ is.close();
+ }
+ }
+ }
+
+ /**
+ * Returns the URL for loading the class bytecode of the given class, or null if the class loader cannot find it.
+ * NOTE(review): system classes are not filtered here; only the visitor's addClass skips names starting with java/.
+ */
+ private static URL getClassURL(String className, ClassLoader classLoader) {
+ String resourceName = className.replace('.', '/') + ".class";
+ return classLoader.getResource(resourceName);
+ }
+
+ private static URL getClassPathURL(String className, URL classUrl) {
+ try {
+ if ("file".equals(classUrl.getProtocol())) {
+ String path = classUrl.getFile();
+ // Compute the directory containing the class.
+ int endIdx = path.length() - className.length() - ".class".length();
+ if (endIdx > 1) {
+ // If it is not the root directory, reduce the end index to remove the trailing '/'.
+ endIdx--;
+ }
+ return new URL("file", "", -1, path.substring(0, endIdx));
+ }
+ if ("jar".equals(classUrl.getProtocol())) {
+ String path = classUrl.getFile();
+ return URI.create(path.substring(0, path.indexOf("!/"))).toURL();
+ }
+ } catch (MalformedURLException e) {
+ throw Throwables.propagate(e);
+ }
+ throw new IllegalStateException("Unsupported class URL: " + classUrl);
+ }
+
+ /**
+ * A private interface for accepting a dependent class that is found during bytecode inspection.
+ */
+ private interface DependencyAcceptor {
+ void accept(String className);
+ }
+
+ /**
+ * ASM ClassVisitor for extracting class dependencies; each class name found is reported to the acceptor.
+ */
+ private static final class DependencyClassVisitor extends ClassVisitor {
+
+ private final SignatureVisitor signatureVisitor;
+ private final DependencyAcceptor acceptor;
+
+ public DependencyClassVisitor(DependencyAcceptor acceptor) {
+ super(Opcodes.ASM4);
+ this.acceptor = acceptor;
+ this.signatureVisitor = new SignatureVisitor(Opcodes.ASM4) {
+ private String currentClass;
+
+ @Override
+ public void visitClassType(String name) {
+ currentClass = name;
+ addClass(name);
+ }
+
+ @Override
+ public void visitInnerClassType(String name) {
+ addClass(currentClass + "$" + name);
+ }
+ };
+ }
+
+ @Override
+ public void visit(int version, int access, String name, String signature, String superName, String[] interfaces) {
+ addClass(name);
+
+ if (signature != null) {
+ new SignatureReader(signature).accept(signatureVisitor);
+ } else {
+ addClass(superName);
+ addClasses(interfaces);
+ }
+ }
+
+ @Override
+ public void visitOuterClass(String owner, String name, String desc) {
+ addClass(owner);
+ }
+
+ @Override
+ public AnnotationVisitor visitAnnotation(String desc, boolean visible) {
+ addType(Type.getType(desc));
+ return null;
+ }
+
+ @Override
+ public void visitInnerClass(String name, String outerName, String innerName, int access) {
+ addClass(name);
+ }
+
+ @Override
+ public FieldVisitor visitField(int access, String name, String desc, String signature, Object value) {
+ if (signature != null) {
+ new SignatureReader(signature).acceptType(signatureVisitor);
+ } else {
+ addType(Type.getType(desc));
+ }
+
+ return new FieldVisitor(Opcodes.ASM4) {
+ @Override
+ public AnnotationVisitor visitAnnotation(String desc, boolean visible) {
+ addType(Type.getType(desc));
+ return null;
+ }
+ };
+ }
+
+ @Override
+ public MethodVisitor visitMethod(int access, String name, String desc, String signature, String[] exceptions) {
+ if (signature != null) {
+ new SignatureReader(signature).accept(signatureVisitor);
+ } else {
+ addMethod(desc);
+ }
+ addClasses(exceptions);
+
+ return new MethodVisitor(Opcodes.ASM4) {
+ @Override
+ public AnnotationVisitor visitAnnotation(String desc, boolean visible) {
+ addType(Type.getType(desc));
+ return null;
+ }
+
+ @Override
+ public AnnotationVisitor visitParameterAnnotation(int parameter, String desc, boolean visible) {
+ addType(Type.getType(desc));
+ return null;
+ }
+
+ @Override
+ public void visitTypeInsn(int opcode, String type) {
+ addType(Type.getObjectType(type));
+ }
+
+ @Override
+ public void visitFieldInsn(int opcode, String owner, String name, String desc) {
+ addType(Type.getObjectType(owner));
+ addType(Type.getType(desc));
+ }
+
+ @Override
+ public void visitMethodInsn(int opcode, String owner, String name, String desc) {
+ addType(Type.getObjectType(owner));
+ addMethod(desc);
+ }
+
+ @Override
+ public void visitLdcInsn(Object cst) {
+ if (cst instanceof Type) {
+ addType((Type) cst);
+ }
+ }
+
+ @Override
+ public void visitMultiANewArrayInsn(String desc, int dims) {
+ addType(Type.getType(desc));
+ }
+
+ @Override
+ public void visitLocalVariable(String name, String desc, String signature, Label start, Label end, int index) {
+ if (signature != null) {
+ new SignatureReader(signature).acceptType(signatureVisitor);
+ } else {
+ addType(Type.getType(desc));
+ }
+ }
+ };
+ }
+
+ private void addClass(String internalName) {
+ if (internalName == null || internalName.startsWith("java/")) {
+ return;
+ }
+ acceptor.accept(Type.getObjectType(internalName).getClassName());
+ }
+
+ private void addClasses(String[] classes) {
+ if (classes != null) {
+ for (String clz : classes) {
+ addClass(clz);
+ }
+ }
+ }
+
+ private void addType(Type type) {
+ if (type.getSort() == Type.ARRAY) {
+ type = type.getElementType();
+ }
+ if (type.getSort() == Type.OBJECT) {
+ addClass(type.getInternalName());
+ }
+ }
+
+ private void addMethod(String desc) {
+ addType(Type.getReturnType(desc));
+ for (Type type : Type.getArgumentTypes(desc)) {
+ addType(type);
+ }
+ }
+ }
+
+ private Dependencies() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/utils/Instances.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/utils/Instances.java b/twill-core/src/main/java/org/apache/twill/internal/utils/Instances.java
new file mode 100644
index 0000000..28bfce9
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/utils/Instances.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.utils;
+
+import com.google.common.base.Defaults;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.reflect.TypeToken;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+
+/**
+ * Utility class to help instantiate object instances from classes.
+ */
+public final class Instances {
+
+ private static final Object UNSAFE;
+ private static final Method UNSAFE_NEW_INSTANCE;
+
+ static {
+ Object unsafe;
+ Method newInstance;
+ try {
+ Class<?> clz = Class.forName("sun.misc.Unsafe");
+ Field f = clz.getDeclaredField("theUnsafe");
+ f.setAccessible(true);
+ unsafe = f.get(null);
+
+ newInstance = clz.getMethod("allocateInstance", Class.class);
+ } catch (Exception e) {
+ unsafe = null;
+ newInstance = null;
+ }
+ UNSAFE = unsafe;
+ UNSAFE_NEW_INSTANCE = newInstance;
+ }
+
+ /**
+ * Creates a new instance of the given class. It will use the no-argument constructor if it is present and works.
+ * Otherwise it will try to use {@link sun.misc.Unsafe#allocateInstance(Class)} to create the instance.
+ * @param clz Class of object to be instantiated.
+ * @param <T> Type of the class
+ * @return An instance of type {@code <T>}
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> T newInstance(Class<T> clz) {
+ try {
+ try {
+ Constructor<T> cons = clz.getDeclaredConstructor();
+ if (!cons.isAccessible()) {
+ cons.setAccessible(true);
+ }
+ return cons.newInstance();
+ } catch (Exception e) {
+ // Any failure on the constructor path falls back to Unsafe; the original exception is not kept
+ Preconditions.checkState(UNSAFE != null, "Fail to instantiate with Unsafe.");
+ return unsafeCreate(clz);
+ }
+ } catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+
+ /**
+ * Creates an instance of the given class using Unsafe. It also initializes all non-static fields to default values.
+ */
+ private static <T> T unsafeCreate(Class<T> clz) throws InvocationTargetException, IllegalAccessException {
+ T instance = (T) UNSAFE_NEW_INSTANCE.invoke(UNSAFE, clz);
+
+ for (TypeToken<?> type : TypeToken.of(clz).getTypes().classes()) {
+ if (Object.class.equals(type.getRawType())) {
+ break;
+ }
+ for (Field field : type.getRawType().getDeclaredFields()) {
+ if (Modifier.isStatic(field.getModifiers())) {
+ continue;
+ }
+ if (!field.isAccessible()) {
+ field.setAccessible(true);
+ }
+ field.set(instance, Defaults.defaultValue(field.getType()));
+ }
+ }
+
+ return instance;
+ }
+
+
+ private Instances() {
+ // Prevent instantiation of this class
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/utils/Networks.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/utils/Networks.java b/twill-core/src/main/java/org/apache/twill/internal/utils/Networks.java
new file mode 100644
index 0000000..8e7d736
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/utils/Networks.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.utils;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+
+/**
+ * Static helper methods related to networking.
+ */
+public final class Networks {
+
+  /**
+   * Finds a free port on the local machine by binding a {@link ServerSocket} to port {@code 0} and
+   * closing it again before returning.
+   *
+   * @return the port the socket was bound to, or {@code -1} if opening or closing the socket failed.
+   */
+  public static int getRandomPort() {
+    try {
+      ServerSocket probe = new ServerSocket(0);
+      int assignedPort;
+      try {
+        assignedPort = probe.getLocalPort();
+      } finally {
+        probe.close();
+      }
+      return assignedPort;
+    } catch (IOException e) {
+      return -1;
+    }
+  }
+
+  private Networks() {
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/utils/Paths.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/utils/Paths.java b/twill-core/src/main/java/org/apache/twill/internal/utils/Paths.java
new file mode 100644
index 0000000..aeee09f
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/utils/Paths.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.utils;
+
+import com.google.common.io.Files;
+
+/**
+ * Static helper methods for working with file extensions in path strings.
+ */
+public final class Paths {
+
+  /**
+   * Appends the extension of one path to another string.
+   *
+   * @param extractFrom path whose extension (as given by {@link #getExtension(String)}) is taken.
+   * @param appendTo string to append the extension to.
+   * @return {@code appendTo + '.' + extension}, or {@code appendTo} unchanged if the extension is empty.
+   */
+  public static String appendSuffix(String extractFrom, String appendTo) {
+    String extension = getExtension(extractFrom);
+    return extension.isEmpty() ? appendTo : appendTo + '.' + extension;
+  }
+
+  /**
+   * Returns the extension of the given path. A path ending with {@code .tar.gz} yields {@code tar.gz};
+   * everything else is delegated to {@link Files#getFileExtension(String)}.
+   *
+   * @param path path to inspect.
+   * @return the extension without the leading dot.
+   */
+  public static String getExtension(String path) {
+    return path.endsWith(".tar.gz") ? "tar.gz" : Files.getFileExtension(path);
+  }
+
+  private Paths() {
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/kafka/client/FetchException.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/kafka/client/FetchException.java b/twill-core/src/main/java/org/apache/twill/kafka/client/FetchException.java
new file mode 100644
index 0000000..acccf04
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/kafka/client/FetchException.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.kafka.client;
+
+/**
+ * A {@link RuntimeException} that carries an {@link ErrorCode} in addition to its message.
+ */
+public final class FetchException extends RuntimeException {
+
+  private final ErrorCode errorCode;
+
+  /**
+   * @param message the exception message.
+   * @param errorCode the error code to carry with this exception.
+   */
+  public FetchException(String message, ErrorCode errorCode) {
+    super(message);
+    this.errorCode = errorCode;
+  }
+
+  /**
+   * @return the error code given at construction time.
+   */
+  public ErrorCode getErrorCode() {
+    return errorCode;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("%s. Error code: %s", super.toString(), errorCode);
+  }
+
+  /**
+   * Error codes together with their integer representation.
+   */
+  public enum ErrorCode {
+    UNKNOWN(-1),
+    OK(0),
+    OFFSET_OUT_OF_RANGE(1),
+    INVALID_MESSAGE(2),
+    WRONG_PARTITION(3),
+    INVALID_FETCH_SIZE(4);
+
+    private final int code;
+
+    ErrorCode(int code) {
+      this.code = code;
+    }
+
+    /**
+     * @return the integer representation of this error code.
+     */
+    public int getCode() {
+      return code;
+    }
+
+    /**
+     * Looks up the {@link ErrorCode} having the given integer representation.
+     *
+     * @param code integer representation of an error code.
+     * @return the matching {@link ErrorCode}.
+     * @throws IllegalArgumentException if no {@link ErrorCode} uses the given code.
+     */
+    public static ErrorCode fromCode(int code) {
+      // Search the constants instead of repeating the code table, so the two cannot drift apart.
+      for (ErrorCode errorCode : values()) {
+        if (errorCode.code == code) {
+          return errorCode;
+        }
+      }
+      throw new IllegalArgumentException("Unknown error code: " + code);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/kafka/client/FetchedMessage.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/kafka/client/FetchedMessage.java b/twill-core/src/main/java/org/apache/twill/kafka/client/FetchedMessage.java
new file mode 100644
index 0000000..65e140f
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/kafka/client/FetchedMessage.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.kafka.client;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Represents a message fetched from kafka broker.
+ */
+public interface FetchedMessage {
+
+ /**
+ * Returns the message offset.
+ *
+ * @return the offset as a {@code long}.
+ */
+ long getOffset();
+
+ /**
+ * Returns the message payload.
+ *
+ * @return a {@link ByteBuffer} holding the payload.
+ */
+ ByteBuffer getBuffer();
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/kafka/client/KafkaClient.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/kafka/client/KafkaClient.java b/twill-core/src/main/java/org/apache/twill/kafka/client/KafkaClient.java
new file mode 100644
index 0000000..496195b
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/kafka/client/KafkaClient.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.kafka.client;
+
+import org.apache.twill.internal.kafka.client.Compression;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Service;
+
+import java.util.Iterator;
+
+/**
+ * This interface provides methods for interacting with kafka broker. It also
+ * extends from {@link Service} for lifecycle management. The {@link #start()} method
+ * must be called prior to other methods in this class. When instance of this class
+ * is not needed, call {@link #stop()} to release any resources that it holds.
+ */
+public interface KafkaClient extends Service {
+
+ /**
+ * Creates a {@link PreparePublish} for the given topic.
+ * @param topic Topic to publish to.
+ * @param compression The {@link Compression} to use; presumably applied to the published messages - TODO confirm.
+ * @return A {@link PreparePublish} for adding messages and publishing them.
+ */
+ PreparePublish preparePublish(String topic, Compression compression);
+
+ /**
+ * Consumes messages from the given topic and partition.
+ * @param topic Topic to consume from.
+ * @param partition Partition to consume from.
+ * @param offset Presumably the offset to start consuming from - TODO confirm.
+ * @param maxSize Presumably an upper bound on the fetch size; the unit is not visible here - TODO confirm.
+ * @return An {@link Iterator} of {@link FetchedMessage}.
+ */
+ Iterator<FetchedMessage> consume(String topic, int partition, long offset, int maxSize);
+
+ /**
+ * Fetches offset from the given topic and partition.
+ * @param topic Topic to fetch from.
+ * @param partition Partition to fetch from.
+ * @param time The first offset of every segment file for a given partition with a modified time less than time.
+ * {@code -1} for latest offset, {@code -2} for earliest offset.
+ * @param maxOffsets Maximum number of offsets to fetch.
+ * @return A Future that carry the result as an array of offsets in descending order.
+ * The size of the result array would not be larger than maxOffsets. If there is any error during the fetch,
+ * the exception will be carried in the future.
+ */
+ ListenableFuture<long[]> getOffset(String topic, int partition, long time, int maxOffsets);
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/kafka/client/PreparePublish.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/kafka/client/PreparePublish.java b/twill-core/src/main/java/org/apache/twill/kafka/client/PreparePublish.java
new file mode 100644
index 0000000..5db4abb
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/kafka/client/PreparePublish.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.kafka.client;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This interface is for preparing to publish a set of messages to kafka.
+ */
+public interface PreparePublish {
+
+ /**
+ * Adds a message given as a byte array.
+ * @param payload Payload of the message.
+ * @param partitionKey Key associated with the message; presumably used to select the partition - TODO confirm.
+ * @return A {@link PreparePublish}; presumably this instance, to allow call chaining - TODO confirm.
+ */
+ PreparePublish add(byte[] payload, Object partitionKey);
+
+ /**
+ * Adds a message given as a {@link ByteBuffer}.
+ * @param payload Payload of the message.
+ * @param partitionKey Key associated with the message; presumably used to select the partition - TODO confirm.
+ * @return A {@link PreparePublish}; presumably this instance, to allow call chaining - TODO confirm.
+ */
+ PreparePublish add(ByteBuffer payload, Object partitionKey);
+
+ /**
+ * Publishes the messages.
+ * @return A {@link ListenableFuture}; presumably completed when the publish finishes or fails - TODO confirm.
+ */
+ ListenableFuture<?> publish();
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/kafka/client/package-info.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/kafka/client/package-info.java b/twill-core/src/main/java/org/apache/twill/kafka/client/package-info.java
new file mode 100644
index 0000000..ea3bf20
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/kafka/client/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package provides a pure java Kafka client interface.
+ */
+package org.apache.twill.kafka.client;
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/launcher/TwillLauncher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/launcher/TwillLauncher.java b/twill-core/src/main/java/org/apache/twill/launcher/TwillLauncher.java
new file mode 100644
index 0000000..2c8c1ef
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/launcher/TwillLauncher.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.launcher;
+
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.lang.reflect.Method;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.jar.JarEntry;
+import java.util.jar.JarInputStream;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * A launcher for application from a archive jar.
+ * This class should have no dependencies on any library except the J2SE one.
+ * This class should not import any thing except java.*
+ */
+public final class TwillLauncher {
+
+  private static final int TEMP_DIR_ATTEMPTS = 20;
+
+  // Matches an environment variable reference written as ${NAME} (group 1) or $NAME (group 2).
+  private static final Pattern ENV_REFERENCE =
+    Pattern.compile("\\$(?:\\{([A-Za-z_][A-Za-z0-9_]*)\\}|([A-Za-z_][A-Za-z0-9_]*))");
+
+  /**
+   * Main method to unpackage a jar and run the mainClass.main() method.
+   * @param args args[0] is the path to jar file, args[1] is the class name of the mainClass, args[2] is a boolean
+   *             telling whether entries of the "classpath" system resource are added to the classpath.
+   *             The rest of args (from args[3]) will be passed the mainClass unmodified.
+   */
+  public static void main(String[] args) throws Exception {
+    if (args.length < 3) {
+      System.out.println("Usage: java " + TwillLauncher.class.getName()
+                           + " [jarFile] [mainClass] [use_classpath] [args...]");
+      return;
+    }
+
+    File file = new File(args[0]);
+    final File targetDir = createTempDir("twill.launcher");
+
+    // Remove the expanded jar content when the JVM exits.
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        System.out.println("Cleanup directory " + targetDir);
+        deleteDir(targetDir);
+      }
+    });
+
+    System.out.println("UnJar " + file + " to " + targetDir);
+    unJar(file, targetDir);
+
+    // Create ClassLoader
+    URLClassLoader classLoader = createClassLoader(targetDir, Boolean.parseBoolean(args[2]));
+    Thread.currentThread().setContextClassLoader(classLoader);
+
+    System.out.println("Launch class with classpath: " + Arrays.toString(classLoader.getURLs()));
+
+    Class<?> mainClass = classLoader.loadClass(args[1]);
+    Method mainMethod = mainClass.getMethod("main", String[].class);
+    String[] arguments = Arrays.copyOfRange(args, 3, args.length);
+    System.out.println("Launching main: " + mainMethod + " " + Arrays.toString(arguments));
+    // The String[] is wrapped in an Object[] so that it is passed as one argument rather than as varargs.
+    mainMethod.invoke(mainClass, new Object[]{arguments});
+    System.out.println("Main class completed.");
+
+    System.out.println("Launcher completed");
+  }
+
+  /**
+   * This method is copied from Guava Files.createTempDir().
+   *
+   * @param prefix prefix for the name of the directory.
+   * @return the directory that was created under the {@code java.io.tmpdir} directory.
+   * @throws IOException if the base directory does not exist and cannot be created, or if no directory
+   *                     could be created within {@link #TEMP_DIR_ATTEMPTS} attempts.
+   */
+  private static File createTempDir(String prefix) throws IOException {
+    File baseDir = new File(System.getProperty("java.io.tmpdir"));
+    if (!baseDir.isDirectory() && !baseDir.mkdirs()) {
+      throw new IOException("Tmp directory not exists: " + baseDir.getAbsolutePath());
+    }
+
+    String baseName = prefix + "-" + System.currentTimeMillis() + "-";
+
+    for (int counter = 0; counter < TEMP_DIR_ATTEMPTS; counter++) {
+      File tempDir = new File(baseDir, baseName + counter);
+      if (tempDir.mkdir()) {
+        return tempDir;
+      }
+    }
+    throw new IOException("Failed to create directory within "
+                            + TEMP_DIR_ATTEMPTS + " attempts (tried "
+                            + baseName + "0 to " + baseName + (TEMP_DIR_ATTEMPTS - 1) + ')');
+  }
+
+  /**
+   * Expands all entries of the given jar file into the target directory.
+   *
+   * @param jarFile the jar file to expand.
+   * @param targetDir the directory to expand into.
+   * @throws IOException if reading or writing fails, or if an entry name (e.g. one containing "..")
+   *                     resolves to a location outside of the target directory.
+   */
+  private static void unJar(File jarFile, File targetDir) throws IOException {
+    // Canonical root with a trailing separator, so that a sibling directory sharing the same name prefix
+    // does not pass the containment check below.
+    String canonicalRoot = targetDir.getCanonicalPath();
+    if (!canonicalRoot.endsWith(File.separator)) {
+      canonicalRoot += File.separator;
+    }
+
+    JarInputStream jarInput = new JarInputStream(new FileInputStream(jarFile));
+    try {
+      JarEntry jarEntry = jarInput.getNextJarEntry();
+      while (jarEntry != null) {
+        File target = new File(targetDir, jarEntry.getName());
+        // Entry names come from the archive and are not trusted; refuse to write outside of targetDir.
+        if (!(target.getCanonicalPath() + File.separator).startsWith(canonicalRoot)) {
+          throw new IOException("Jar entry " + jarEntry.getName() + " resolves outside of " + targetDir);
+        }
+        if (jarEntry.isDirectory()) {
+          target.mkdirs();
+        } else {
+          target.getParentFile().mkdirs();
+          copy(jarInput, target);
+        }
+        jarEntry = jarInput.getNextJarEntry();
+      }
+    } finally {
+      jarInput.close();
+    }
+  }
+
+  /**
+   * Copies everything remaining in the given stream into the given file. The input stream is not closed.
+   */
+  private static void copy(InputStream is, File file) throws IOException {
+    byte[] buf = new byte[8192];
+    OutputStream os = new BufferedOutputStream(new FileOutputStream(file));
+    try {
+      int len = is.read(buf);
+      while (len != -1) {
+        os.write(buf, 0, len);
+        len = is.read(buf);
+      }
+    } finally {
+      os.close();
+    }
+  }
+
+  /**
+   * Creates a class loader over the expanded jar: the directory itself, its "classes" and "resources"
+   * sub-directories and every .jar file under "lib".
+   *
+   * @param dir directory holding the expanded jar.
+   * @param useClassPath if {@code true}, the first line of the system resource named "classpath" (if present)
+   *                     is split on ':' and each entry is added as well.
+   * @throws IllegalStateException wrapping any exception raised while building the class loader.
+   */
+  private static URLClassLoader createClassLoader(File dir, boolean useClassPath) {
+    try {
+      List<URL> urls = new ArrayList<URL>();
+      urls.add(dir.toURI().toURL());
+      urls.add(new File(dir, "classes").toURI().toURL());
+      urls.add(new File(dir, "resources").toURI().toURL());
+
+      File libDir = new File(dir, "lib");
+      File[] files = libDir.listFiles();
+      if (files != null) {
+        for (File file : files) {
+          if (file.getName().endsWith(".jar")) {
+            urls.add(file.toURI().toURL());
+          }
+        }
+      }
+
+      if (useClassPath) {
+        InputStream is = ClassLoader.getSystemResourceAsStream("classpath");
+        if (is != null) {
+          try {
+            BufferedReader reader = new BufferedReader(new InputStreamReader(is, Charset.forName("UTF-8")));
+            String line = reader.readLine();
+            if (line != null) {
+              for (String path : line.split(":")) {
+                urls.addAll(getClassPaths(path));
+              }
+            }
+          } finally {
+            is.close();
+          }
+        }
+      }
+
+      return new URLClassLoader(urls.toArray(new URL[0]));
+
+    } catch (Exception e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  /**
+   * Resolves one classpath entry into URLs after expanding environment variables. An entry ending with
+   * "/*" yields the .jar files of that directory, or the directory itself if it cannot be listed or is empty.
+   */
+  private static Collection<URL> getClassPaths(String path) throws MalformedURLException {
+    String classpath = expand(path);
+    if (classpath.endsWith("/*")) {
+      // Grab all .jar files
+      File dir = new File(classpath.substring(0, classpath.length() - 2));
+      File[] files = dir.listFiles();
+      if (files == null || files.length == 0) {
+        return singleItem(dir.toURI().toURL());
+      }
+
+      List<URL> result = new ArrayList<URL>(files.length);
+      for (File file : files) {
+        if (file.getName().endsWith(".jar")) {
+          result.add(file.toURI().toURL());
+        }
+      }
+      return result;
+    } else {
+      return singleItem(new File(classpath).toURI().toURL());
+    }
+  }
+
+  /**
+   * Returns a collection containing only the given URL.
+   */
+  private static Collection<URL> singleItem(URL url) {
+    List<URL> result = new ArrayList<URL>(1);
+    result.add(url);
+    return result;
+  }
+
+  /**
+   * Replaces every {@code $NAME} and {@code ${NAME}} in the given value with the value of the environment
+   * variable {@code NAME}. The whole name is matched, so a variable whose name is a prefix of another one
+   * (e.g. HADOOP and HADOOP_HOME) cannot corrupt the longer reference. References to variables that are
+   * not set are left untouched.
+   */
+  private static String expand(String value) {
+    Map<String, String> env = System.getenv();
+    Matcher matcher = ENV_REFERENCE.matcher(value);
+    StringBuffer result = new StringBuffer();
+    while (matcher.find()) {
+      String name = matcher.group(1) != null ? matcher.group(1) : matcher.group(2);
+      String replacement = env.get(name);
+      if (replacement == null) {
+        replacement = matcher.group();
+      }
+      // Quote the replacement: '$' and '\' in it must not be interpreted as group references.
+      matcher.appendReplacement(result, Matcher.quoteReplacement(replacement));
+    }
+    matcher.appendTail(result);
+    return result.toString();
+  }
+
+  /**
+   * Deletes the given file or directory, recursively deleting the directory content first.
+   */
+  private static void deleteDir(File dir) {
+    File[] files = dir.listFiles();
+    if (files != null) {
+      for (File file : files) {
+        deleteDir(file);
+      }
+    }
+    dir.delete();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/resources/kafka-0.7.2.tgz
----------------------------------------------------------------------
diff --git a/twill-core/src/main/resources/kafka-0.7.2.tgz b/twill-core/src/main/resources/kafka-0.7.2.tgz
new file mode 100644
index 0000000..24178d9
Binary files /dev/null and b/twill-core/src/main/resources/kafka-0.7.2.tgz differ
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java b/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java
new file mode 100644
index 0000000..382dc95
--- /dev/null
+++ b/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import org.apache.twill.api.Command;
+import org.apache.twill.api.ResourceReport;
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.ServiceController;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.logging.LogHandler;
+import org.apache.twill.common.ServiceListenerAdapter;
+import org.apache.twill.common.Threads;
+import org.apache.twill.internal.state.StateNode;
+import org.apache.twill.internal.zookeeper.InMemoryZKServer;
+import org.apache.twill.zookeeper.NodeData;
+import org.apache.twill.zookeeper.ZKClient;
+import org.apache.twill.zookeeper.ZKClientService;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.Service;
+import com.google.gson.JsonObject;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Tests a {@link TwillController} against a service wrapped in {@link ZKServiceDecorator}, both using
+ * an {@link InMemoryZKServer}.
+ */
+public class ControllerTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ControllerTest.class);
+
+  // Sends a command and a stop through the controller, then verifies both controller and service terminate.
+  @Test
+  public void testController() throws ExecutionException, InterruptedException, TimeoutException {
+    InMemoryZKServer zkServer = InMemoryZKServer.builder().build();
+    zkServer.startAndWait();
+
+    LOG.info("ZKServer: " + zkServer.getConnectionStr());
+
+    try {
+      RunId runId = RunIds.generate();
+      ZKClientService zkClientService = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
+      zkClientService.startAndWait();
+
+      // Stop the client even when an assertion fails, so that it is not leaked.
+      try {
+        Service service = createService(zkClientService, runId);
+        service.startAndWait();
+
+        TwillController controller = getController(zkClientService, runId);
+        controller.sendCommand(Command.Builder.of("test").build()).get(2, TimeUnit.SECONDS);
+        controller.stop().get(2, TimeUnit.SECONDS);
+
+        Assert.assertEquals(ServiceController.State.TERMINATED, controller.state());
+
+        final CountDownLatch terminateLatch = new CountDownLatch(1);
+        service.addListener(new ServiceListenerAdapter() {
+          @Override
+          public void terminated(Service.State from) {
+            terminateLatch.countDown();
+          }
+        }, Threads.SAME_THREAD_EXECUTOR);
+
+        // The service may have terminated before the listener was added, hence the state check first.
+        Assert.assertTrue(service.state() == Service.State.TERMINATED || terminateLatch.await(2, TimeUnit.SECONDS));
+      } finally {
+        zkClientService.stopAndWait();
+      }
+
+    } finally {
+      zkServer.stopAndWait();
+    }
+  }
+
+  // Test controller created before service starts.
+  @Test
+  public void testControllerBefore() throws InterruptedException {
+    InMemoryZKServer zkServer = InMemoryZKServer.builder().build();
+    zkServer.startAndWait();
+
+    LOG.info("ZKServer: " + zkServer.getConnectionStr());
+    try {
+      RunId runId = RunIds.generate();
+      ZKClientService zkClientService = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
+      zkClientService.startAndWait();
+
+      // Stop the client when the test ends, as the other tests of this class do.
+      try {
+        final CountDownLatch runLatch = new CountDownLatch(1);
+        final CountDownLatch stopLatch = new CountDownLatch(1);
+        TwillController controller = getController(zkClientService, runId);
+        controller.addListener(new ServiceListenerAdapter() {
+          @Override
+          public void running() {
+            runLatch.countDown();
+          }
+
+          @Override
+          public void terminated(Service.State from) {
+            stopLatch.countDown();
+          }
+        }, Threads.SAME_THREAD_EXECUTOR);
+
+        Service service = createService(zkClientService, runId);
+        service.start();
+
+        Assert.assertTrue(runLatch.await(2, TimeUnit.SECONDS));
+        // Nothing has asked the service to stop yet, so no terminated callback is expected here.
+        Assert.assertFalse(stopLatch.await(2, TimeUnit.SECONDS));
+
+        service.stop();
+
+        Assert.assertTrue(stopLatch.await(2, TimeUnit.SECONDS));
+      } finally {
+        zkClientService.stopAndWait();
+      }
+
+    } finally {
+      zkServer.stopAndWait();
+    }
+  }
+
+  // Test controller listener receive first state change without state transition from service
+  @Test
+  public void testControllerListener() throws InterruptedException {
+    InMemoryZKServer zkServer = InMemoryZKServer.builder().build();
+    zkServer.startAndWait();
+
+    LOG.info("ZKServer: " + zkServer.getConnectionStr());
+    try {
+      RunId runId = RunIds.generate();
+      ZKClientService zkClientService = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
+      zkClientService.startAndWait();
+
+      // Stop the client even when an assertion fails, so that it is not leaked.
+      try {
+        Service service = createService(zkClientService, runId);
+        service.startAndWait();
+
+        final CountDownLatch runLatch = new CountDownLatch(1);
+        TwillController controller = getController(zkClientService, runId);
+        controller.addListener(new ServiceListenerAdapter() {
+          @Override
+          public void running() {
+            runLatch.countDown();
+          }
+        }, Threads.SAME_THREAD_EXECUTOR);
+
+        Assert.assertTrue(runLatch.await(2, TimeUnit.SECONDS));
+
+        service.stopAndWait();
+      } finally {
+        zkClientService.stopAndWait();
+      }
+    } finally {
+      zkServer.stopAndWait();
+    }
+  }
+
+  /**
+   * Creates a {@link ZKServiceDecorator} around an idle service that only logs on start up and shut down.
+   */
+  private Service createService(ZKClient zkClient, RunId runId) {
+    return new ZKServiceDecorator(
+      zkClient, runId, Suppliers.ofInstance(new JsonObject()), new AbstractIdleService() {
+
+      @Override
+      protected void startUp() throws Exception {
+        LOG.info("Start");
+      }
+
+      @Override
+      protected void shutDown() throws Exception {
+        LOG.info("Stop");
+      }
+    });
+  }
+
+  /**
+   * Creates and starts an {@link AbstractTwillController} whose abstract methods are no-ops.
+   */
+  private TwillController getController(ZKClient zkClient, RunId runId) {
+    TwillController controller = new AbstractTwillController(runId, zkClient, ImmutableList.<LogHandler>of()) {
+
+      @Override
+      public void kill() {
+        // No-op
+      }
+
+      @Override
+      protected void instanceNodeUpdated(NodeData nodeData) {
+        // No-op
+      }
+
+      @Override
+      protected void stateNodeUpdated(StateNode stateNode) {
+        // No-op
+      }
+
+      @Override
+      public ResourceReport getResourceReport() {
+        return null;
+      }
+    };
+    controller.startAndWait();
+    return controller;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/test/java/org/apache/twill/internal/state/MessageCodecTest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/test/java/org/apache/twill/internal/state/MessageCodecTest.java b/twill-core/src/test/java/org/apache/twill/internal/state/MessageCodecTest.java
new file mode 100644
index 0000000..d267cf8
--- /dev/null
+++ b/twill-core/src/test/java/org/apache/twill/internal/state/MessageCodecTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.state;
+
+import org.apache.twill.api.Command;
+import com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+
+/**
+ * Unit tests for {@link MessageCodec}.
+ */
+public class MessageCodecTest {
+
+  // Encodes a message and decodes it again; every property must survive the round trip.
+  @Test
+  public void testCodec() {
+    Message original = new Message() {
+
+      @Override
+      public Type getType() {
+        return Type.SYSTEM;
+      }
+
+      @Override
+      public Scope getScope() {
+        return Scope.APPLICATION;
+      }
+
+      @Override
+      public String getRunnableName() {
+        return null;
+      }
+
+      @Override
+      public Command getCommand() {
+        return new Command() {
+          @Override
+          public String getCommand() {
+            return "stop";
+          }
+
+          @Override
+          public Map<String, String> getOptions() {
+            return ImmutableMap.of("timeout", "1", "timeoutUnit", "SECONDS");
+          }
+        };
+      }
+    };
+
+    Message decoded = MessageCodec.decode(MessageCodec.encode(original));
+    Map<String, String> expectedOptions = ImmutableMap.of("timeout", "1", "timeoutUnit", "SECONDS");
+
+    Assert.assertEquals(Message.Type.SYSTEM, decoded.getType());
+    Assert.assertEquals(Message.Scope.APPLICATION, decoded.getScope());
+    Assert.assertNull(decoded.getRunnableName());
+    Assert.assertEquals("stop", decoded.getCommand().getCommand());
+    Assert.assertEquals(expectedOptions, decoded.getCommand().getOptions());
+  }
+
+  // Decoding an empty byte array is expected to give null.
+  @Test
+  public void testFailureDecode() {
+    byte[] empty = "".getBytes();
+    Assert.assertNull(MessageCodec.decode(empty));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/test/java/org/apache/twill/internal/state/ZKServiceDecoratorTest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/test/java/org/apache/twill/internal/state/ZKServiceDecoratorTest.java b/twill-core/src/test/java/org/apache/twill/internal/state/ZKServiceDecoratorTest.java
new file mode 100644
index 0000000..47d8562
--- /dev/null
+++ b/twill-core/src/test/java/org/apache/twill/internal/state/ZKServiceDecoratorTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.state;
+
+import org.apache.twill.api.RunId;
+import org.apache.twill.internal.RunIds;
+import org.apache.twill.internal.ZKServiceDecorator;
+import org.apache.twill.internal.zookeeper.InMemoryZKServer;
+import org.apache.twill.zookeeper.NodeData;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.twill.zookeeper.ZKClients;
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Suppliers;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.Service;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests that a service wrapped in a {@link ZKServiceDecorator} goes through start and stop
+ * while its state is observable through a ZooKeeper node.
+ */
+public class ZKServiceDecoratorTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ZKServiceDecoratorTest.class);
+
+ /**
+ * Starts and stops a decorated service against an in-memory ZooKeeper server.
+ * The wrapped service blocks in {@code startUp()} / {@code shutDown()} until the test has seen
+ * the expected state ("STARTING", then "STOPPING") in the {@code state} node under the run id path.
+ */
+ @Test
+ public void testStateTransition() throws InterruptedException, ExecutionException, TimeoutException {
+ InMemoryZKServer zkServer = InMemoryZKServer.builder().build();
+ zkServer.startAndWait();
+
+ try {
+ final String namespace = Joiner.on('/').join("/twill", RunIds.generate(), "runnables", "Runner1");
+
+ final ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
+ zkClient.startAndWait();
+ // Create the namespace node up front; the decorator below works through a client namespaced to this path.
+ zkClient.create(namespace, null, CreateMode.PERSISTENT).get();
+
+ try {
+ JsonObject content = new JsonObject();
+ content.addProperty("containerId", "container-123");
+ content.addProperty("host", "localhost");
+
+ RunId runId = RunIds.generate();
+ // Starts with zero permits: startUp()/shutDown() below can only proceed after watchDataChange() releases one.
+ final Semaphore semaphore = new Semaphore(0);
+ ZKServiceDecorator service = new ZKServiceDecorator(ZKClients.namespace(zkClient, namespace),
+ runId, Suppliers.ofInstance(content),
+ new AbstractIdleService() {
+ @Override
+ protected void startUp() throws Exception {
+ // Wait up to 5 seconds for the watcher to observe the expected state; otherwise fail start-up.
+ Preconditions.checkArgument(semaphore.tryAcquire(5, TimeUnit.SECONDS), "Fail to start");
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ // Same hand-shake as startUp(), this time for the stop transition.
+ Preconditions.checkArgument(semaphore.tryAcquire(5, TimeUnit.SECONDS), "Fail to stop");
+ }
+ });
+
+ final String runnablePath = namespace + "/" + runId.getId();
+ final AtomicReference<String> stateMatch = new AtomicReference<String>("STARTING");
+ // Install the watch before starting the service so the "STARTING" state cannot be missed.
+ watchDataChange(zkClient, runnablePath + "/state", semaphore, stateMatch);
+ Assert.assertEquals(Service.State.RUNNING, service.start().get(5, TimeUnit.SECONDS));
+
+ // Switch the expected state before stopping; the same watch chain releases the permit for shutDown().
+ stateMatch.set("STOPPING");
+ Assert.assertEquals(Service.State.TERMINATED, service.stop().get(5, TimeUnit.SECONDS));
+
+ } finally {
+ zkClient.stopAndWait();
+ }
+ } finally {
+ zkServer.stopAndWait();
+ }
+ }
+
+ /**
+ * Reads the data of the given node and keeps following its changes. Each time the data is read
+ * successfully, it is parsed as a JSON object and one permit is released on the semaphore if its
+ * {@code state} field equals the current value of {@code stateMatch}.
+ *
+ * @param zkClient client used for reading and watching the node
+ * @param path path of the node to read
+ * @param semaphore semaphore to release when the expected state is seen
+ * @param stateMatch holder of the state string currently expected
+ */
+ private void watchDataChange(final ZKClientService zkClient, final String path,
+ final Semaphore semaphore, final AtomicReference<String> stateMatch) {
+ Futures.addCallback(zkClient.getData(path, new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ // Data changed: read the node again, which also registers a fresh watcher.
+ if (event.getType() == Event.EventType.NodeDataChanged) {
+ watchDataChange(zkClient, path, semaphore, stateMatch);
+ }
+ }
+ }), new FutureCallback<NodeData>() {
+ @Override
+ public void onSuccess(NodeData result) {
+ String content = new String(result.getData(), Charsets.UTF_8);
+ JsonObject json = new Gson().fromJson(content, JsonElement.class).getAsJsonObject();
+ if (stateMatch.get().equals(json.get("state").getAsString())) {
+ semaphore.release();
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ // Reading failed, presumably because the node does not exist yet; fall back to waiting for its creation.
+ exists();
+ }
+
+ // Checks for the node with a watcher and resumes watchDataChange() once the node is there.
+ private void exists() {
+ Futures.addCallback(zkClient.exists(path, new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ if (event.getType() == Event.EventType.NodeCreated) {
+ watchDataChange(zkClient, path, semaphore, stateMatch);
+ }
+ }
+ }), new FutureCallback<Stat>() {
+ @Override
+ public void onSuccess(Stat result) {
+ // A non-null Stat means the node already exists, so go back to reading its data right away.
+ if (result != null) {
+ watchDataChange(zkClient, path, semaphore, stateMatch);
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.error(t.getMessage(), t);
+ }
+ });
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java b/twill-core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java
new file mode 100644
index 0000000..508cadb
--- /dev/null
+++ b/twill-core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.utils;
+
+import org.apache.twill.filesystem.LocalLocationFactory;
+import org.apache.twill.filesystem.Location;
+import org.apache.twill.internal.ApplicationBundler;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.List;
+import java.util.jar.JarEntry;
+import java.util.jar.JarInputStream;
+
+/**
+ * Tests that a jar created by {@link ApplicationBundler} contains the traced classes
+ * and that those classes can be loaded back from the extracted bundle.
+ */
+public class ApplicationBundlerTest {
+
+  @Rule
+  public TemporaryFolder tmpDir = new TemporaryFolder();
+
+  /**
+   * Bundles {@link ApplicationBundler} itself, extracts the resulting jar and loads classes through
+   * a class loader that looks into the extracted bundle before its parent.
+   *
+   * @throws IOException if creating or extracting the bundle fails
+   * @throws ClassNotFoundException if a class cannot be loaded from the bundle or the parent loader
+   */
+  @Test
+  public void testFindDependencies() throws IOException, ClassNotFoundException {
+    Location location = new LocalLocationFactory(tmpDir.newFolder()).create("test.jar");
+
+    // Create a jar file by tracing dependencies
+    ApplicationBundler bundler = new ApplicationBundler(ImmutableList.<String>of());
+    bundler.createBundle(location, ApplicationBundler.class);
+
+    File targetDir = tmpDir.newFolder();
+    unjar(new File(location.toURI()), targetDir);
+
+    // Load the class back, it should be loaded by the custom classloader
+    ClassLoader classLoader = createClassLoader(targetDir);
+    Class<?> clz = classLoader.loadClass(ApplicationBundler.class.getName());
+    Assert.assertSame(classLoader, clz.getClassLoader());
+
+    // For system classes, they shouldn't be packaged, hence loaded by different classloader.
+    clz = classLoader.loadClass(Object.class.getName());
+    Assert.assertNotSame(classLoader, clz.getClassLoader());
+  }
+
+  /**
+   * Extracts every entry of the given jar file into the target directory.
+   *
+   * @param jarFile the jar file to extract
+   * @param targetDir directory that receives the extracted entries
+   * @throws IOException if reading the jar or writing an entry fails
+   */
+  private void unjar(File jarFile, File targetDir) throws IOException {
+    JarInputStream jarInput = new JarInputStream(new FileInputStream(jarFile));
+    try {
+      JarEntry jarEntry = jarInput.getNextJarEntry();
+      while (jarEntry != null) {
+        File target = new File(targetDir, jarEntry.getName());
+        if (jarEntry.isDirectory()) {
+          target.mkdirs();
+        } else {
+          // File entries may appear before (or without) an entry for their directory.
+          target.getParentFile().mkdirs();
+          ByteStreams.copy(jarInput, Files.newOutputStreamSupplier(target));
+        }
+
+        jarEntry = jarInput.getNextJarEntry();
+      }
+    } finally {
+      jarInput.close();
+    }
+  }
+
+  /**
+   * Creates a class loader over {@code <dir>/classes} and every file in {@code <dir>/lib} that tries
+   * its own URLs first and only delegates to the parent when the class is not found there.
+   *
+   * @param dir directory containing the extracted bundle
+   * @return a child-first class loader for the extracted bundle
+   * @throws MalformedURLException if a file path cannot be converted to a URL
+   */
+  private ClassLoader createClassLoader(File dir) throws MalformedURLException {
+    List<URL> urls = Lists.newArrayList();
+    urls.add(new File(dir, "classes").toURI().toURL());
+    File[] libFiles = new File(dir, "lib").listFiles();
+    if (libFiles != null) {
+      for (File file : libFiles) {
+        urls.add(file.toURI().toURL());
+      }
+    }
+    return new URLClassLoader(urls.toArray(new URL[0])) {
+      @Override
+      protected synchronized Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+        // Reuse a class this loader has already loaded; calling findClass() again for the same
+        // name would try to define the class a second time and fail with a LinkageError.
+        Class<?> cls = findLoadedClass(name);
+        if (cls == null) {
+          // Load class from the given URLs first before delegating to parent.
+          try {
+            cls = findClass(name);
+          } catch (ClassNotFoundException e) {
+            ClassLoader parent = getParent();
+            cls = parent == null ? ClassLoader.getSystemClassLoader().loadClass(name) : parent.loadClass(name);
+          }
+        }
+        // Honor the resolve flag as the default ClassLoader.loadClass implementation does.
+        if (resolve) {
+          resolveClass(cls);
+        }
+        return cls;
+      }
+    };
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java b/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
new file mode 100644
index 0000000..40fc3ed
--- /dev/null
+++ b/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.kafka.client;
+
+import org.apache.twill.common.Services;
+import org.apache.twill.internal.kafka.EmbeddedKafkaServer;
+import org.apache.twill.internal.kafka.client.Compression;
+import org.apache.twill.internal.kafka.client.SimpleKafkaClient;
+import org.apache.twill.internal.utils.Networks;
+import org.apache.twill.internal.zookeeper.InMemoryZKServer;
+import org.apache.twill.zookeeper.ZKClientService;
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.Futures;
+import org.apache.commons.compress.archivers.ArchiveEntry;
+import org.apache.commons.compress.archivers.ArchiveException;
+import org.apache.commons.compress.archivers.ArchiveInputStream;
+import org.apache.commons.compress.archivers.ArchiveStreamFactory;
+import org.apache.commons.compress.compressors.CompressorException;
+import org.apache.commons.compress.compressors.CompressorStreamFactory;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests publishing and consuming messages through {@link KafkaClient} against an embedded
+ * Kafka server backed by an in-memory ZooKeeper server.
+ */
+public class KafkaTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaTest.class);
+
+  @ClassRule
+  public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
+
+  private static InMemoryZKServer zkServer;
+  private static EmbeddedKafkaServer kafkaServer;
+  private static ZKClientService zkClientService;
+  private static KafkaClient kafkaClient;
+
+  /**
+   * Starts ZooKeeper, the embedded Kafka server and the Kafka client shared by all tests.
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    zkServer = InMemoryZKServer.builder().setDataDir(TMP_FOLDER.newFolder()).build();
+    zkServer.startAndWait();
+
+    // Extract the kafka.tgz and start the kafka server
+    kafkaServer = new EmbeddedKafkaServer(extractKafka(), generateKafkaConfig(zkServer.getConnectionStr()));
+    kafkaServer.startAndWait();
+
+    zkClientService = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
+
+    kafkaClient = new SimpleKafkaClient(zkClientService);
+    Services.chainStart(zkClientService, kafkaClient).get();
+  }
+
+  /**
+   * Stops the client and the servers in the reverse order of {@link #init()}.
+   */
+  @AfterClass
+  public static void finish() throws Exception {
+    Services.chainStop(kafkaClient, zkClientService).get();
+    kafkaServer.stopAndWait();
+    zkServer.stopAndWait();
+  }
+
+  /**
+   * Publishes 10 messages with each of three compression settings from separate threads
+   * and expects to consume all 30 of them from the beginning of the topic.
+   */
+  @Test
+  public void testKafkaClient() throws Exception {
+    String topic = "testClient";
+
+    Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP, "GZIP Testing message", 10);
+    Thread t2 = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing message", 10);
+
+    t1.start();
+    t2.start();
+
+    // The third publisher is only started once the uncompressed publisher has finished.
+    Thread t3 = createPublishThread(kafkaClient, topic, Compression.SNAPPY, "Snappy Testing message", 10);
+    t2.join();
+    t3.start();
+
+    Iterator<FetchedMessage> consumer = kafkaClient.consume(topic, 0, 0, 1048576);
+    int count = 0;
+    long startTime = System.nanoTime();
+    while (count < 30 && consumer.hasNext() && secondsPassed(startTime, TimeUnit.NANOSECONDS) < 5) {
+      LOG.info(Charsets.UTF_8.decode(consumer.next().getBuffer()).toString());
+      count++;
+    }
+
+    Assert.assertEquals(30, count);
+  }
+
+  /**
+   * Checks the earliest offset before and after publishing, then verifies that a consumer
+   * created at the latest offset sees a message published afterwards.
+   */
+  @Test (timeout = 10000)
+  public void testOffset() throws Exception {
+    String topic = "testOffset";
+
+    // Initial earliest offset should be 0.
+    long[] offsets = kafkaClient.getOffset(topic, 0, -2, 10).get();
+    Assert.assertArrayEquals(new long[]{0L}, offsets);
+
+    // Publish some messages
+    Thread publishThread = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing", 2000);
+    publishThread.start();
+    publishThread.join();
+
+    // Fetch earliest offset, should still be 0.
+    offsets = kafkaClient.getOffset(topic, 0, -2, 10).get();
+    Assert.assertArrayEquals(new long[]{0L}, offsets);
+
+    // Fetch latest offset
+    offsets = kafkaClient.getOffset(topic, 0, -1, 10).get();
+    Iterator<FetchedMessage> consumer = kafkaClient.consume(topic, 0, offsets[0], 1048576);
+
+    // Publish one more message, the consumer should see the new message being published.
+    publishThread = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing", 1, 3000);
+    publishThread.start();
+    publishThread.join();
+
+    // Should see the last message being published.
+    Assert.assertTrue(consumer.hasNext());
+    Assert.assertEquals("3000 Testing", Charsets.UTF_8.decode(consumer.next().getBuffer()).toString());
+  }
+
+  /**
+   * Same as {@link #createPublishThread(KafkaClient, String, Compression, String, int, int)}
+   * with message numbering starting at 0.
+   */
+  private Thread createPublishThread(final KafkaClient kafkaClient, final String topic,
+                                     final Compression compression, final String message, final int count) {
+    return createPublishThread(kafkaClient, topic, compression, message, count, 0);
+  }
+
+  /**
+   * Creates (without starting) a thread that publishes {@code count} messages in one publish call.
+   * Each message payload is {@code "<base + i> <message>"} encoded as UTF-8.
+   *
+   * @param kafkaClient client used for publishing
+   * @param topic topic to publish to
+   * @param compression compression applied to the published messages
+   * @param message text appended after the message number
+   * @param count number of messages to publish
+   * @param base number used for the first message
+   * @return the thread, not yet started
+   */
+  private Thread createPublishThread(final KafkaClient kafkaClient, final String topic, final Compression compression,
+                                     final String message, final int count, final int base) {
+    return new Thread() {
+      @Override
+      public void run() {
+        PreparePublish preparePublish = kafkaClient.preparePublish(topic, compression);
+        for (int i = 0; i < count; i++) {
+          preparePublish.add(((base + i) + " " + message).getBytes(Charsets.UTF_8), 0);
+        }
+        Futures.getUnchecked(preparePublish.publish());
+      }
+    };
+  }
+
+  /**
+   * Returns the number of whole seconds between the given start time and the current
+   * {@link System#nanoTime()} reading.
+   *
+   * @param startTime an earlier {@link System#nanoTime()} reading, expressed in {@code startUnit}
+   * @param startUnit unit of {@code startTime}
+   */
+  private long secondsPassed(long startTime, TimeUnit startUnit) {
+    return TimeUnit.SECONDS.convert(System.nanoTime() - TimeUnit.NANOSECONDS.convert(startTime, startUnit),
+                                    TimeUnit.NANOSECONDS);
+  }
+
+  /**
+   * Extracts the {@code kafka-0.7.2.tgz} classpath resource into a new temporary folder.
+   *
+   * @return the folder containing the extracted archive content
+   * @throws IllegalStateException if the resource is not found on the classpath
+   */
+  private static File extractKafka() throws IOException, ArchiveException, CompressorException {
+    File kafkaExtract = TMP_FOLDER.newFolder();
+    InputStream kafkaResource = KafkaTest.class.getClassLoader().getResourceAsStream("kafka-0.7.2.tgz");
+    // Fail with an explicit message rather than an obscure error from the decompressor.
+    Preconditions.checkState(kafkaResource != null, "Resource kafka-0.7.2.tgz not found in classpath.");
+    ArchiveInputStream archiveInput = new ArchiveStreamFactory()
+      .createArchiveInputStream(ArchiveStreamFactory.TAR,
+                                new CompressorStreamFactory()
+                                  .createCompressorInputStream(CompressorStreamFactory.GZIP, kafkaResource));
+
+    try {
+      ArchiveEntry entry = archiveInput.getNextEntry();
+      while (entry != null) {
+        File file = new File(kafkaExtract, entry.getName());
+        if (entry.isDirectory()) {
+          file.mkdirs();
+        } else {
+          // Do not rely on the archive listing a directory before the files inside it.
+          file.getParentFile().mkdirs();
+          ByteStreams.copy(archiveInput, Files.newOutputStreamSupplier(file));
+        }
+        entry = archiveInput.getNextEntry();
+      }
+    } finally {
+      archiveInput.close();
+    }
+    return kafkaExtract;
+  }
+
+  /**
+   * Builds the configuration for the embedded Kafka server, using a random port and a fresh log directory.
+   *
+   * @param zkConnectStr ZooKeeper connection string set as {@code zk.connect}
+   * @return the server properties
+   * @throws IOException if the log directory cannot be created
+   */
+  private static Properties generateKafkaConfig(String zkConnectStr) throws IOException {
+    int port = Networks.getRandomPort();
+    Preconditions.checkState(port > 0, "Failed to get random port.");
+
+    Properties prop = new Properties();
+    prop.setProperty("log.dir", TMP_FOLDER.newFolder().getAbsolutePath());
+    prop.setProperty("zk.connect", zkConnectStr);
+    prop.setProperty("num.threads", "8");
+    prop.setProperty("port", Integer.toString(port));
+    prop.setProperty("log.flush.interval", "1000");
+    prop.setProperty("max.socket.request.bytes", "104857600");
+    prop.setProperty("log.cleanup.interval.mins", "1");
+    prop.setProperty("log.default.flush.scheduler.interval.ms", "1000");
+    prop.setProperty("zk.connectiontimeout.ms", "1000000");
+    prop.setProperty("socket.receive.buffer", "1048576");
+    prop.setProperty("enable.zookeeper", "true");
+    prop.setProperty("log.retention.hours", "24");
+    prop.setProperty("brokerid", "0");
+    prop.setProperty("socket.send.buffer", "1048576");
+    prop.setProperty("num.partitions", "1");
+    // Use a really small file size to force some flush to happen
+    prop.setProperty("log.file.size", "1024");
+    prop.setProperty("log.default.flush.interval.ms", "1000");
+    return prop;
+  }
+}