You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/12 22:59:58 UTC

[16/28] Making maven site works.

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/state/MessageCallback.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/state/MessageCallback.java b/twill-core/src/main/java/org/apache/twill/internal/state/MessageCallback.java
new file mode 100644
index 0000000..f94eaa3
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/state/MessageCallback.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.state;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * Callback for handling a {@link Message} when it is received.
+ */
+public interface MessageCallback {
+
+  /**
+   * Called when a message is received.
+   * @param messageId Id of the message being received.
+   * @param message Message being received.
+   * @return Future completed when processing succeeds or fails; on success its result should be the input message Id.
+   */
+  ListenableFuture<String> onReceived(String messageId, Message message);
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/state/MessageCodec.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/state/MessageCodec.java b/twill-core/src/main/java/org/apache/twill/internal/state/MessageCodec.java
new file mode 100644
index 0000000..176f620
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/state/MessageCodec.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.state;
+
+import org.apache.twill.api.Command;
+import com.google.common.base.Charsets;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+import java.lang.reflect.Type;
+import java.util.Map;
+
+/**
+ * Encodes and decodes {@link Message} objects to and from UTF-8 JSON byte arrays using Gson.
+ */
+public final class MessageCodec {
+
+  private static final Type OPTIONS_TYPE = new TypeToken<Map<String, String>>() {}.getType();
+  private static final Gson GSON = new GsonBuilder()
+                                        .registerTypeAdapter(Message.class, new MessageAdapter())
+                                        .registerTypeAdapter(Command.class, new CommandAdapter())
+                                        .create();
+
+  /**
+   * Decodes a {@link Message} from the given UTF-8 JSON byte array. Parsing errors are not caught by this method.
+   * @param bytes byte array to be decoded
+   * @return Message decoded, or {@code null} if {@code bytes} is {@code null}.
+   */
+  public static Message decode(byte[] bytes) {
+    if (bytes == null) {
+      return null;
+    }
+    String content = new String(bytes, Charsets.UTF_8);
+    return GSON.fromJson(content, Message.class);
+  }
+
+  /**
+   * Encodes a {@link Message} into byte array. Reverse of {@link #decode(byte[])} method.
+   * @param message Message to be encoded
+   * @return byte array representing the encoded message.
+   */
+  public static byte[] encode(Message message) {
+    return GSON.toJson(message, Message.class).getBytes(Charsets.UTF_8);
+  }
+
+  /**
+   * Gson codec for {@link Message} object.
+   */
+  private static final class MessageAdapter implements JsonSerializer<Message>, JsonDeserializer<Message> {
+
+    @Override
+    public Message deserialize(JsonElement json, Type typeOfT,
+                               JsonDeserializationContext context) throws JsonParseException {
+      JsonObject jsonObj = json.getAsJsonObject();
+
+      Message.Type type = Message.Type.valueOf(jsonObj.get("type").getAsString());
+      Message.Scope scope = Message.Scope.valueOf(jsonObj.get("scope").getAsString());
+      JsonElement name = jsonObj.get("runnableName");
+      String runnableName = (name == null || name.isJsonNull()) ? null : name.getAsString();
+      Command command = context.deserialize(jsonObj.get("command"), Command.class);
+
+      return new SimpleMessage(type, scope, runnableName, command);
+    }
+
+    @Override
+    public JsonElement serialize(Message message, Type typeOfSrc, JsonSerializationContext context) {
+      JsonObject jsonObj = new JsonObject();
+      jsonObj.addProperty("type", message.getType().name());
+      jsonObj.addProperty("scope", message.getScope().name());
+      jsonObj.addProperty("runnableName", message.getRunnableName());
+      jsonObj.add("command", context.serialize(message.getCommand(), Command.class));
+
+      return jsonObj;
+    }
+  }
+
+  /**
+   * Gson codec for {@link Command} object.
+   */
+  private static final class CommandAdapter implements JsonSerializer<Command>, JsonDeserializer<Command> {
+
+    @Override
+    public Command deserialize(JsonElement json, Type typeOfT,
+                               JsonDeserializationContext context) throws JsonParseException {
+      JsonObject jsonObj = json.getAsJsonObject();
+      return Command.Builder.of(jsonObj.get("command").getAsString())
+                            .addOptions(context.<Map<String, String>>deserialize(jsonObj.get("options"), OPTIONS_TYPE))
+                            .build();
+    }
+
+    @Override
+    public JsonElement serialize(Command command, Type typeOfSrc, JsonSerializationContext context) {
+      JsonObject jsonObj = new JsonObject();
+      jsonObj.addProperty("command", command.getCommand());
+      jsonObj.add("options", context.serialize(command.getOptions(), OPTIONS_TYPE));
+      return jsonObj;
+    }
+  }
+
+  private MessageCodec() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/state/Messages.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/state/Messages.java b/twill-core/src/main/java/org/apache/twill/internal/state/Messages.java
new file mode 100644
index 0000000..9783d62
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/state/Messages.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.state;
+
+import org.apache.twill.api.Command;
+
+/**
+ * Factory class for creating instances of {@link Message}.
+ */
+public final class Messages {
+
+  /**
+   * Creates a {@link Message.Type#USER} type {@link Message} that sends the given {@link Command} to a
+   * particular runnable.
+   *
+   * @param runnableName Name of the runnable.
+   * @param command The user command to send.
+   * @return A new instance of {@link Message}.
+   */
+  public static Message createForRunnable(String runnableName, Command command) {
+    return new SimpleMessage(Message.Type.USER, Message.Scope.RUNNABLE, runnableName, command);
+  }
+
+  /**
+   * Creates a {@link Message.Type#USER} type {@link Message} that sends the given {@link Command} to all
+   * runnables.
+   *
+   * @param command The user command to send.
+   * @return A new instance of {@link Message}.
+   */
+  public static Message createForAll(Command command) {
+    return new SimpleMessage(Message.Type.USER, Message.Scope.ALL_RUNNABLE, null, command);
+  }
+
+  private Messages() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/state/SimpleMessage.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/state/SimpleMessage.java b/twill-core/src/main/java/org/apache/twill/internal/state/SimpleMessage.java
new file mode 100644
index 0000000..e146e56
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/state/SimpleMessage.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.state;
+
+import org.apache.twill.api.Command;
+import com.google.common.base.Objects;
+
+/**
+ * Simple {@link Message} implementation that holds type, scope, runnable name and command in final fields.
+ */
+final class SimpleMessage implements Message {
+
+  private final Type type;
+  private final Scope scope;
+  private final String runnableName;
+  private final Command command;
+
+  SimpleMessage(Type type, Scope scope, String runnableName, Command command) {
+    this.type = type;
+    this.scope = scope;
+    this.runnableName = runnableName;
+    this.command = command;
+  }
+
+  @Override
+  public Type getType() {
+    return type;
+  }
+
+  @Override
+  public Scope getScope() {
+    return scope;
+  }
+
+  @Override
+  public String getRunnableName() {
+    return runnableName;
+  }
+
+  @Override
+  public Command getCommand() {
+    return command;
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(Message.class)
+      .add("type", type)
+      .add("scope", scope)
+      .add("runnable", runnableName)
+      .add("command", command)
+      .toString();
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(type, scope, runnableName, command);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (!(obj instanceof Message)) {
+      return false;
+    }
+    Message other = (Message) obj;
+    return type == other.getType()
+      && scope == other.getScope()
+      && Objects.equal(runnableName, other.getRunnableName())
+      && Objects.equal(command, other.getCommand());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/state/StateNode.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/state/StateNode.java b/twill-core/src/main/java/org/apache/twill/internal/state/StateNode.java
new file mode 100644
index 0000000..d66f8a2
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/state/StateNode.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.state;
+
+import org.apache.twill.api.ServiceController;
+import com.google.common.util.concurrent.Service;
+
+/**
+ * Holds a {@link ServiceController.State} together with an optional error message and stack trace.
+ */
+public final class StateNode {
+
+  private final ServiceController.State state;
+  private final String errorMessage;
+  private final StackTraceElement[] stackTraces;
+
+  /**
+   * Constructs a StateNode with the given state.
+   */
+  public StateNode(ServiceController.State state) {
+    this(state, null, null);
+  }
+
+  /**
+   * Constructs a StateNode with {@link ServiceController.State#FAILED} caused by the given error.
+   */
+  public StateNode(Throwable error) {
+    this(Service.State.FAILED, error.getMessage(), error.getStackTrace());
+  }
+
+  /**
+   * Constructs a StateNode with the given state, error and stacktraces.
+   * This constructor should only be used by the StateNodeCodec.
+   */
+  public StateNode(ServiceController.State state, String errorMessage, StackTraceElement[] stackTraces) {
+    this.state = state;
+    this.errorMessage = errorMessage;
+    this.stackTraces = stackTraces;
+  }
+
+  public ServiceController.State getState() {
+    return state;
+  }
+
+  public String getErrorMessage() {
+    return errorMessage;
+  }
+
+  public StackTraceElement[] getStackTraces() {
+    return stackTraces;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder("state=").append(state);
+
+    if (errorMessage != null) {
+      builder.append("\n").append("error=").append(errorMessage);
+    }
+    if (stackTraces != null) {
+      builder.append("\n");
+      for (StackTraceElement stackTrace : stackTraces) {
+        builder.append("\tat ").append(stackTrace.toString()).append("\n");
+      }
+    }
+    return builder.toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/state/SystemMessages.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/state/SystemMessages.java b/twill-core/src/main/java/org/apache/twill/internal/state/SystemMessages.java
new file mode 100644
index 0000000..9877121
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/state/SystemMessages.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.state;
+
+import org.apache.twill.api.Command;
+import com.google.common.base.Preconditions;
+
+/**
+ * Collection of predefined system messages and factory methods for creating {@link Message.Type#SYSTEM} messages.
+ */
+public final class SystemMessages {
+
+  public static final Command STOP_COMMAND = Command.Builder.of("stop").build();
+  public static final Message SECURE_STORE_UPDATED = new SimpleMessage(
+    Message.Type.SYSTEM, Message.Scope.APPLICATION, null, Command.Builder.of("secureStoreUpdated").build());
+
+  public static Message stopApplication() {
+    return new SimpleMessage(Message.Type.SYSTEM, Message.Scope.APPLICATION, null, STOP_COMMAND);
+  }
+
+  public static Message stopRunnable(String runnableName) {
+    return new SimpleMessage(Message.Type.SYSTEM, Message.Scope.RUNNABLE, runnableName, STOP_COMMAND);
+  }
+
+  public static Message setInstances(String runnableName, int instances) {
+    Preconditions.checkArgument(instances > 0, "Instances should be > 0.");
+    return new SimpleMessage(Message.Type.SYSTEM, Message.Scope.RUNNABLE, runnableName,
+                             Command.Builder.of("instances").addOption("count", Integer.toString(instances)).build());
+  }
+
+  private SystemMessages() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/utils/Dependencies.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/utils/Dependencies.java b/twill-core/src/main/java/org/apache/twill/internal/utils/Dependencies.java
new file mode 100644
index 0000000..015b9f5
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/utils/Dependencies.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.utils;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.io.ByteStreams;
+import org.objectweb.asm.AnnotationVisitor;
+import org.objectweb.asm.ClassReader;
+import org.objectweb.asm.ClassVisitor;
+import org.objectweb.asm.FieldVisitor;
+import org.objectweb.asm.Label;
+import org.objectweb.asm.MethodVisitor;
+import org.objectweb.asm.Opcodes;
+import org.objectweb.asm.Type;
+import org.objectweb.asm.signature.SignatureReader;
+import org.objectweb.asm.signature.SignatureVisitor;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URL;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * Utility class to help find out class dependencies.
+ */
+public final class Dependencies {
+
+  /**
+   * Represents a callback for accepting a class during dependency traversal.
+   */
+  public interface ClassAcceptor {
+    /**
+     * Invoked when a class is being found as a dependency.
+     *
+     * @param className Name of the class.
+     * @param classUrl URL for the class resource.
+     * @param classPathUrl URL for the class path resource that contains the class resource.
+     *                     If the URL protocol is {@code file}, it would be the path to root package.
+     *                     If the URL protocol is {@code jar}, it would be the jar file.
+     * @return {@code true} to keep finding dependencies of the given class; {@code false} to skip it.
+     */
+    boolean accept(String className, URL classUrl, URL classPathUrl);
+  }
+
+  public static void findClassDependencies(ClassLoader classLoader,
+                                           ClassAcceptor acceptor,
+                                           String...classesToResolve) throws IOException {
+    findClassDependencies(classLoader, acceptor, ImmutableList.copyOf(classesToResolve));
+  }
+
+  /**
+   * Finds the class dependencies of the given classes by breadth-first traversal of their bytecode.
+   * @param classLoader ClassLoader for finding class bytecode.
+   * @param acceptor Callback that decides whether a found class should be traversed for further dependencies.
+   * @param classesToResolve Classes for looking for dependencies.
+   * @throws IOException Thrown when there is an error loading class bytecode.
+   */
+  public static void findClassDependencies(ClassLoader classLoader,
+                                           ClassAcceptor acceptor,
+                                           Iterable<String> classesToResolve) throws IOException {
+
+    final Set<String> seenClasses = Sets.newHashSet(classesToResolve);
+    final Queue<String> classes = Lists.newLinkedList(classesToResolve);
+
+    // Breadth-first search of class dependencies.
+    while (!classes.isEmpty()) {
+      String className = classes.remove();
+      URL classUrl = getClassURL(className, classLoader);
+      if (classUrl == null) {
+        continue;
+      }
+
+      // Ask the acceptor whether to accept the current class; skip the class if it is rejected.
+      if (!acceptor.accept(className, classUrl, getClassPathURL(className, classUrl))) {
+        continue;
+      }
+
+      InputStream is = classUrl.openStream();
+      try {
+        // Visit the bytecode to look up classes that the class being visited depends on.
+        new ClassReader(ByteStreams.toByteArray(is)).accept(new DependencyClassVisitor(new DependencyAcceptor() {
+          @Override
+          public void accept(String className) {
+            // Enqueue the class only if it has not been seen before.
+            if (seenClasses.add(className)) {
+              classes.add(className);
+            }
+          }
+        }), ClassReader.SKIP_DEBUG + ClassReader.SKIP_FRAMES);
+      } finally {
+        is.close();
+      }
+    }
+  }
+
+  /**
+   * Returns the URL of the {@code .class} resource of the given class as located by the given ClassLoader,
+   * or {@code null} if {@link ClassLoader#getResource(String)} cannot find it.
+   */
+  private static URL getClassURL(String className, ClassLoader classLoader) {
+    String resourceName = className.replace('.', '/') + ".class";
+    return classLoader.getResource(resourceName);
+  }
+
+  private static URL getClassPathURL(String className, URL classUrl) {
+    try {
+      if ("file".equals(classUrl.getProtocol())) {
+        String path = classUrl.getFile();
+        // Compute the root directory containing the class (path minus the class resource name).
+        int endIdx = path.length() - className.length() - ".class".length();
+        if (endIdx > 1) {
+          // If it is not the root directory, move the end index back by one to remove the trailing '/'.
+          endIdx--;
+        }
+        return new URL("file", "", -1, path.substring(0, endIdx));
+      }
+      if ("jar".equals(classUrl.getProtocol())) {
+        String path = classUrl.getFile();
+        return URI.create(path.substring(0, path.indexOf("!/"))).toURL();
+      }
+    } catch (MalformedURLException e) {
+      throw Throwables.propagate(e);
+    }
+    throw new IllegalStateException("Unsupported class URL: " + classUrl);
+  }
+
+  /**
+   * A private interface for accepting a dependent class that is found during bytecode inspection.
+   */
+  private interface DependencyAcceptor {
+    void accept(String className);
+  }
+
+  /**
+   * ASM ClassVisitor for extracting class dependencies.
+   */
+  private static final class DependencyClassVisitor extends ClassVisitor {
+
+    private final SignatureVisitor signatureVisitor;
+    private final DependencyAcceptor acceptor;
+
+    public DependencyClassVisitor(DependencyAcceptor acceptor) {
+      super(Opcodes.ASM4);
+      this.acceptor = acceptor;
+      this.signatureVisitor = new SignatureVisitor(Opcodes.ASM4) {
+        private String currentClass;
+
+        @Override
+        public void visitClassType(String name) {
+          currentClass = name;
+          addClass(name);
+        }
+
+        @Override
+        public void visitInnerClassType(String name) {
+          addClass(currentClass + "$" + name);
+        }
+      };
+    }
+
+    @Override
+    public void visit(int version, int access, String name, String signature, String superName, String[] interfaces) {
+      addClass(name);
+
+      if (signature != null) {
+        new SignatureReader(signature).accept(signatureVisitor);
+      } else {
+        addClass(superName);
+        addClasses(interfaces);
+      }
+    }
+
+    @Override
+    public void visitOuterClass(String owner, String name, String desc) {
+      addClass(owner);
+    }
+
+    @Override
+    public AnnotationVisitor visitAnnotation(String desc, boolean visible) {
+      addType(Type.getType(desc));
+      return null;
+    }
+
+    @Override
+    public void visitInnerClass(String name, String outerName, String innerName, int access) {
+      addClass(name);
+    }
+
+    @Override
+    public FieldVisitor visitField(int access, String name, String desc, String signature, Object value) {
+      if (signature != null) {
+        new SignatureReader(signature).acceptType(signatureVisitor);
+      } else {
+        addType(Type.getType(desc));
+      }
+
+      return new FieldVisitor(Opcodes.ASM4) {
+        @Override
+        public AnnotationVisitor visitAnnotation(String desc, boolean visible) {
+          addType(Type.getType(desc));
+          return null;
+        }
+      };
+    }
+
+    @Override
+    public MethodVisitor visitMethod(int access, String name, String desc, String signature, String[] exceptions) {
+      if (signature != null) {
+        new SignatureReader(signature).accept(signatureVisitor);
+      } else {
+        addMethod(desc);
+      }
+      addClasses(exceptions);
+
+      return new MethodVisitor(Opcodes.ASM4) {
+        @Override
+        public AnnotationVisitor visitAnnotation(String desc, boolean visible) {
+          addType(Type.getType(desc));
+          return null;
+        }
+
+        @Override
+        public AnnotationVisitor visitParameterAnnotation(int parameter, String desc, boolean visible) {
+          addType(Type.getType(desc));
+          return null;
+        }
+
+        @Override
+        public void visitTypeInsn(int opcode, String type) {
+          addType(Type.getObjectType(type));
+        }
+
+        @Override
+        public void visitFieldInsn(int opcode, String owner, String name, String desc) {
+          addType(Type.getObjectType(owner));
+          addType(Type.getType(desc));
+        }
+
+        @Override
+        public void visitMethodInsn(int opcode, String owner, String name, String desc) {
+          addType(Type.getObjectType(owner));
+          addMethod(desc);
+        }
+
+        @Override
+        public void visitLdcInsn(Object cst) {
+          if (cst instanceof Type) {
+            addType((Type) cst);
+          }
+        }
+
+        @Override
+        public void visitMultiANewArrayInsn(String desc, int dims) {
+          addType(Type.getType(desc));
+        }
+
+        @Override
+        public void visitLocalVariable(String name, String desc, String signature, Label start, Label end, int index) {
+          if (signature != null) {
+            new SignatureReader(signature).acceptType(signatureVisitor);
+          } else {
+            addType(Type.getType(desc));
+          }
+        }
+      };
+    }
+
+    private void addClass(String internalName) {
+      if (internalName == null || internalName.startsWith("java/")) {
+        return;
+      }
+      acceptor.accept(Type.getObjectType(internalName).getClassName());
+    }
+
+    private void addClasses(String[] classes) {
+      if (classes != null) {
+        for (String clz : classes) {
+          addClass(clz);
+        }
+      }
+    }
+
+    private void addType(Type type) {
+      if (type.getSort() == Type.ARRAY) {
+        type = type.getElementType();
+      }
+      if (type.getSort() == Type.OBJECT) {
+        addClass(type.getInternalName());
+      }
+    }
+
+    private void addMethod(String desc) {
+      addType(Type.getReturnType(desc));
+      for (Type type : Type.getArgumentTypes(desc)) {
+        addType(type);
+      }
+    }
+  }
+
+  private Dependencies() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/utils/Instances.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/utils/Instances.java b/twill-core/src/main/java/org/apache/twill/internal/utils/Instances.java
new file mode 100644
index 0000000..28bfce9
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/utils/Instances.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.utils;
+
+import com.google.common.base.Defaults;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.reflect.TypeToken;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+
+/**
+ * Utility class to help instantiate object instance from class.
+ */
+public final class Instances {
+
+  // The sun.misc.Unsafe singleton and its allocateInstance(Class) method. Both are looked up reflectively
+  // in the static initializer and are both null if that lookup failed for any reason.
+  private static final Object UNSAFE;
+  private static final Method UNSAFE_NEW_INSTANCE;
+
+  static {
+    Object unsafe;
+    Method newInstance;
+    try {
+      Class<?> clz = Class.forName("sun.misc.Unsafe");
+      Field f = clz.getDeclaredField("theUnsafe");
+      f.setAccessible(true);
+      unsafe = f.get(null);
+
+      newInstance = clz.getMethod("allocateInstance", Class.class);
+    } catch (Exception e) {
+      // Unsafe is optional; without it only classes with a usable no-argument constructor can be created.
+      unsafe = null;
+      newInstance = null;
+    }
+    UNSAFE = unsafe;
+    UNSAFE_NEW_INSTANCE = newInstance;
+  }
+
+  /**
+   * Creates a new instance of the given class. It will use the no-argument constructor if it is present
+   * and can be invoked successfully. Otherwise it will try to use {@code sun.misc.Unsafe.allocateInstance(Class)}
+   * to create the instance, in which case no constructor is executed.
+   *
+   * @param clz Class of object to be instantiated.
+   * @param <T> Type of the class
+   * @return An instance of type {@code <T>}
+   * @throws NullPointerException if {@code clz} is {@code null}.
+   * @throws IllegalStateException if the constructor cannot be used and Unsafe is not available; the
+   *         constructor failure is attached as the cause.
+   */
+  public static <T> T newInstance(Class<T> clz) {
+    // Fail fast: a null class must never be handed over to Unsafe.
+    Preconditions.checkNotNull(clz, "Class to instantiate must not be null.");
+    try {
+      try {
+        Constructor<T> cons = clz.getDeclaredConstructor();
+        if (!cons.isAccessible()) {
+          cons.setAccessible(true);
+        }
+        return cons.newInstance();
+      } catch (Exception e) {
+        // Try to use Unsafe. Keep the constructor failure as the cause so that it is not lost.
+        if (UNSAFE == null) {
+          throw new IllegalStateException("Fail to instantiate with Unsafe.", e);
+        }
+        return unsafeCreate(clz);
+      }
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+
+  /**
+   * Creates an instance of the given class using Unsafe. It also sets every non-static field declared by the
+   * class and its superclasses (excluding {@link Object}) to the default value of the field type.
+   *
+   * @param clz Class of object to be instantiated.
+   * @param <T> Type of the class
+   * @return An instance of type {@code <T>}
+   * @throws InvocationTargetException if the reflective call to allocateInstance fails.
+   * @throws IllegalAccessException if the reflective call or a field assignment is not permitted.
+   */
+  @SuppressWarnings("unchecked")
+  private static <T> T unsafeCreate(Class<T> clz) throws InvocationTargetException, IllegalAccessException {
+    // allocateInstance(clz) is invoked with clz itself, so the result is cast to T.
+    T instance = (T) UNSAFE_NEW_INSTANCE.invoke(UNSAFE, clz);
+
+    for (TypeToken<?> type : TypeToken.of(clz).getTypes().classes()) {
+      if (Object.class.equals(type.getRawType())) {
+        break;
+      }
+      for (Field field : type.getRawType().getDeclaredFields()) {
+        if (Modifier.isStatic(field.getModifiers())) {
+          continue;
+        }
+        if (!field.isAccessible()) {
+          field.setAccessible(true);
+        }
+        field.set(instance, Defaults.defaultValue(field.getType()));
+      }
+    }
+
+    return instance;
+  }
+
+
+  private Instances() {
+    // Protect instantiation of this class
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/utils/Networks.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/utils/Networks.java b/twill-core/src/main/java/org/apache/twill/internal/utils/Networks.java
new file mode 100644
index 0000000..8e7d736
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/utils/Networks.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.utils;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+
+/**
+ *
+ */
+public final class Networks {
+
+  /**
+   * Find a random free port in localhost for binding.
+   * @return A port number or -1 for failure.
+   */
+  public static int getRandomPort() {
+    try {
+      ServerSocket socket = new ServerSocket(0);
+      try {
+        return socket.getLocalPort();
+      } finally {
+        socket.close();
+      }
+    } catch (IOException e) {
+      return -1;
+    }
+  }
+
+  private Networks() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/utils/Paths.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/utils/Paths.java b/twill-core/src/main/java/org/apache/twill/internal/utils/Paths.java
new file mode 100644
index 0000000..aeee09f
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/utils/Paths.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.utils;
+
+import com.google.common.io.Files;
+
+/**
+ * Utility methods for working with file extensions in path strings.
+ */
+public final class Paths {
+
+  /**
+   * Copies the extension of one path onto another name.
+   *
+   * @param extractFrom path whose extension, as given by {@link #getExtension(String)}, is taken.
+   * @param appendTo name that receives the extension.
+   * @return {@code appendTo} followed by {@code '.'} and the extension, or {@code appendTo} unchanged
+   *         if {@code extractFrom} has no extension.
+   */
+  public static String appendSuffix(String extractFrom, String appendTo) {
+    String extension = getExtension(extractFrom);
+    return extension.isEmpty() ? appendTo : appendTo + '.' + extension;
+  }
+
+  /**
+   * Returns the extension of the given path. A path ending in {@code .tar.gz} yields {@code tar.gz};
+   * every other path is delegated to {@link Files#getFileExtension(String)}.
+   *
+   * @param path path to inspect.
+   * @return the extension of the path.
+   */
+  public static String getExtension(String path) {
+    return path.endsWith(".tar.gz") ? "tar.gz" : Files.getFileExtension(path);
+  }
+
+  private Paths() {
+    // Utility class; not meant to be instantiated.
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/kafka/client/FetchException.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/kafka/client/FetchException.java b/twill-core/src/main/java/org/apache/twill/kafka/client/FetchException.java
new file mode 100644
index 0000000..acccf04
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/kafka/client/FetchException.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.kafka.client;
+
+/**
+ *
+ */
+public final class FetchException extends RuntimeException {
+
+  private final ErrorCode errorCode;
+
+  public FetchException(String message, ErrorCode errorCode) {
+    super(message);
+    this.errorCode = errorCode;
+  }
+
+  public ErrorCode getErrorCode() {
+    return errorCode;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("%s. Error code: %s", super.toString(), errorCode);
+  }
+
+  public enum ErrorCode {
+    UNKNOWN(-1),
+    OK(0),
+    OFFSET_OUT_OF_RANGE(1),
+    INVALID_MESSAGE(2),
+    WRONG_PARTITION(3),
+    INVALID_FETCH_SIZE(4);
+
+    private final int code;
+
+    ErrorCode(int code) {
+      this.code = code;
+    }
+
+    public int getCode() {
+      return code;
+    }
+
+    public static ErrorCode fromCode(int code) {
+      switch (code) {
+        case -1:
+          return UNKNOWN;
+        case 0:
+          return OK;
+        case 1:
+          return OFFSET_OUT_OF_RANGE;
+        case 2:
+          return INVALID_MESSAGE;
+        case 3:
+          return WRONG_PARTITION;
+        case 4:
+          return INVALID_FETCH_SIZE;
+      }
+      throw new IllegalArgumentException("Unknown error code");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/kafka/client/FetchedMessage.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/kafka/client/FetchedMessage.java b/twill-core/src/main/java/org/apache/twill/kafka/client/FetchedMessage.java
new file mode 100644
index 0000000..65e140f
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/kafka/client/FetchedMessage.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.kafka.client;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Represents a message fetched from kafka broker.
+ */
+public interface FetchedMessage {
+
+  /**
+   * Returns the message offset.
+   *
+   * @return the offset of this message.
+   */
+  long getOffset();
+
+  /**
+   * Returns the message payload.
+   *
+   * @return a {@link ByteBuffer} holding the payload of this message.
+   */
+  ByteBuffer getBuffer();
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/kafka/client/KafkaClient.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/kafka/client/KafkaClient.java b/twill-core/src/main/java/org/apache/twill/kafka/client/KafkaClient.java
new file mode 100644
index 0000000..496195b
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/kafka/client/KafkaClient.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.kafka.client;
+
+import org.apache.twill.internal.kafka.client.Compression;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Service;
+
+import java.util.Iterator;
+
+/**
+ * This interface provides methods for interacting with kafka broker. It also
+ * extends from {@link Service} for lifecycle management. The {@link #start()} method
+ * must be called prior to other methods in this class. When instance of this class
+ * is not needed, call {@link #stop()} to release any resources that it holds.
+ */
+public interface KafkaClient extends Service {
+
+  /**
+   * Prepares to publish messages to the given topic.
+   *
+   * @param topic Topic to publish to.
+   * @param compression Compression setting for the publish.
+   *                    NOTE(review): presumably applied to the published messages - confirm against implementation.
+   * @return A {@link PreparePublish} for adding messages and publishing them.
+   */
+  PreparePublish preparePublish(String topic, Compression compression);
+
+  /**
+   * Consumes messages from the given topic and partition.
+   *
+   * @param topic Topic to consume from.
+   * @param partition Partition to consume from.
+   * @param offset NOTE(review): presumably the offset to start consuming from - confirm against implementation.
+   * @param maxSize NOTE(review): presumably an upper bound on the fetch size; the unit is not visible here - confirm.
+   * @return An {@link Iterator} over the {@link FetchedMessage}s.
+   */
+  Iterator<FetchedMessage> consume(String topic, int partition, long offset, int maxSize);
+
+  /**
+   * Fetches offset from the given topic and partition.
+   * @param topic Topic to fetch from.
+   * @param partition Partition to fetch from.
+   * @param time The first offset of every segment file for a given partition with a modified time less than time.
+   *             {@code -1} for latest offset, {@code -2} for earliest offset.
+   * @param maxOffsets Maximum number of offsets to fetch.
+   * @return A Future that carry the result as an array of offsets in descending order.
+   *         The size of the result array would not be larger than maxOffsets. If there is any error during the fetch,
+   *         the exception will be carried in the future.
+   */
+  ListenableFuture<long[]> getOffset(String topic, int partition, long time, int maxOffsets);
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/kafka/client/PreparePublish.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/kafka/client/PreparePublish.java b/twill-core/src/main/java/org/apache/twill/kafka/client/PreparePublish.java
new file mode 100644
index 0000000..5db4abb
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/kafka/client/PreparePublish.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.kafka.client;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This interface is for preparing to publish a set of messages to kafka.
+ */
+public interface PreparePublish {
+
+  /**
+   * Adds a message given as a byte array.
+   *
+   * @param payload Payload of the message.
+   * @param partitionKey Key associated with the message.
+   *                     NOTE(review): presumably used to select the partition - confirm against implementation.
+   * @return A {@link PreparePublish}. NOTE(review): presumably this instance, for call chaining - confirm.
+   */
+  PreparePublish add(byte[] payload, Object partitionKey);
+
+  /**
+   * Adds a message given as a {@link ByteBuffer}.
+   *
+   * @param payload Payload of the message.
+   * @param partitionKey Key associated with the message.
+   *                     NOTE(review): presumably used to select the partition - confirm against implementation.
+   * @return A {@link PreparePublish}. NOTE(review): presumably this instance, for call chaining - confirm.
+   */
+  PreparePublish add(ByteBuffer payload, Object partitionKey);
+
+  /**
+   * Publishes the messages that were added.
+   *
+   * @return A {@link ListenableFuture} for the publish.
+   *         NOTE(review): presumably completed when the publish succeeded or failed - confirm.
+   */
+  ListenableFuture<?> publish();
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/kafka/client/package-info.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/kafka/client/package-info.java b/twill-core/src/main/java/org/apache/twill/kafka/client/package-info.java
new file mode 100644
index 0000000..ea3bf20
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/kafka/client/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package provides a pure java Kafka client interface.
+ */
+package org.apache.twill.kafka.client;

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/launcher/TwillLauncher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/launcher/TwillLauncher.java b/twill-core/src/main/java/org/apache/twill/launcher/TwillLauncher.java
new file mode 100644
index 0000000..2c8c1ef
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/launcher/TwillLauncher.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.launcher;
+
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.lang.reflect.Method;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.jar.JarEntry;
+import java.util.jar.JarInputStream;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * A launcher for application from a archive jar.
+ * This class should have no dependencies on any library except the J2SE one.
+ * This class should not import any thing except java.*
+ */
+public final class TwillLauncher {
+
+  private static final int TEMP_DIR_ATTEMPTS = 20;
+
+  // Matches an environment variable reference: ${NAME} (name in group 1) or $NAME (name in group 2).
+  private static final Pattern ENV_REFERENCE = Pattern.compile("\\$\\{([^}]+)\\}|\\$([A-Za-z_][A-Za-z0-9_]*)");
+
+  /**
+   * Main method to unpackage a jar and run the mainClass.main() method.
+   * @param args args[0] is the path to jar file, args[1] is the class name of the mainClass, args[2] is a boolean
+   *             telling whether the entries listed in the "classpath" system resource are added to the classpath.
+   *             The rest of args will be passed the mainClass unmodified.
+   */
+  public static void main(String[] args) throws Exception {
+    if (args.length < 3) {
+      System.out.println("Usage: java " + TwillLauncher.class.getName() + " [jarFile] [mainClass] [use_classpath]");
+      return;
+    }
+
+    File file = new File(args[0]);
+    final File targetDir = createTempDir("twill.launcher");
+
+    // Remove the expanded jar content when the JVM exits.
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        System.out.println("Cleanup directory " + targetDir);
+        deleteDir(targetDir);
+      }
+    });
+
+    System.out.println("UnJar " + file + " to " + targetDir);
+    unJar(file, targetDir);
+
+    // Create ClassLoader
+    URLClassLoader classLoader = createClassLoader(targetDir, Boolean.parseBoolean(args[2]));
+    Thread.currentThread().setContextClassLoader(classLoader);
+
+    System.out.println("Launch class with classpath: " + Arrays.toString(classLoader.getURLs()));
+
+    Class<?> mainClass = classLoader.loadClass(args[1]);
+    Method mainMethod = mainClass.getMethod("main", String[].class);
+    String[] arguments = Arrays.copyOfRange(args, 3, args.length);
+    System.out.println("Launching main: " + mainMethod + " " + Arrays.toString(arguments));
+    // Wrap the String[] so that it is passed as the single argument of main instead of being spread as varargs.
+    mainMethod.invoke(mainClass, new Object[]{arguments});
+    System.out.println("Main class completed.");
+
+    System.out.println("Launcher completed");
+  }
+
+  /**
+   * This method is copied from Guava Files.createTempDir().
+   */
+  private static File createTempDir(String prefix) throws IOException {
+    File baseDir = new File(System.getProperty("java.io.tmpdir"));
+    if (!baseDir.isDirectory() && !baseDir.mkdirs()) {
+      throw new IOException("Tmp directory not exists: " + baseDir.getAbsolutePath());
+    }
+
+    String baseName = prefix + "-" + System.currentTimeMillis() + "-";
+
+    for (int counter = 0; counter < TEMP_DIR_ATTEMPTS; counter++) {
+      File tempDir = new File(baseDir, baseName + counter);
+      if (tempDir.mkdir()) {
+        return tempDir;
+      }
+    }
+    throw new IOException("Failed to create directory within "
+                            + TEMP_DIR_ATTEMPTS + " attempts (tried "
+                            + baseName + "0 to " + baseName + (TEMP_DIR_ATTEMPTS - 1) + ')');
+  }
+
+  /**
+   * Expands all entries of the given jar file into the target directory.
+   *
+   * @throws IOException if reading or writing fails, if a directory cannot be created, or if an entry
+   *         would be written outside of the target directory.
+   */
+  private static void unJar(File jarFile, File targetDir) throws IOException {
+    String targetRoot = targetDir.getCanonicalPath() + File.separator;
+    JarInputStream jarInput = new JarInputStream(new FileInputStream(jarFile));
+    try {
+      JarEntry jarEntry = jarInput.getNextJarEntry();
+      while (jarEntry != null) {
+        File target = new File(targetDir, jarEntry.getName());
+        // Reject entry names such as "../../x" that would escape the target directory.
+        if (!(target.getCanonicalPath() + File.separator).startsWith(targetRoot)) {
+          throw new IOException("Jar entry " + jarEntry.getName() + " is outside of directory " + targetDir);
+        }
+        if (jarEntry.isDirectory()) {
+          ensureDir(target);
+        } else {
+          ensureDir(target.getParentFile());
+          copy(jarInput, target);
+        }
+        jarEntry = jarInput.getNextJarEntry();
+      }
+    } finally {
+      jarInput.close();
+    }
+  }
+
+  /**
+   * Makes sure the given directory exists, creating it and its parents if needed.
+   */
+  private static void ensureDir(File dir) throws IOException {
+    if (!dir.isDirectory() && !dir.mkdirs() && !dir.isDirectory()) {
+      throw new IOException("Failed to create directory " + dir);
+    }
+  }
+
+  /**
+   * Copies everything remaining in the given stream into the given file. The input stream is not closed.
+   */
+  private static void copy(InputStream is, File file) throws IOException {
+    byte[] buf = new byte[8192];
+    OutputStream os = new BufferedOutputStream(new FileOutputStream(file));
+    try {
+      int len = is.read(buf);
+      while (len != -1) {
+        os.write(buf, 0, len);
+        len = is.read(buf);
+      }
+    } finally {
+      os.close();
+    }
+  }
+
+  /**
+   * Creates a class loader over the expanded jar directory: the directory itself, its "classes" and "resources"
+   * sub-directories and every .jar file under "lib". If useClassPath is true, the first line of the "classpath"
+   * system resource (if present) is split on ':' and each entry is added as well.
+   */
+  private static URLClassLoader createClassLoader(File dir, boolean useClassPath) {
+    try {
+      List<URL> urls = new ArrayList<URL>();
+      urls.add(dir.toURI().toURL());
+      urls.add(new File(dir, "classes").toURI().toURL());
+      urls.add(new File(dir, "resources").toURI().toURL());
+
+      File libDir = new File(dir, "lib");
+      File[] files = libDir.listFiles();
+      if (files != null) {
+        for (File file : files) {
+          if (file.getName().endsWith(".jar")) {
+            urls.add(file.toURI().toURL());
+          }
+        }
+      }
+
+      if (useClassPath) {
+        InputStream is = ClassLoader.getSystemResourceAsStream("classpath");
+        if (is != null) {
+          try {
+            BufferedReader reader = new BufferedReader(new InputStreamReader(is, Charset.forName("UTF-8")));
+            String line = reader.readLine();
+            if (line != null) {
+              for (String path : line.split(":")) {
+                urls.addAll(getClassPaths(path));
+              }
+            }
+          } finally {
+            is.close();
+          }
+        }
+      }
+
+      return new URLClassLoader(urls.toArray(new URL[0]));
+
+    } catch (Exception e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  /**
+   * Resolves one classpath entry into URLs. Environment variable references are expanded first. An entry ending
+   * with "/*" yields all .jar files of that directory (or the directory itself if it has no files or cannot be
+   * listed); any other entry yields a single URL.
+   */
+  private static Collection<URL> getClassPaths(String path) throws MalformedURLException {
+    String classpath = expand(path);
+    if (classpath.endsWith("/*")) {
+      // Grab all .jar files
+      File dir = new File(classpath.substring(0, classpath.length() - 2));
+      File[] files = dir.listFiles();
+      if (files == null || files.length == 0) {
+        return singleItem(dir.toURI().toURL());
+      }
+
+      List<URL> result = new ArrayList<URL>(files.length);
+      for (File file : files) {
+        if (file.getName().endsWith(".jar")) {
+          result.add(file.toURI().toURL());
+        }
+      }
+      return result;
+    } else {
+      return singleItem(new File(classpath).toURI().toURL());
+    }
+  }
+
+  private static Collection<URL> singleItem(URL url) {
+    List<URL> result = new ArrayList<URL>(1);
+    result.add(url);
+    return result;
+  }
+
+  /**
+   * Replaces references to environment variables, written as $NAME or ${NAME}, with their values.
+   * Each reference is matched as a whole name in a single pass, so $FOO_BAR is never partially expanded
+   * through a variable named FOO and the result does not depend on the iteration order of the environment.
+   * References to variables that are not defined are left untouched.
+   */
+  private static String expand(String value) {
+    Map<String, String> env = System.getenv();
+    Matcher matcher = ENV_REFERENCE.matcher(value);
+    StringBuffer result = new StringBuffer();
+    while (matcher.find()) {
+      String name = matcher.group(1) != null ? matcher.group(1) : matcher.group(2);
+      String replacement = env.get(name);
+      if (replacement == null) {
+        // Unknown variable: keep the reference as it was written.
+        replacement = matcher.group();
+      }
+      // Quote the replacement so that '$' and '\' in it are taken literally.
+      matcher.appendReplacement(result, Matcher.quoteReplacement(replacement));
+    }
+    matcher.appendTail(result);
+    return result.toString();
+  }
+
+  /**
+   * Deletes the given file or directory, recursing into directories first. Failures are ignored.
+   */
+  private static void deleteDir(File dir) {
+    File[] files = dir.listFiles();
+    if (files != null) {
+      for (File file : files) {
+        deleteDir(file);
+      }
+    }
+    dir.delete();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/resources/kafka-0.7.2.tgz
----------------------------------------------------------------------
diff --git a/twill-core/src/main/resources/kafka-0.7.2.tgz b/twill-core/src/main/resources/kafka-0.7.2.tgz
new file mode 100644
index 0000000..24178d9
Binary files /dev/null and b/twill-core/src/main/resources/kafka-0.7.2.tgz differ

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java b/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java
new file mode 100644
index 0000000..382dc95
--- /dev/null
+++ b/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import org.apache.twill.api.Command;
+import org.apache.twill.api.ResourceReport;
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.ServiceController;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.logging.LogHandler;
+import org.apache.twill.common.ServiceListenerAdapter;
+import org.apache.twill.common.Threads;
+import org.apache.twill.internal.state.StateNode;
+import org.apache.twill.internal.zookeeper.InMemoryZKServer;
+import org.apache.twill.zookeeper.NodeData;
+import org.apache.twill.zookeeper.ZKClient;
+import org.apache.twill.zookeeper.ZKClientService;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.Service;
+import com.google.gson.JsonObject;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Tests the interaction between a {@link TwillController} and a service wrapped in a {@link ZKServiceDecorator},
+ * both connected to the same in-memory ZooKeeper server.
+ */
+public class ControllerTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ControllerTest.class);
+
+  // Test sending a command and stopping the service through a controller created after the service started.
+  @Test
+  public void testController() throws ExecutionException, InterruptedException, TimeoutException {
+    InMemoryZKServer zkServer = InMemoryZKServer.builder().build();
+    zkServer.startAndWait();
+
+    LOG.info("ZKServer: " + zkServer.getConnectionStr());
+
+    try {
+      RunId runId = RunIds.generate();
+      ZKClientService zkClientService = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
+      zkClientService.startAndWait();
+
+      Service service = createService(zkClientService, runId);
+      service.startAndWait();
+
+      TwillController controller = getController(zkClientService, runId);
+      controller.sendCommand(Command.Builder.of("test").build()).get(2, TimeUnit.SECONDS);
+      controller.stop().get(2, TimeUnit.SECONDS);
+
+      Assert.assertEquals(ServiceController.State.TERMINATED, controller.state());
+
+      final CountDownLatch terminateLatch = new CountDownLatch(1);
+      service.addListener(new ServiceListenerAdapter() {
+        @Override
+        public void terminated(Service.State from) {
+          terminateLatch.countDown();
+        }
+      }, Threads.SAME_THREAD_EXECUTOR);
+
+      // The service may have terminated before the listener was added, hence the state check first.
+      Assert.assertTrue(service.state() == Service.State.TERMINATED || terminateLatch.await(2, TimeUnit.SECONDS));
+
+      zkClientService.stopAndWait();
+
+    } finally {
+      zkServer.stopAndWait();
+    }
+  }
+
+  // Test controller created before service starts.
+  @Test
+  public void testControllerBefore() throws InterruptedException {
+    InMemoryZKServer zkServer = InMemoryZKServer.builder().build();
+    zkServer.startAndWait();
+
+    LOG.info("ZKServer: " + zkServer.getConnectionStr());
+    try {
+      RunId runId = RunIds.generate();
+      ZKClientService zkClientService = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
+      zkClientService.startAndWait();
+
+      final CountDownLatch runLatch = new CountDownLatch(1);
+      final CountDownLatch stopLatch = new CountDownLatch(1);
+      TwillController controller = getController(zkClientService, runId);
+      controller.addListener(new ServiceListenerAdapter() {
+        @Override
+        public void running() {
+          runLatch.countDown();
+        }
+
+        @Override
+        public void terminated(Service.State from) {
+          stopLatch.countDown();
+        }
+      }, Threads.SAME_THREAD_EXECUTOR);
+
+      Service service = createService(zkClientService, runId);
+      service.start();
+
+      Assert.assertTrue(runLatch.await(2, TimeUnit.SECONDS));
+      // The controller must not report termination while the service is still running.
+      Assert.assertFalse(stopLatch.await(2, TimeUnit.SECONDS));
+
+      service.stop();
+
+      Assert.assertTrue(stopLatch.await(2, TimeUnit.SECONDS));
+
+      // Stop the ZK client as the other tests do, so it is not left running after the test.
+      zkClientService.stopAndWait();
+
+    } finally {
+      zkServer.stopAndWait();
+    }
+  }
+
+  // Test controller listener receive first state change without state transition from service
+  @Test
+  public void testControllerListener() throws InterruptedException {
+    InMemoryZKServer zkServer = InMemoryZKServer.builder().build();
+    zkServer.startAndWait();
+
+    LOG.info("ZKServer: " + zkServer.getConnectionStr());
+    try {
+      RunId runId = RunIds.generate();
+      ZKClientService zkClientService = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
+      zkClientService.startAndWait();
+
+      Service service = createService(zkClientService, runId);
+      service.startAndWait();
+
+      final CountDownLatch runLatch = new CountDownLatch(1);
+      TwillController controller = getController(zkClientService, runId);
+      controller.addListener(new ServiceListenerAdapter() {
+        @Override
+        public void running() {
+          runLatch.countDown();
+        }
+      }, Threads.SAME_THREAD_EXECUTOR);
+
+      Assert.assertTrue(runLatch.await(2, TimeUnit.SECONDS));
+
+      service.stopAndWait();
+
+      zkClientService.stopAndWait();
+    } finally {
+      zkServer.stopAndWait();
+    }
+  }
+
+  // Creates a ZKServiceDecorator around an idle service that only logs on start up and shut down.
+  private Service createService(ZKClient zkClient, RunId runId) {
+    return new ZKServiceDecorator(
+      zkClient, runId, Suppliers.ofInstance(new JsonObject()), new AbstractIdleService() {
+
+      @Override
+      protected void startUp() throws Exception {
+        LOG.info("Start");
+      }
+
+      @Override
+      protected void shutDown() throws Exception {
+        LOG.info("Stop");
+      }
+    });
+  }
+
+  // Creates and starts a controller for the given run id with no log handlers and no-op callbacks.
+  private TwillController getController(ZKClient zkClient, RunId runId) {
+    TwillController controller = new AbstractTwillController(runId, zkClient, ImmutableList.<LogHandler>of()) {
+
+      @Override
+      public void kill() {
+        // No-op
+      }
+
+      @Override
+      protected void instanceNodeUpdated(NodeData nodeData) {
+        // No-op
+      }
+
+      @Override
+      protected void stateNodeUpdated(StateNode stateNode) {
+        // No-op
+      }
+
+      @Override
+      public ResourceReport getResourceReport() {
+        return null;
+      }
+    };
+    controller.startAndWait();
+    return controller;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/test/java/org/apache/twill/internal/state/MessageCodecTest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/test/java/org/apache/twill/internal/state/MessageCodecTest.java b/twill-core/src/test/java/org/apache/twill/internal/state/MessageCodecTest.java
new file mode 100644
index 0000000..d267cf8
--- /dev/null
+++ b/twill-core/src/test/java/org/apache/twill/internal/state/MessageCodecTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.state;
+
+import org.apache.twill.api.Command;
+import com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+
+/**
+ * Tests that {@link MessageCodec} round-trips a {@link Message} and yields {@code null} for an empty payload.
+ */
+public class MessageCodecTest {
+
+  @Test
+  public void testCodec() {
+    final Map<String, String> options = ImmutableMap.of("timeout", "1", "timeoutUnit", "SECONDS");
+    Message source = new Message() {
+      @Override
+      public Type getType() {
+        return Type.SYSTEM;
+      }
+
+      @Override
+      public Scope getScope() {
+        return Scope.APPLICATION;
+      }
+
+      @Override
+      public String getRunnableName() {
+        return null;
+      }
+
+      @Override
+      public Command getCommand() {
+        return new Command() {
+          @Override
+          public String getCommand() {
+            return "stop";
+          }
+
+          @Override
+          public Map<String, String> getOptions() {
+            return options;
+          }
+        };
+      }
+    };
+    Message decoded = MessageCodec.decode(MessageCodec.encode(source));  // full encode/decode round trip
+    Assert.assertEquals(Message.Type.SYSTEM, decoded.getType());
+    Assert.assertEquals(Message.Scope.APPLICATION, decoded.getScope());
+    Assert.assertNull(decoded.getRunnableName());
+    Assert.assertEquals("stop", decoded.getCommand().getCommand());
+    Assert.assertEquals(options, decoded.getCommand().getOptions());
+  }
+
+  @Test
+  public void testFailureDecode() {
+    Assert.assertNull(MessageCodec.decode(new byte[0]));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/test/java/org/apache/twill/internal/state/ZKServiceDecoratorTest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/test/java/org/apache/twill/internal/state/ZKServiceDecoratorTest.java b/twill-core/src/test/java/org/apache/twill/internal/state/ZKServiceDecoratorTest.java
new file mode 100644
index 0000000..47d8562
--- /dev/null
+++ b/twill-core/src/test/java/org/apache/twill/internal/state/ZKServiceDecoratorTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.state;
+
+import org.apache.twill.api.RunId;
+import org.apache.twill.internal.RunIds;
+import org.apache.twill.internal.ZKServiceDecorator;
+import org.apache.twill.internal.zookeeper.InMemoryZKServer;
+import org.apache.twill.zookeeper.NodeData;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.twill.zookeeper.ZKClients;
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Suppliers;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.Service;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests that starting and stopping a {@link ZKServiceDecorator} surfaces the STARTING and STOPPING states in ZooKeeper.
+ */
+public class ZKServiceDecoratorTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ZKServiceDecoratorTest.class);
+
+  @Test
+  public void testStateTransition() throws InterruptedException, ExecutionException, TimeoutException {
+    InMemoryZKServer zkServer = InMemoryZKServer.builder().build();
+    zkServer.startAndWait();
+
+    try {
+      final String namespace = Joiner.on('/').join("/twill", RunIds.generate(), "runnables", "Runner1");
+
+      final ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
+      zkClient.startAndWait();
+      zkClient.create(namespace, null, CreateMode.PERSISTENT).get();
+
+      try {
+        JsonObject content = new JsonObject();
+        content.addProperty("containerId", "container-123");
+        content.addProperty("host", "localhost");
+
+        RunId runId = RunIds.generate();
+        final Semaphore semaphore = new Semaphore(0);  // no permits until the watched state matches stateMatch
+        ZKServiceDecorator service = new ZKServiceDecorator(ZKClients.namespace(zkClient, namespace),
+                                                            runId, Suppliers.ofInstance(content),
+                                                            new AbstractIdleService() {
+          @Override
+          protected void startUp() throws Exception {
+            Preconditions.checkArgument(semaphore.tryAcquire(5, TimeUnit.SECONDS), "Fail to start");
+          }
+
+          @Override
+          protected void shutDown() throws Exception {
+            Preconditions.checkArgument(semaphore.tryAcquire(5, TimeUnit.SECONDS), "Fail to stop");
+          }
+        });
+
+        final String runnablePath = namespace + "/" + runId.getId();
+        final AtomicReference<String> stateMatch = new AtomicReference<String>("STARTING");
+        watchDataChange(zkClient, runnablePath + "/state", semaphore, stateMatch);  // watch before starting
+        Assert.assertEquals(Service.State.RUNNING, service.start().get(5, TimeUnit.SECONDS));
+
+        stateMatch.set("STOPPING");  // shutDown() is unblocked once this state is observed
+        Assert.assertEquals(Service.State.TERMINATED, service.stop().get(5, TimeUnit.SECONDS));
+
+      } finally {
+        zkClient.stopAndWait();
+      }
+    } finally {
+      zkServer.stopAndWait();
+    }
+  }
+
+  private void watchDataChange(final ZKClientService zkClient, final String path,
+                               final Semaphore semaphore, final AtomicReference<String> stateMatch) {
+    Futures.addCallback(zkClient.getData(path, new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        if (event.getType() == Event.EventType.NodeDataChanged) {
+          watchDataChange(zkClient, path, semaphore, stateMatch);  // re-read the data and re-register the watch
+        }
+      }
+    }), new FutureCallback<NodeData>() {
+      @Override
+      public void onSuccess(NodeData result) {
+        String content = new String(result.getData(), Charsets.UTF_8);
+        JsonObject json = new Gson().fromJson(content, JsonElement.class).getAsJsonObject();
+        if (stateMatch.get().equals(json.get("state").getAsString())) {
+          semaphore.release();  // lets the blocked startUp()/shutDown() proceed
+        }
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        exists();  // getData failed, presumably because the node is not created yet (TODO confirm)
+      }
+
+      private void exists() {
+        Futures.addCallback(zkClient.exists(path, new Watcher() {
+          @Override
+          public void process(WatchedEvent event) {
+            if (event.getType() == Event.EventType.NodeCreated) {
+              watchDataChange(zkClient, path, semaphore, stateMatch);
+            }
+          }
+        }), new FutureCallback<Stat>() {
+          @Override
+          public void onSuccess(Stat result) {
+            if (result != null) {
+              watchDataChange(zkClient, path, semaphore, stateMatch);  // node already exists; read it now
+            }
+          }
+
+          @Override
+          public void onFailure(Throwable t) {
+            LOG.error(t.getMessage(), t);
+          }
+        });
+      }
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java b/twill-core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java
new file mode 100644
index 0000000..508cadb
--- /dev/null
+++ b/twill-core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.utils;
+
+import org.apache.twill.filesystem.LocalLocationFactory;
+import org.apache.twill.filesystem.Location;
+import org.apache.twill.internal.ApplicationBundler;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.List;
+import java.util.jar.JarEntry;
+import java.util.jar.JarInputStream;
+
+/**
+ * Verifies that a bundle built by {@link ApplicationBundler} contains the traced class but not system classes.
+ */
+public class ApplicationBundlerTest {
+
+  @Rule
+  public TemporaryFolder tmpDir = new TemporaryFolder();
+
+  @Test
+  public void testFindDependencies() throws IOException, ClassNotFoundException {
+    Location location = new LocalLocationFactory(tmpDir.newFolder()).create("test.jar");
+
+    // Create a jar file by tracing dependencies
+    ApplicationBundler bundler = new ApplicationBundler(ImmutableList.<String>of());
+    bundler.createBundle(location, ApplicationBundler.class);
+
+    File targetDir = tmpDir.newFolder();
+    unjar(new File(location.toURI()), targetDir);
+
+    // Load the class back (twice); it should be defined once, by the custom classloader
+    ClassLoader classLoader = createClassLoader(targetDir);
+    Class<?> clz = classLoader.loadClass(ApplicationBundler.class.getName());
+    Assert.assertSame(classLoader, clz.getClassLoader());
+    Assert.assertSame(clz, classLoader.loadClass(ApplicationBundler.class.getName()));
+
+    // For system classes, they shouldn't be packaged, hence loaded by different classloader.
+    clz = classLoader.loadClass(Object.class.getName());
+    Assert.assertNotSame(classLoader, clz.getClassLoader());
+  }
+
+  /** Expands every entry of {@code jarFile} under {@code targetDir}, creating parent directories as needed. */
+  private void unjar(File jarFile, File targetDir) throws IOException {
+    JarInputStream jarInput = new JarInputStream(new FileInputStream(jarFile));
+    try {
+      for (JarEntry entry = jarInput.getNextJarEntry(); entry != null; entry = jarInput.getNextJarEntry()) {
+        File target = new File(targetDir, entry.getName());
+        if (entry.isDirectory()) {
+          target.mkdirs();
+        } else {
+          target.getParentFile().mkdirs();
+          ByteStreams.copy(jarInput, Files.newOutputStreamSupplier(target));
+        }
+      }
+    } finally {
+      jarInput.close();
+    }
+  }
+
+  /** Creates a child-first class loader over {@code dir/classes} and every file under {@code dir/lib}. */
+  private ClassLoader createClassLoader(File dir) throws MalformedURLException {
+    List<URL> urls = Lists.newArrayList(new File(dir, "classes").toURI().toURL());
+    File[] libFiles = new File(dir, "lib").listFiles();
+    if (libFiles != null) {
+      for (File file : libFiles) {
+        urls.add(file.toURI().toURL());
+      }
+    }
+    return new URLClassLoader(urls.toArray(new URL[0])) {
+      @Override
+      protected synchronized Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+        // Reuse an already-defined class (redefining throws LinkageError), then the URLs, then the parent.
+        try {
+          Class<?> loaded = findLoadedClass(name);
+          return loaded != null ? loaded : findClass(name);
+        } catch (ClassNotFoundException e) {
+          ClassLoader parent = getParent();
+          return parent == null ? ClassLoader.getSystemClassLoader().loadClass(name) : parent.loadClass(name);
+        }
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java b/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
new file mode 100644
index 0000000..40fc3ed
--- /dev/null
+++ b/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.kafka.client;
+
+import org.apache.twill.common.Services;
+import org.apache.twill.internal.kafka.EmbeddedKafkaServer;
+import org.apache.twill.internal.kafka.client.Compression;
+import org.apache.twill.internal.kafka.client.SimpleKafkaClient;
+import org.apache.twill.internal.utils.Networks;
+import org.apache.twill.internal.zookeeper.InMemoryZKServer;
+import org.apache.twill.zookeeper.ZKClientService;
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.Futures;
+import org.apache.commons.compress.archivers.ArchiveEntry;
+import org.apache.commons.compress.archivers.ArchiveException;
+import org.apache.commons.compress.archivers.ArchiveInputStream;
+import org.apache.commons.compress.archivers.ArchiveStreamFactory;
+import org.apache.commons.compress.compressors.CompressorException;
+import org.apache.commons.compress.compressors.CompressorStreamFactory;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests publishing, consuming and offset lookup through {@link KafkaClient} against an embedded Kafka server.
+ */
+public class KafkaTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaTest.class);
+
+  @ClassRule
+  public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
+
+  private static InMemoryZKServer zkServer;
+  private static EmbeddedKafkaServer kafkaServer;
+  private static ZKClientService zkClientService;
+  private static KafkaClient kafkaClient;
+
+  @BeforeClass
+  public static void init() throws Exception {
+    zkServer = InMemoryZKServer.builder().setDataDir(TMP_FOLDER.newFolder()).build();
+    zkServer.startAndWait();
+
+    // Extract the kafka.tgz and start the kafka server
+    kafkaServer = new EmbeddedKafkaServer(extractKafka(), generateKafkaConfig(zkServer.getConnectionStr()));
+    kafkaServer.startAndWait();
+
+    zkClientService = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
+
+    kafkaClient = new SimpleKafkaClient(zkClientService);
+    Services.chainStart(zkClientService, kafkaClient).get();
+  }
+
+  @AfterClass
+  public static void finish() throws Exception {
+    Services.chainStop(kafkaClient, zkClientService).get();
+    kafkaServer.stopAndWait();
+    zkServer.stopAndWait();
+  }
+
+  @Test
+  public void testKafkaClient() throws Exception {
+    String topic = "testClient";
+
+    Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP, "GZIP Testing message", 10);
+    Thread t2 = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing message", 10);
+
+    t1.start();
+    t2.start();
+
+    Thread t3 = createPublishThread(kafkaClient, topic, Compression.SNAPPY, "Snappy Testing message", 10);
+    t2.join();
+    t3.start();
+
+    Iterator<FetchedMessage> consumer = kafkaClient.consume(topic, 0, 0, 1048576);
+    int count = 0;
+    long startTime = System.nanoTime();
+    while (count < 30 && consumer.hasNext() && secondsPassed(startTime, TimeUnit.NANOSECONDS) < 5) {
+      LOG.info(Charsets.UTF_8.decode(consumer.next().getBuffer()).toString());
+      count++;
+    }
+
+    Assert.assertEquals(30, count);
+  }
+
+  @Test (timeout = 10000)
+  public void testOffset() throws Exception {
+    String topic = "testOffset";
+
+    // Initial earliest offset should be 0.
+    long[] offsets = kafkaClient.getOffset(topic, 0, -2, 10).get();
+    Assert.assertArrayEquals(new long[]{0L}, offsets);
+
+    // Publish some messages
+    Thread publishThread = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing", 2000);
+    publishThread.start();
+    publishThread.join();
+
+    // Fetch earliest offset, should still be 0.
+    offsets = kafkaClient.getOffset(topic, 0, -2, 10).get();
+    Assert.assertArrayEquals(new long[]{0L}, offsets);
+
+    // Fetch latest offset
+    offsets = kafkaClient.getOffset(topic, 0, -1, 10).get();
+    Iterator<FetchedMessage> consumer = kafkaClient.consume(topic, 0, offsets[0], 1048576);
+
+    // Publish one more message, the consumer should see the new message being published.
+    publishThread = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing", 1, 3000);
+    publishThread.start();
+    publishThread.join();
+
+    // Should see the last message being published.
+    Assert.assertTrue(consumer.hasNext());
+    Assert.assertEquals("3000 Testing", Charsets.UTF_8.decode(consumer.next().getBuffer()).toString());
+  }
+
+  private Thread createPublishThread(final KafkaClient kafkaClient, final String topic,
+                                     final Compression compression, final String message, final int count) {
+    return createPublishThread(kafkaClient, topic, compression, message, count, 0);
+  }
+
+  private Thread createPublishThread(final KafkaClient kafkaClient, final String topic, final Compression compression,
+                                     final String message, final int count, final int base) {
+    return new Thread() {
+      public void run() {
+        PreparePublish preparePublish = kafkaClient.preparePublish(topic, compression);
+        for (int i = 0; i < count; i++) {
+          preparePublish.add(((base + i) + " " + message).getBytes(Charsets.UTF_8), 0);
+        }
+        Futures.getUnchecked(preparePublish.publish());
+      }
+    };
+  }
+
+  private long secondsPassed(long startTime, TimeUnit startUnit) {
+    return TimeUnit.SECONDS.convert(System.nanoTime() - TimeUnit.NANOSECONDS.convert(startTime, startUnit),
+                                    TimeUnit.NANOSECONDS);
+  }
+
+  private static File extractKafka() throws IOException, ArchiveException, CompressorException {
+    File kafkaExtract = TMP_FOLDER.newFolder();
+    InputStream kafkaResource = KafkaTest.class.getClassLoader().getResourceAsStream("kafka-0.7.2.tgz");
+    Preconditions.checkState(kafkaResource != null, "Resource kafka-0.7.2.tgz not found on the classpath.");
+    ArchiveInputStream archiveInput = new ArchiveStreamFactory()
+      .createArchiveInputStream(ArchiveStreamFactory.TAR,
+                                new CompressorStreamFactory()
+                                  .createCompressorInputStream(CompressorStreamFactory.GZIP, kafkaResource));
+    try {
+      for (ArchiveEntry entry = archiveInput.getNextEntry(); entry != null; entry = archiveInput.getNextEntry()) {
+        File file = new File(kafkaExtract, entry.getName());
+        if (entry.isDirectory()) {
+          file.mkdirs();
+        } else {
+          // A tar archive may list a file before (or without) its directory entry.
+          file.getParentFile().mkdirs();
+          ByteStreams.copy(archiveInput, Files.newOutputStreamSupplier(file));
+        }
+      }
+    } finally {
+      archiveInput.close();
+    }
+    return kafkaExtract;
+  }
+
+  private static Properties generateKafkaConfig(String zkConnectStr) throws IOException {
+    int port = Networks.getRandomPort();
+    Preconditions.checkState(port > 0, "Failed to get random port.");
+
+    Properties prop = new Properties();
+    prop.setProperty("log.dir", TMP_FOLDER.newFolder().getAbsolutePath());
+    prop.setProperty("zk.connect", zkConnectStr);
+    prop.setProperty("num.threads", "8");
+    prop.setProperty("port", Integer.toString(port));
+    prop.setProperty("log.flush.interval", "1000");
+    prop.setProperty("max.socket.request.bytes", "104857600");
+    prop.setProperty("log.cleanup.interval.mins", "1");
+    prop.setProperty("log.default.flush.scheduler.interval.ms", "1000");
+    prop.setProperty("zk.connectiontimeout.ms", "1000000");
+    prop.setProperty("socket.receive.buffer", "1048576");
+    prop.setProperty("enable.zookeeper", "true");
+    prop.setProperty("log.retention.hours", "24");
+    prop.setProperty("brokerid", "0");
+    prop.setProperty("socket.send.buffer", "1048576");
+    prop.setProperty("num.partitions", "1");
+    // Use a really small file size to force some flush to happen
+    prop.setProperty("log.file.size", "1024");
+    prop.setProperty("log.default.flush.interval.ms", "1000");
+    return prop;
+  }
+}