You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/11/21 22:54:35 UTC

[12/15] Initial import commit.

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java b/core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
new file mode 100644
index 0000000..63f8732
--- /dev/null
+++ b/core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import org.apache.twill.api.LocalFile;
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.RuntimeSpecification;
+import org.apache.twill.filesystem.Location;
+import org.apache.twill.internal.state.Message;
+import org.apache.twill.internal.state.StateNode;
+import org.apache.twill.launcher.TwillLauncher;
+import org.apache.twill.zookeeper.NodeData;
+import org.apache.twill.zookeeper.ZKClient;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * This class helps launching a container.
+ */
+public final class TwillContainerLauncher {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TwillContainerLauncher.class);
+
+  private static final double HEAP_MIN_RATIO = 0.7d;
+
+  private final RuntimeSpecification runtimeSpec;
+  private final ProcessLauncher.PrepareLaunchContext launchContext;
+  private final ZKClient zkClient;
+  private final int instanceCount;
+  private final String jvmOpts;
+  private final int reservedMemory;
+  private final Location secureStoreLocation;
+
+  public TwillContainerLauncher(RuntimeSpecification runtimeSpec, ProcessLauncher.PrepareLaunchContext launchContext,
+                                ZKClient zkClient, int instanceCount, String jvmOpts, int reservedMemory,
+                                Location secureStoreLocation) {
+    this.runtimeSpec = runtimeSpec;
+    this.launchContext = launchContext;
+    this.zkClient = zkClient;
+    this.instanceCount = instanceCount;
+    this.jvmOpts = jvmOpts;
+    this.reservedMemory = reservedMemory;
+    this.secureStoreLocation = secureStoreLocation;
+  }
+
+  public TwillContainerController start(RunId runId, int instanceId, Class<?> mainClass, String classPath) {
+    ProcessLauncher.PrepareLaunchContext.AfterResources afterResources = null;
+    ProcessLauncher.PrepareLaunchContext.ResourcesAdder resourcesAdder = null;
+
+    // Adds all file to be localized to container
+    if (!runtimeSpec.getLocalFiles().isEmpty()) {
+      resourcesAdder = launchContext.withResources();
+
+      for (LocalFile localFile : runtimeSpec.getLocalFiles()) {
+        afterResources = resourcesAdder.add(localFile);
+      }
+    }
+
+    // Optionally localize secure store.
+    try {
+      if (secureStoreLocation != null && secureStoreLocation.exists()) {
+        if (resourcesAdder == null) {
+          resourcesAdder = launchContext.withResources();
+        }
+        afterResources = resourcesAdder.add(new DefaultLocalFile(Constants.Files.CREDENTIALS,
+                                                                 secureStoreLocation.toURI(),
+                                                                 secureStoreLocation.lastModified(),
+                                                                 secureStoreLocation.length(), false, null));
+      }
+    } catch (IOException e) {
+      LOG.warn("Failed to launch container with secure store {}.", secureStoreLocation.toURI());
+    }
+
+    if (afterResources == null) {
+      afterResources = launchContext.noResources();
+    }
+
+    int memory = runtimeSpec.getResourceSpecification().getMemorySize();
+    if (((double) (memory - reservedMemory) / memory) >= HEAP_MIN_RATIO) {
+      // Reduce -Xmx by the reserved memory size.
+      memory = runtimeSpec.getResourceSpecification().getMemorySize() - reservedMemory;
+    } else {
+      // If it is a small VM, just discount it by the min ratio.
+      memory = (int) Math.ceil(memory * HEAP_MIN_RATIO);
+    }
+
+    // Currently no reporting is supported for runnable containers
+    ProcessController<Void> processController = afterResources
+      .withEnvironment()
+      .add(EnvKeys.TWILL_RUN_ID, runId.getId())
+      .add(EnvKeys.TWILL_RUNNABLE_NAME, runtimeSpec.getName())
+      .add(EnvKeys.TWILL_INSTANCE_ID, Integer.toString(instanceId))
+      .add(EnvKeys.TWILL_INSTANCE_COUNT, Integer.toString(instanceCount))
+      .withCommands()
+      .add("java",
+           "-Djava.io.tmpdir=tmp",
+           "-Dyarn.container=$" + EnvKeys.YARN_CONTAINER_ID,
+           "-Dtwill.runnable=$" + EnvKeys.TWILL_APP_NAME + ".$" + EnvKeys.TWILL_RUNNABLE_NAME,
+           "-cp", Constants.Files.LAUNCHER_JAR + ":" + classPath,
+           "-Xmx" + memory + "m",
+           jvmOpts,
+           TwillLauncher.class.getName(),
+           Constants.Files.CONTAINER_JAR,
+           mainClass.getName(),
+           Boolean.TRUE.toString())
+      .redirectOutput(Constants.STDOUT).redirectError(Constants.STDERR)
+      .launch();
+
+    TwillContainerControllerImpl controller = new TwillContainerControllerImpl(zkClient, runId, processController);
+    controller.start();
+    return controller;
+  }
+
+  private static final class TwillContainerControllerImpl extends AbstractZKServiceController
+                                                          implements TwillContainerController {
+
+    private final ProcessController<Void> processController;
+
+    protected TwillContainerControllerImpl(ZKClient zkClient, RunId runId,
+                                           ProcessController<Void> processController) {
+      super(runId, zkClient);
+      this.processController = processController;
+    }
+
+    @Override
+    protected void doStartUp() {
+      // No-op
+    }
+
+    @Override
+    protected void doShutDown() {
+      // No-op
+    }
+
+    @Override
+    protected void instanceNodeUpdated(NodeData nodeData) {
+      // No-op
+    }
+
+    @Override
+    protected void stateNodeUpdated(StateNode stateNode) {
+      // No-op
+    }
+
+    @Override
+    public ListenableFuture<Message> sendMessage(Message message) {
+      return sendMessage(message, message);
+    }
+
+    @Override
+    public synchronized void completed(int exitStatus) {
+      if (exitStatus != 0) {  // If a container terminated with exit code != 0, treat it as error
+//        fireStateChange(new StateNode(State.FAILED, new StackTraceElement[0]));
+      }
+      forceShutDown();
+    }
+
+    @Override
+    public void kill() {
+      processController.cancel();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/core/src/main/java/org/apache/twill/internal/ZKMessages.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/ZKMessages.java b/core/src/main/java/org/apache/twill/internal/ZKMessages.java
new file mode 100644
index 0000000..03575dd
--- /dev/null
+++ b/core/src/main/java/org/apache/twill/internal/ZKMessages.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import org.apache.twill.internal.state.Message;
+import org.apache.twill.internal.state.MessageCodec;
+import org.apache.twill.zookeeper.ZKClient;
+import org.apache.twill.zookeeper.ZKOperations;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.zookeeper.CreateMode;
+
+/**
+ *
+ */
+public final class ZKMessages {
+
+  /**
+   * Creates a message node in zookeeper. The message node created is a PERSISTENT_SEQUENTIAL node.
+   *
+   * @param zkClient The ZooKeeper client for interacting with ZooKeeper.
+   * @param messagePathPrefix ZooKeeper path prefix for the message node.
+   * @param message The {@link Message} object for the content of the message node.
+   * @param completionResult Object to set to the result future when the message is processed.
+   * @param <V> Type of the completion result.
+   * @return A {@link ListenableFuture} that will be completed when the message is consumed, which indicated
+   *         by deletion of the node. If there is exception during the process, it will be reflected
+   *         to the future returned.
+   */
+  public static <V> ListenableFuture<V> sendMessage(final ZKClient zkClient, String messagePathPrefix,
+                                                    Message message, final V completionResult) {
+    SettableFuture<V> result = SettableFuture.create();
+    sendMessage(zkClient, messagePathPrefix, message, result, completionResult);
+    return result;
+  }
+
+  /**
+   * Creates a message node in zookeeper. The message node created is a PERSISTENT_SEQUENTIAL node.
+   *
+   * @param zkClient The ZooKeeper client for interacting with ZooKeeper.
+   * @param messagePathPrefix ZooKeeper path prefix for the message node.
+   * @param message The {@link Message} object for the content of the message node.
+   * @param completion A {@link SettableFuture} to reflect the result of message process completion.
+   * @param completionResult Object to set to the result future when the message is processed.
+   * @param <V> Type of the completion result.
+   */
+  public static <V> void sendMessage(final ZKClient zkClient, String messagePathPrefix, Message message,
+                                     final SettableFuture<V> completion, final V completionResult) {
+
+    // Creates a message and watch for its deletion for completion.
+    Futures.addCallback(zkClient.create(messagePathPrefix, MessageCodec.encode(message),
+                                        CreateMode.PERSISTENT_SEQUENTIAL), new FutureCallback<String>() {
+      @Override
+      public void onSuccess(String path) {
+        Futures.addCallback(ZKOperations.watchDeleted(zkClient, path), new FutureCallback<String>() {
+          @Override
+          public void onSuccess(String result) {
+            completion.set(completionResult);
+          }
+
+          @Override
+          public void onFailure(Throwable t) {
+            completion.setException(t);
+          }
+        });
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        completion.setException(t);
+      }
+    });
+  }
+
+  private ZKMessages() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/core/src/main/java/org/apache/twill/internal/ZKServiceDecorator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/ZKServiceDecorator.java b/core/src/main/java/org/apache/twill/internal/ZKServiceDecorator.java
new file mode 100644
index 0000000..7313d33
--- /dev/null
+++ b/core/src/main/java/org/apache/twill/internal/ZKServiceDecorator.java
@@ -0,0 +1,482 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.ServiceController;
+import org.apache.twill.common.ServiceListenerAdapter;
+import org.apache.twill.common.Threads;
+import org.apache.twill.internal.json.StackTraceElementCodec;
+import org.apache.twill.internal.json.StateNodeCodec;
+import org.apache.twill.internal.state.Message;
+import org.apache.twill.internal.state.MessageCallback;
+import org.apache.twill.internal.state.MessageCodec;
+import org.apache.twill.internal.state.StateNode;
+import org.apache.twill.internal.state.SystemMessages;
+import org.apache.twill.zookeeper.NodeChildren;
+import org.apache.twill.zookeeper.NodeData;
+import org.apache.twill.zookeeper.OperationFuture;
+import org.apache.twill.zookeeper.ZKClient;
+import org.apache.twill.zookeeper.ZKOperations;
+import com.google.common.base.Charsets;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.AbstractService;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.Service;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link Service} decorator that wrap another {@link Service} with the service states reflected
+ * to ZooKeeper.
+ */
+public final class ZKServiceDecorator extends AbstractService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ZKServiceDecorator.class);
+
+  private final ZKClient zkClient;
+  private final RunId id;
+  private final Supplier<? extends JsonElement> liveNodeData;
+  private final Service decoratedService;
+  private final MessageCallbackCaller messageCallback;
+  private ExecutorService callbackExecutor;
+
+
+  public ZKServiceDecorator(ZKClient zkClient, RunId id, Supplier<? extends JsonElement> liveNodeData,
+                            Service decoratedService) {
+    this(zkClient, id, liveNodeData, decoratedService, null);
+  }
+
+  /**
+   * Creates a ZKServiceDecorator.
+   * @param zkClient ZooKeeper client
+   * @param id The run id of the service
+   * @param liveNodeData A supplier for providing information writing to live node.
+   * @param decoratedService The Service for monitoring state changes
+   * @param finalizer An optional Runnable to run when this decorator terminated.
+   */
+  public ZKServiceDecorator(ZKClient zkClient, RunId id, Supplier <? extends JsonElement> liveNodeData,
+                            Service decoratedService, @Nullable Runnable finalizer) {
+    this.zkClient = zkClient;
+    this.id = id;
+    this.liveNodeData = liveNodeData;
+    this.decoratedService = decoratedService;
+    if (decoratedService instanceof MessageCallback) {
+      this.messageCallback = new MessageCallbackCaller((MessageCallback) decoratedService, zkClient);
+    } else {
+      this.messageCallback = new MessageCallbackCaller(zkClient);
+    }
+    if (finalizer != null) {
+      addFinalizer(finalizer);
+    }
+  }
+
+  /**
+   * Deletes the given ZK path recursively and create the path again.
+   */
+  private ListenableFuture<String> deleteAndCreate(final String path, final byte[] data, final CreateMode mode) {
+    return Futures.transform(ZKOperations.ignoreError(ZKOperations.recursiveDelete(zkClient, path),
+                                                      KeeperException.NoNodeException.class, null),
+                             new AsyncFunction<String, String>() {
+      @Override
+      public ListenableFuture<String> apply(String input) throws Exception {
+        return zkClient.create(path, data, mode);
+      }
+    }, Threads.SAME_THREAD_EXECUTOR);
+  }
+
+  @Override
+  protected void doStart() {
+    callbackExecutor = Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("message-callback"));
+    Futures.addCallback(createLiveNode(), new FutureCallback<String>() {
+      @Override
+      public void onSuccess(String result) {
+        // Create nodes for states and messaging
+        StateNode stateNode = new StateNode(ServiceController.State.STARTING);
+
+        final ListenableFuture<List<String>> createFuture = Futures.allAsList(
+          deleteAndCreate(getZKPath("messages"), null, CreateMode.PERSISTENT),
+          deleteAndCreate(getZKPath("state"), encodeStateNode(stateNode), CreateMode.PERSISTENT)
+        );
+
+        createFuture.addListener(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              createFuture.get();
+              // Starts the decorated service
+              decoratedService.addListener(createListener(), Threads.SAME_THREAD_EXECUTOR);
+              decoratedService.start();
+            } catch (Exception e) {
+              notifyFailed(e);
+            }
+          }
+        }, Threads.SAME_THREAD_EXECUTOR);
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        notifyFailed(t);
+      }
+    });
+  }
+
+  @Override
+  protected void doStop() {
+    // Stops the decorated service
+    decoratedService.stop();
+    callbackExecutor.shutdownNow();
+  }
+
+  private void addFinalizer(final Runnable finalizer) {
+    addListener(new ServiceListenerAdapter() {
+      @Override
+      public void terminated(State from) {
+        try {
+          finalizer.run();
+        } catch (Throwable t) {
+          LOG.warn("Exception when running finalizer.", t);
+        }
+      }
+
+      @Override
+      public void failed(State from, Throwable failure) {
+        try {
+          finalizer.run();
+        } catch (Throwable t) {
+          LOG.warn("Exception when running finalizer.", t);
+        }
+      }
+    }, Threads.SAME_THREAD_EXECUTOR);
+  }
+
+  private OperationFuture<String> createLiveNode() {
+    String liveNode = getLiveNodePath();
+    LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNode);
+
+    JsonObject content = new JsonObject();
+    content.add("data", liveNodeData.get());
+    return ZKOperations.ignoreError(zkClient.create(liveNode, encodeJson(content), CreateMode.EPHEMERAL),
+                                    KeeperException.NodeExistsException.class, liveNode);
+  }
+
+  private OperationFuture<String> removeLiveNode() {
+    String liveNode = getLiveNodePath();
+    LOG.info("Remove live node {}{}", zkClient.getConnectString(), liveNode);
+    return ZKOperations.ignoreError(zkClient.delete(liveNode), KeeperException.NoNodeException.class, liveNode);
+  }
+
+  private OperationFuture<String> removeServiceNode() {
+    String serviceNode = String.format("/%s", id.getId());
+    LOG.info("Remove service node {}{}", zkClient.getConnectString(), serviceNode);
+    return ZKOperations.recursiveDelete(zkClient, serviceNode);
+  }
+
+  private void watchMessages() {
+    final String messagesPath = getZKPath("messages");
+    Futures.addCallback(zkClient.getChildren(messagesPath, new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        // TODO: Do we need to deal with other type of events?
+        if (event.getType() == Event.EventType.NodeChildrenChanged && decoratedService.isRunning()) {
+          watchMessages();
+        }
+      }
+    }), new FutureCallback<NodeChildren>() {
+      @Override
+      public void onSuccess(NodeChildren result) {
+        // Sort by the name, which is the messageId. Assumption is that message ids is ordered by time.
+        List<String> messages = Lists.newArrayList(result.getChildren());
+        Collections.sort(messages);
+        for (String messageId : messages) {
+          processMessage(messagesPath + "/" + messageId, messageId);
+        }
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        // TODO: what could be done besides just logging?
+        LOG.error("Failed to watch messages.", t);
+      }
+    });
+  }
+
+  private void processMessage(final String path, final String messageId) {
+    Futures.addCallback(zkClient.getData(path), new FutureCallback<NodeData>() {
+      @Override
+      public void onSuccess(NodeData result) {
+        Message message = MessageCodec.decode(result.getData());
+        if (message == null) {
+          LOG.error("Failed to decode message for " + messageId + " in " + path);
+          listenFailure(zkClient.delete(path, result.getStat().getVersion()));
+          return;
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Message received from " + path + ": " + new String(MessageCodec.encode(message), Charsets.UTF_8));
+        }
+        if (handleStopMessage(message, getDeleteSupplier(path, result.getStat().getVersion()))) {
+          return;
+        }
+        messageCallback.onReceived(callbackExecutor, path, result.getStat().getVersion(), messageId, message);
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        LOG.error("Failed to fetch message content.", t);
+      }
+    });
+  }
+
+  private <V> boolean handleStopMessage(Message message, final Supplier<OperationFuture<V>> postHandleSupplier) {
+    if (message.getType() == Message.Type.SYSTEM && SystemMessages.STOP_COMMAND.equals(message.getCommand())) {
+      callbackExecutor.execute(new Runnable() {
+        @Override
+        public void run() {
+          decoratedService.stop().addListener(new Runnable() {
+
+            @Override
+            public void run() {
+              stopServiceOnComplete(postHandleSupplier.get(), ZKServiceDecorator.this);
+            }
+          }, MoreExecutors.sameThreadExecutor());
+        }
+      });
+      return true;
+    }
+    return false;
+  }
+
+
+  private Supplier<OperationFuture<String>> getDeleteSupplier(final String path, final int version) {
+    return new Supplier<OperationFuture<String>>() {
+      @Override
+      public OperationFuture<String> get() {
+        return zkClient.delete(path, version);
+      }
+    };
+  }
+
+  private Listener createListener() {
+    return new DecoratedServiceListener();
+  }
+
+  private <V> byte[] encode(V data, Class<? extends V> clz) {
+    return new GsonBuilder().registerTypeAdapter(StateNode.class, new StateNodeCodec())
+                            .registerTypeAdapter(StackTraceElement.class, new StackTraceElementCodec())
+                            .create()
+      .toJson(data, clz).getBytes(Charsets.UTF_8);
+  }
+
+  private byte[] encodeStateNode(StateNode stateNode) {
+    return encode(stateNode, StateNode.class);
+  }
+
+  private <V extends JsonElement> byte[] encodeJson(V json) {
+    return new Gson().toJson(json).getBytes(Charsets.UTF_8);
+  }
+
+  private String getZKPath(String path) {
+    return String.format("/%s/%s", id, path);
+  }
+
+  private String getLiveNodePath() {
+    return "/instances/" + id;
+  }
+
+  private static <V> OperationFuture<V> listenFailure(final OperationFuture<V> operationFuture) {
+    operationFuture.addListener(new Runnable() {
+
+      @Override
+      public void run() {
+        try {
+          if (!operationFuture.isCancelled()) {
+            operationFuture.get();
+          }
+        } catch (Exception e) {
+          // TODO: what could be done besides just logging?
+          LOG.error("Operation execution failed for " + operationFuture.getRequestPath(), e);
+        }
+      }
+    }, Threads.SAME_THREAD_EXECUTOR);
+    return operationFuture;
+  }
+
+  private static final class MessageCallbackCaller {
+    private final MessageCallback callback;
+    private final ZKClient zkClient;
+
+    private MessageCallbackCaller(ZKClient zkClient) {
+      this(null, zkClient);
+    }
+
+    private MessageCallbackCaller(MessageCallback callback, ZKClient zkClient) {
+      this.callback = callback;
+      this.zkClient = zkClient;
+    }
+
+    public void onReceived(Executor executor, final String path,
+                           final int version, final String id, final Message message) {
+      if (callback == null) {
+        // Simply delete the message
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Ignoring incoming message from " + path + ": " + message);
+        }
+        listenFailure(zkClient.delete(path, version));
+        return;
+      }
+
+      executor.execute(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            // Message process is synchronous for now. Making it async needs more thoughts about race conditions.
+            // The executor is the callbackExecutor which is a single thread executor.
+            callback.onReceived(id, message).get();
+          } catch (Throwable t) {
+            LOG.error("Exception when processing message: {}, {}, {}", id, message, path, t);
+          } finally {
+            listenFailure(zkClient.delete(path, version));
+          }
+        }
+      });
+    }
+  }
+
+  private final class DecoratedServiceListener implements Listener {
+    private volatile boolean zkFailure = false;
+
+    @Override
+    public void starting() {
+      LOG.info("Starting: " + id);
+      saveState(ServiceController.State.STARTING);
+    }
+
+    @Override
+    public void running() {
+      LOG.info("Running: " + id);
+      notifyStarted();
+      watchMessages();
+      saveState(ServiceController.State.RUNNING);
+    }
+
+    @Override
+    public void stopping(State from) {
+      LOG.info("Stopping: " + id);
+      saveState(ServiceController.State.STOPPING);
+    }
+
+    @Override
+    public void terminated(State from) {
+      LOG.info("Terminated: " + from + " " + id);
+      if (zkFailure) {
+        return;
+      }
+
+      ImmutableList<OperationFuture<String>> futures = ImmutableList.of(removeLiveNode(), removeServiceNode());
+      final ListenableFuture<List<String>> future = Futures.allAsList(futures);
+      Futures.successfulAsList(futures).addListener(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            future.get();
+            LOG.info("Service and state node removed");
+            notifyStopped();
+          } catch (Exception e) {
+            LOG.warn("Failed to remove ZK nodes.", e);
+            notifyFailed(e);
+          }
+        }
+      }, Threads.SAME_THREAD_EXECUTOR);
+    }
+
+    @Override
+    public void failed(State from, final Throwable failure) {
+      LOG.info("Failed: {} {}.", from, id, failure);
+      if (zkFailure) {
+        return;
+      }
+
+      ImmutableList<OperationFuture<String>> futures = ImmutableList.of(removeLiveNode(), removeServiceNode());
+      Futures.successfulAsList(futures).addListener(new Runnable() {
+        @Override
+        public void run() {
+          LOG.info("Service and state node removed");
+          notifyFailed(failure);
+        }
+      }, Threads.SAME_THREAD_EXECUTOR);
+    }
+
+    private void saveState(ServiceController.State state) {
+      if (zkFailure) {
+        return;
+      }
+      StateNode stateNode = new StateNode(state);
+      stopOnFailure(zkClient.setData(getZKPath("state"), encodeStateNode(stateNode)));
+    }
+
+    private <V> void stopOnFailure(final OperationFuture<V> future) {
+      future.addListener(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            future.get();
+          } catch (final Exception e) {
+            LOG.error("ZK operation failed", e);
+            zkFailure = true;
+            decoratedService.stop().addListener(new Runnable() {
+              @Override
+              public void run() {
+                notifyFailed(e);
+              }
+            }, Threads.SAME_THREAD_EXECUTOR);
+          }
+        }
+      }, Threads.SAME_THREAD_EXECUTOR);
+    }
+  }
+
+  private <V> ListenableFuture<State> stopServiceOnComplete(ListenableFuture <V> future, final Service service) {
+    return Futures.transform(future, new AsyncFunction<V, State>() {
+      @Override
+      public ListenableFuture<State> apply(V input) throws Exception {
+        return service.stop();
+      }
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/core/src/main/java/org/apache/twill/internal/json/ArgumentsCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/json/ArgumentsCodec.java b/core/src/main/java/org/apache/twill/internal/json/ArgumentsCodec.java
new file mode 100644
index 0000000..07d4c1d
--- /dev/null
+++ b/core/src/main/java/org/apache/twill/internal/json/ArgumentsCodec.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.json;
+
+import org.apache.twill.internal.Arguments;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.io.InputSupplier;
+import com.google.common.io.OutputSupplier;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.io.Writer;
+import java.lang.reflect.Type;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ *
+ */
+public final class ArgumentsCodec implements JsonSerializer<Arguments>, JsonDeserializer<Arguments> {
+
+  private static final Gson GSON = new GsonBuilder().registerTypeAdapter(Arguments.class, new ArgumentsCodec())
+                                                    .create();
+
+  public static void encode(Arguments arguments, OutputSupplier<? extends Writer> writerSupplier) throws IOException {
+    Writer writer = writerSupplier.getOutput();
+    try {
+      GSON.toJson(arguments, writer);
+    } finally {
+      writer.close();
+    }
+  }
+
+
+  public static Arguments decode(InputSupplier<? extends Reader> readerSupplier) throws IOException {
+    Reader reader = readerSupplier.getInput();
+    try {
+      return GSON.fromJson(reader, Arguments.class);
+    } finally {
+      reader.close();
+    }
+  }
+
+  @Override
+  public JsonElement serialize(Arguments src, Type typeOfSrc,
+                               JsonSerializationContext context) {
+    JsonObject json = new JsonObject();
+    json.add("arguments", context.serialize(src.getArguments()));
+    json.add("runnableArguments", context.serialize(src.getRunnableArguments().asMap()));
+
+    return json;
+  }
+
+  @Override
+  public Arguments deserialize(JsonElement json, Type typeOfT,
+                              JsonDeserializationContext context) throws JsonParseException {
+    JsonObject jsonObj = json.getAsJsonObject();
+    List<String> arguments = context.deserialize(jsonObj.get("arguments"), new TypeToken<List<String>>() {}.getType());
+    Map<String, Collection<String>> args = context.deserialize(jsonObj.get("runnableArguments"),
+                                                               new TypeToken<Map<String, Collection<String>>>(){
+                                                               }.getType());
+
+    ImmutableMultimap.Builder<String, String> builder = ImmutableMultimap.builder();
+    for (Map.Entry<String, Collection<String>> entry : args.entrySet()) {
+      builder.putAll(entry.getKey(), entry.getValue());
+    }
+    return new Arguments(arguments, builder.build());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/core/src/main/java/org/apache/twill/internal/json/JsonUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/json/JsonUtils.java b/core/src/main/java/org/apache/twill/internal/json/JsonUtils.java
new file mode 100644
index 0000000..9556ad8
--- /dev/null
+++ b/core/src/main/java/org/apache/twill/internal/json/JsonUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.json;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
+/**
+ * Collections of helper functions for json codec.
+ */
+public final class JsonUtils {
+
+  private JsonUtils() {
+  }
+
+  /**
+   * Returns a String representation of the given property.
+   */
+  public static String getAsString(JsonObject json, String property) {
+    JsonElement jsonElement = json.get(property);
+    if (jsonElement.isJsonNull()) {
+      return null;
+    }
+    if (jsonElement.isJsonPrimitive()) {
+      return jsonElement.getAsString();
+    }
+    return jsonElement.toString();
+  }
+
+  /**
+   * Returns a long representation of the given property.
+   */
+  public static long getAsLong(JsonObject json, String property, long defaultValue) {
+    try {
+      return json.get(property).getAsLong();
+    } catch (Exception e) {
+      return defaultValue;
+    }
+  }
+
+  /**
+   * Returns a long representation of the given property.
+   */
+  public static int getAsInt(JsonObject json, String property, int defaultValue) {
+    try {
+      return json.get(property).getAsInt();
+    } catch (Exception e) {
+      return defaultValue;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/core/src/main/java/org/apache/twill/internal/json/LocalFileCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/json/LocalFileCodec.java b/core/src/main/java/org/apache/twill/internal/json/LocalFileCodec.java
new file mode 100644
index 0000000..680a36c
--- /dev/null
+++ b/core/src/main/java/org/apache/twill/internal/json/LocalFileCodec.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.json;
+
+import org.apache.twill.api.LocalFile;
+import org.apache.twill.internal.DefaultLocalFile;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+import java.lang.reflect.Type;
+import java.net.URI;
+
+/**
+ *
+ */
+public final class LocalFileCodec implements JsonSerializer<LocalFile>, JsonDeserializer<LocalFile> {
+
+  @Override
+  public JsonElement serialize(LocalFile src, Type typeOfSrc, JsonSerializationContext context) {
+    JsonObject json = new JsonObject();
+
+    json.addProperty("name", src.getName());
+    json.addProperty("uri", src.getURI().toASCIIString());
+    json.addProperty("lastModified", src.getLastModified());
+    json.addProperty("size", src.getSize());
+    json.addProperty("archive", src.isArchive());
+    json.addProperty("pattern", src.getPattern());
+
+    return json;
+  }
+
+  @Override
+  public LocalFile deserialize(JsonElement json, Type typeOfT,
+                               JsonDeserializationContext context) throws JsonParseException {
+    JsonObject jsonObj = json.getAsJsonObject();
+
+    String name = jsonObj.get("name").getAsString();
+    URI uri = URI.create(jsonObj.get("uri").getAsString());
+    long lastModified = jsonObj.get("lastModified").getAsLong();
+    long size = jsonObj.get("size").getAsLong();
+    boolean archive = jsonObj.get("archive").getAsBoolean();
+    JsonElement pattern = jsonObj.get("pattern");
+
+    return new DefaultLocalFile(name, uri, lastModified, size,
+                                archive, (pattern == null || pattern.isJsonNull()) ? null : pattern.getAsString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/core/src/main/java/org/apache/twill/internal/json/ResourceReportAdapter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/json/ResourceReportAdapter.java b/core/src/main/java/org/apache/twill/internal/json/ResourceReportAdapter.java
new file mode 100644
index 0000000..e473fe7
--- /dev/null
+++ b/core/src/main/java/org/apache/twill/internal/json/ResourceReportAdapter.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.json;
+
+import org.apache.twill.api.ResourceReport;
+import org.apache.twill.api.TwillRunResources;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+import java.io.Reader;
+import java.io.Writer;
+
+/**
+ * This class provides utility to help encode/decode {@link ResourceReport} to/from Json.
+ */
+public final class ResourceReportAdapter {
+
+  private final Gson gson;
+
+  public static ResourceReportAdapter create() {
+    return new ResourceReportAdapter();
+  }
+
+  private ResourceReportAdapter() {
+    gson = new GsonBuilder()
+              .serializeNulls()
+              .registerTypeAdapter(TwillRunResources.class, new TwillRunResourcesCodec())
+              .registerTypeAdapter(ResourceReport.class, new ResourceReportCodec())
+              .create();
+  }
+
+  public String toJson(ResourceReport report) {
+    return gson.toJson(report, ResourceReport.class);
+  }
+
+  public void toJson(ResourceReport report, Writer writer) {
+    gson.toJson(report, ResourceReport.class, writer);
+  }
+
+  public ResourceReport fromJson(String json) {
+    return gson.fromJson(json, ResourceReport.class);
+  }
+
+  public ResourceReport fromJson(Reader reader) {
+    return gson.fromJson(reader, ResourceReport.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/core/src/main/java/org/apache/twill/internal/json/ResourceReportCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/json/ResourceReportCodec.java b/core/src/main/java/org/apache/twill/internal/json/ResourceReportCodec.java
new file mode 100644
index 0000000..884d889
--- /dev/null
+++ b/core/src/main/java/org/apache/twill/internal/json/ResourceReportCodec.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.json;
+
+import org.apache.twill.api.ResourceReport;
+import org.apache.twill.api.TwillRunResources;
+import org.apache.twill.internal.DefaultResourceReport;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+import com.google.gson.reflect.TypeToken;
+
+import java.lang.reflect.Type;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Codec for serializing and deserializing a {@link ResourceReport} object using json.
+ */
+public final class ResourceReportCodec implements JsonSerializer<ResourceReport>,
+                                           JsonDeserializer<ResourceReport> {
+
+  @Override
+  public JsonElement serialize(ResourceReport src, Type typeOfSrc, JsonSerializationContext context) {
+    JsonObject json = new JsonObject();
+
+    json.addProperty("appMasterId", src.getApplicationId());
+    json.add("appMasterResources", context.serialize(
+      src.getAppMasterResources(), new TypeToken<TwillRunResources>(){}.getType()));
+    json.add("runnableResources", context.serialize(
+      src.getResources(), new TypeToken<Map<String, Collection<TwillRunResources>>>(){}.getType()));
+
+    return json;
+  }
+
+  @Override
+  public ResourceReport deserialize(JsonElement json, Type typeOfT,
+                                           JsonDeserializationContext context) throws JsonParseException {
+    JsonObject jsonObj = json.getAsJsonObject();
+    String appMasterId = jsonObj.get("appMasterId").getAsString();
+    TwillRunResources masterResources = context.deserialize(
+      jsonObj.get("appMasterResources"), TwillRunResources.class);
+    Map<String, Collection<TwillRunResources>> resources = context.deserialize(
+      jsonObj.get("runnableResources"), new TypeToken<Map<String, Collection<TwillRunResources>>>(){}.getType());
+
+    return new DefaultResourceReport(appMasterId, masterResources, resources);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java b/core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java
new file mode 100644
index 0000000..bea73c4
--- /dev/null
+++ b/core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.json;
+
+import org.apache.twill.api.ResourceSpecification;
+import org.apache.twill.internal.DefaultResourceSpecification;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+import java.lang.reflect.Type;
+
+/**
+ *
+ */
+final class ResourceSpecificationCodec implements JsonSerializer<ResourceSpecification>,
+                                                  JsonDeserializer<ResourceSpecification> {
+
+  @Override
+  public JsonElement serialize(ResourceSpecification src, Type typeOfSrc, JsonSerializationContext context) {
+    JsonObject json = new JsonObject();
+
+    json.addProperty("cores", src.getVirtualCores());
+    json.addProperty("memorySize", src.getMemorySize());
+    json.addProperty("instances", src.getInstances());
+    json.addProperty("uplink", src.getUplink());
+    json.addProperty("downlink", src.getDownlink());
+
+    return json;
+  }
+
+  @Override
+  public ResourceSpecification deserialize(JsonElement json, Type typeOfT,
+                                           JsonDeserializationContext context) throws JsonParseException {
+    JsonObject jsonObj = json.getAsJsonObject();
+    return new DefaultResourceSpecification(jsonObj.get("cores").getAsInt(),
+                                            jsonObj.get("memorySize").getAsInt(),
+                                            jsonObj.get("instances").getAsInt(),
+                                            jsonObj.get("uplink").getAsInt(),
+                                            jsonObj.get("downlink").getAsInt());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/core/src/main/java/org/apache/twill/internal/json/RuntimeSpecificationCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/json/RuntimeSpecificationCodec.java b/core/src/main/java/org/apache/twill/internal/json/RuntimeSpecificationCodec.java
new file mode 100644
index 0000000..867f4a8
--- /dev/null
+++ b/core/src/main/java/org/apache/twill/internal/json/RuntimeSpecificationCodec.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.json;
+
+import org.apache.twill.api.LocalFile;
+import org.apache.twill.api.ResourceSpecification;
+import org.apache.twill.api.RuntimeSpecification;
+import org.apache.twill.api.TwillRunnableSpecification;
+import org.apache.twill.internal.DefaultRuntimeSpecification;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+import java.lang.reflect.Type;
+import java.util.Collection;
+
+/**
+ *
+ */
+final class RuntimeSpecificationCodec implements JsonSerializer<RuntimeSpecification>,
+                                                 JsonDeserializer<RuntimeSpecification> {
+
+  @Override
+  public JsonElement serialize(RuntimeSpecification src, Type typeOfSrc, JsonSerializationContext context) {
+    JsonObject json = new JsonObject();
+    json.addProperty("name", src.getName());
+    json.add("runnable", context.serialize(src.getRunnableSpecification(), TwillRunnableSpecification.class));
+    json.add("resources", context.serialize(src.getResourceSpecification(), ResourceSpecification.class));
+    json.add("files", context.serialize(src.getLocalFiles(), new TypeToken<Collection<LocalFile>>(){}.getType()));
+
+    return json;
+  }
+
+  @Override
+  public RuntimeSpecification deserialize(JsonElement json, Type typeOfT,
+                                          JsonDeserializationContext context) throws JsonParseException {
+    JsonObject jsonObj = json.getAsJsonObject();
+
+    String name = jsonObj.get("name").getAsString();
+    TwillRunnableSpecification runnable = context.deserialize(jsonObj.get("runnable"),
+                                                               TwillRunnableSpecification.class);
+    ResourceSpecification resources = context.deserialize(jsonObj.get("resources"),
+                                                          ResourceSpecification.class);
+    Collection<LocalFile> files = context.deserialize(jsonObj.get("files"),
+                                                      new TypeToken<Collection<LocalFile>>(){}.getType());
+
+    return new DefaultRuntimeSpecification(name, runnable, resources, files);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/core/src/main/java/org/apache/twill/internal/json/StackTraceElementCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/json/StackTraceElementCodec.java b/core/src/main/java/org/apache/twill/internal/json/StackTraceElementCodec.java
new file mode 100644
index 0000000..9a57b46
--- /dev/null
+++ b/core/src/main/java/org/apache/twill/internal/json/StackTraceElementCodec.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.json;
+
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+import java.lang.reflect.Type;
+
+/**
+ *
+ */
+public final class StackTraceElementCodec implements JsonSerializer<StackTraceElement>,
+                                                     JsonDeserializer<StackTraceElement> {
+
+  @Override
+  public StackTraceElement deserialize(JsonElement json, Type typeOfT,
+                                       JsonDeserializationContext context) throws JsonParseException {
+    JsonObject jsonObj = json.getAsJsonObject();
+    return new StackTraceElement(JsonUtils.getAsString(jsonObj, "className"),
+                                 JsonUtils.getAsString(jsonObj, "method"),
+                                 JsonUtils.getAsString(jsonObj, "file"),
+                                 JsonUtils.getAsInt(jsonObj, "line", -1));
+  }
+
+  @Override
+  public JsonElement serialize(StackTraceElement src, Type typeOfSrc, JsonSerializationContext context) {
+    JsonObject jsonObj = new JsonObject();
+    jsonObj.addProperty("className", src.getClassName());
+    jsonObj.addProperty("method", src.getMethodName());
+    jsonObj.addProperty("file", src.getFileName());
+    jsonObj.addProperty("line", src.getLineNumber());
+
+    return jsonObj;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/core/src/main/java/org/apache/twill/internal/json/StateNodeCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/json/StateNodeCodec.java b/core/src/main/java/org/apache/twill/internal/json/StateNodeCodec.java
new file mode 100644
index 0000000..c1e9d1c
--- /dev/null
+++ b/core/src/main/java/org/apache/twill/internal/json/StateNodeCodec.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.json;
+
+import org.apache.twill.api.ServiceController;
+import org.apache.twill.internal.state.StateNode;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+import java.lang.reflect.Type;
+
+/**
+ *
+ */
+public final class StateNodeCodec implements JsonSerializer<StateNode>, JsonDeserializer<StateNode> {
+
+  @Override
+  public StateNode deserialize(JsonElement json, Type typeOfT,
+                               JsonDeserializationContext context) throws JsonParseException {
+    JsonObject jsonObj = json.getAsJsonObject();
+    ServiceController.State state = ServiceController.State.valueOf(jsonObj.get("state").getAsString());
+    String errorMessage = jsonObj.has("errorMessage") ? jsonObj.get("errorMessage").getAsString() : null;
+
+    return new StateNode(state, errorMessage,
+                         context.<StackTraceElement[]>deserialize(jsonObj.get("stackTraces"), StackTraceElement[].class));
+  }
+
+  @Override
+  public JsonElement serialize(StateNode src, Type typeOfSrc, JsonSerializationContext context) {
+    JsonObject jsonObj = new JsonObject();
+    jsonObj.addProperty("state", src.getState().name());
+    if (src.getErrorMessage() != null) {
+      jsonObj.addProperty("errorMessage", src.getErrorMessage());
+    }
+    if (src.getStackTraces() != null) {
+      jsonObj.add("stackTraces", context.serialize(src.getStackTraces(), StackTraceElement[].class));
+    }
+    return jsonObj;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java b/core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
new file mode 100644
index 0000000..8951173
--- /dev/null
+++ b/core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.json;
+
+import org.apache.twill.api.TwillRunResources;
+import org.apache.twill.internal.DefaultTwillRunResources;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+import java.lang.reflect.Type;
+
+/**
+ * Codec for serializing and deserializing a {@link org.apache.twill.api.TwillRunResources} object using json.
+ */
+public final class TwillRunResourcesCodec implements JsonSerializer<TwillRunResources>,
+                                              JsonDeserializer<TwillRunResources> {
+
+  @Override
+  public JsonElement serialize(TwillRunResources src, Type typeOfSrc, JsonSerializationContext context) {
+    JsonObject json = new JsonObject();
+
+    json.addProperty("containerId", src.getContainerId());
+    json.addProperty("instanceId", src.getInstanceId());
+    json.addProperty("host", src.getHost());
+    json.addProperty("memoryMB", src.getMemoryMB());
+    json.addProperty("virtualCores", src.getVirtualCores());
+
+    return json;
+  }
+
+  @Override
+  public TwillRunResources deserialize(JsonElement json, Type typeOfT,
+                                           JsonDeserializationContext context) throws JsonParseException {
+    JsonObject jsonObj = json.getAsJsonObject();
+    return new DefaultTwillRunResources(jsonObj.get("instanceId").getAsInt(),
+                                        jsonObj.get("containerId").getAsString(),
+                                        jsonObj.get("virtualCores").getAsInt(),
+                                        jsonObj.get("memoryMB").getAsInt(),
+                                        jsonObj.get("host").getAsString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/core/src/main/java/org/apache/twill/internal/json/TwillRunnableSpecificationCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/json/TwillRunnableSpecificationCodec.java b/core/src/main/java/org/apache/twill/internal/json/TwillRunnableSpecificationCodec.java
new file mode 100644
index 0000000..f37c1e8
--- /dev/null
+++ b/core/src/main/java/org/apache/twill/internal/json/TwillRunnableSpecificationCodec.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.json;
+
+import org.apache.twill.api.TwillRunnableSpecification;
+import org.apache.twill.internal.DefaultTwillRunnableSpecification;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+import java.lang.reflect.Type;
+import java.util.Map;
+
+/**
+ *
+ */
+final class TwillRunnableSpecificationCodec implements JsonSerializer<TwillRunnableSpecification>,
+                                                       JsonDeserializer<TwillRunnableSpecification> {
+
+  @Override
+  public JsonElement serialize(TwillRunnableSpecification src, Type typeOfSrc, JsonSerializationContext context) {
+    JsonObject json = new JsonObject();
+
+    json.addProperty("classname", src.getClassName());
+    json.addProperty("name", src.getName());
+    json.add("arguments", context.serialize(src.getConfigs(), new TypeToken<Map<String, String>>(){}.getType()));
+
+    return json;
+  }
+
+  @Override
+  public TwillRunnableSpecification deserialize(JsonElement json, Type typeOfT,
+                                                JsonDeserializationContext context) throws JsonParseException {
+    JsonObject jsonObj = json.getAsJsonObject();
+
+    String className = jsonObj.get("classname").getAsString();
+    String name = jsonObj.get("name").getAsString();
+    Map<String, String> arguments = context.deserialize(jsonObj.get("arguments"),
+                                                        new TypeToken<Map<String, String>>(){}.getType());
+
+    return new DefaultTwillRunnableSpecification(className, name, arguments);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java b/core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java
new file mode 100644
index 0000000..67c15a2
--- /dev/null
+++ b/core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.json;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.TypeAdapter;
+import com.google.gson.TypeAdapterFactory;
+import com.google.gson.reflect.TypeToken;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonToken;
+import com.google.gson.stream.JsonWriter;
+import org.apache.twill.api.EventHandlerSpecification;
+import org.apache.twill.api.LocalFile;
+import org.apache.twill.api.ResourceSpecification;
+import org.apache.twill.api.RuntimeSpecification;
+import org.apache.twill.api.TwillRunnableSpecification;
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.internal.json.TwillSpecificationCodec.EventHandlerSpecificationCoder;
+import org.apache.twill.internal.json.TwillSpecificationCodec.TwillSpecificationOrderCoder;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Reader;
+import java.io.Writer;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.Map;
+
+/**
+ *
+ */
+public final class TwillSpecificationAdapter {
+
+  private final Gson gson;
+
+  public static TwillSpecificationAdapter create() {
+    return new TwillSpecificationAdapter();
+  }
+
+  private TwillSpecificationAdapter() {
+    gson = new GsonBuilder()
+              .serializeNulls()
+              .registerTypeAdapter(TwillSpecification.class, new TwillSpecificationCodec())
+              .registerTypeAdapter(TwillSpecification.Order.class, new TwillSpecificationOrderCoder())
+              .registerTypeAdapter(EventHandlerSpecification.class, new EventHandlerSpecificationCoder())
+              .registerTypeAdapter(RuntimeSpecification.class, new RuntimeSpecificationCodec())
+              .registerTypeAdapter(TwillRunnableSpecification.class, new TwillRunnableSpecificationCodec())
+              .registerTypeAdapter(ResourceSpecification.class, new ResourceSpecificationCodec())
+              .registerTypeAdapter(LocalFile.class, new LocalFileCodec())
+              .registerTypeAdapterFactory(new TwillSpecificationTypeAdapterFactory())
+              .create();
+  }
+
+  public String toJson(TwillSpecification spec) {
+    return gson.toJson(spec, TwillSpecification.class);
+  }
+
+  public void toJson(TwillSpecification spec, Writer writer) {
+    gson.toJson(spec, TwillSpecification.class, writer);
+  }
+
+  public void toJson(TwillSpecification spec, File file) throws IOException {
+    Writer writer = Files.newWriter(file, Charsets.UTF_8);
+    try {
+      toJson(spec, writer);
+    } finally {
+      writer.close();
+    }
+  }
+
+  public TwillSpecification fromJson(String json) {
+    return gson.fromJson(json, TwillSpecification.class);
+  }
+
+  public TwillSpecification fromJson(Reader reader) {
+    return gson.fromJson(reader, TwillSpecification.class);
+  }
+
+  public TwillSpecification fromJson(File file) throws IOException {
+    Reader reader = Files.newReader(file, Charsets.UTF_8);
+    try {
+      return fromJson(reader);
+    } finally {
+      reader.close();
+    }
+  }
+
+  // This is to get around gson ignoring of inner class
+  private static final class TwillSpecificationTypeAdapterFactory implements TypeAdapterFactory {
+
+    @Override
+    public <T> TypeAdapter<T> create(Gson gson, TypeToken<T> type) {
+      Class<?> rawType = type.getRawType();
+      if (!Map.class.isAssignableFrom(rawType)) {
+        return null;
+      }
+      Type[] typeArgs = ((ParameterizedType) type.getType()).getActualTypeArguments();
+      TypeToken<?> keyType = TypeToken.get(typeArgs[0]);
+      TypeToken<?> valueType = TypeToken.get(typeArgs[1]);
+      if (keyType.getRawType() != String.class) {
+        return null;
+      }
+      return (TypeAdapter<T>) mapAdapter(gson, valueType);
+    }
+
+    private <V> TypeAdapter<Map<String, V>> mapAdapter(Gson gson, TypeToken<V> valueType) {
+      final TypeAdapter<V> valueAdapter = gson.getAdapter(valueType);
+
+      return new TypeAdapter<Map<String, V>>() {
+        @Override
+        public void write(JsonWriter writer, Map<String, V> map) throws IOException {
+          if (map == null) {
+            writer.nullValue();
+            return;
+          }
+          writer.beginObject();
+          for (Map.Entry<String, V> entry : map.entrySet()) {
+            writer.name(entry.getKey());
+            valueAdapter.write(writer, entry.getValue());
+          }
+          writer.endObject();
+        }
+
+        @Override
+        public Map<String, V> read(JsonReader reader) throws IOException {
+          if (reader.peek() == JsonToken.NULL) {
+            reader.nextNull();
+            return null;
+          }
+          if (reader.peek() != JsonToken.BEGIN_OBJECT) {
+            return null;
+          }
+          Map<String, V> map = Maps.newHashMap();
+          reader.beginObject();
+          while (reader.peek() != JsonToken.END_OBJECT) {
+            map.put(reader.nextName(), valueAdapter.read(reader));
+          }
+          reader.endObject();
+          return map;
+        }
+      };
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/core/src/main/java/org/apache/twill/internal/json/TwillSpecificationCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/json/TwillSpecificationCodec.java b/core/src/main/java/org/apache/twill/internal/json/TwillSpecificationCodec.java
new file mode 100644
index 0000000..5d88350
--- /dev/null
+++ b/core/src/main/java/org/apache/twill/internal/json/TwillSpecificationCodec.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.json;
+
+import org.apache.twill.api.EventHandlerSpecification;
+import org.apache.twill.api.RuntimeSpecification;
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.internal.DefaultEventHandlerSpecification;
+import org.apache.twill.internal.DefaultTwillSpecification;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+import java.lang.reflect.Type;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * An implementation of gson serializer/deserializer {@link org.apache.twill.api.TwillSpecification}.
+ */
+final class TwillSpecificationCodec implements JsonSerializer<TwillSpecification>,
+                                               JsonDeserializer<TwillSpecification> {
+
+  @Override
+  public JsonElement serialize(TwillSpecification src, Type typeOfSrc, JsonSerializationContext context) {
+    JsonObject json = new JsonObject();
+    json.addProperty("name", src.getName());
+    json.add("runnables", context.serialize(src.getRunnables(),
+                                            new TypeToken<Map<String, RuntimeSpecification>>(){}.getType()));
+    json.add("orders", context.serialize(src.getOrders(),
+                                         new TypeToken<List<TwillSpecification.Order>>(){}.getType()));
+    EventHandlerSpecification eventHandler = src.getEventHandler();
+    if (eventHandler != null) {
+      json.add("handler", context.serialize(eventHandler, EventHandlerSpecification.class));
+    }
+
+    return json;
+  }
+
+  @Override
+  public TwillSpecification deserialize(JsonElement json, Type typeOfT,
+                                        JsonDeserializationContext context) throws JsonParseException {
+    JsonObject jsonObj = json.getAsJsonObject();
+
+    String name = jsonObj.get("name").getAsString();
+    Map<String, RuntimeSpecification> runnables = context.deserialize(
+      jsonObj.get("runnables"), new TypeToken<Map<String, RuntimeSpecification>>(){}.getType());
+    List<TwillSpecification.Order> orders = context.deserialize(
+      jsonObj.get("orders"), new TypeToken<List<TwillSpecification.Order>>(){}.getType());
+
+    JsonElement handler = jsonObj.get("handler");
+    EventHandlerSpecification eventHandler = null;
+    if (handler != null && !handler.isJsonNull()) {
+      eventHandler = context.deserialize(handler, EventHandlerSpecification.class);
+    }
+
+    return new DefaultTwillSpecification(name, runnables, orders, eventHandler);
+  }
+
+  static final class TwillSpecificationOrderCoder implements JsonSerializer<TwillSpecification.Order>,
+                                                             JsonDeserializer<TwillSpecification.Order> {
+
+    @Override
+    public JsonElement serialize(TwillSpecification.Order src, Type typeOfSrc, JsonSerializationContext context) {
+      JsonObject json = new JsonObject();
+      json.add("names", context.serialize(src.getNames(), new TypeToken<Set<String>>(){}.getType()));
+      json.addProperty("type", src.getType().name());
+      return json;
+    }
+
+    @Override
+    public TwillSpecification.Order deserialize(JsonElement json, Type typeOfT,
+                                                JsonDeserializationContext context) throws JsonParseException {
+      JsonObject jsonObj = json.getAsJsonObject();
+
+      Set<String> names = context.deserialize(jsonObj.get("names"), new TypeToken<Set<String>>(){}.getType());
+      TwillSpecification.Order.Type type = TwillSpecification.Order.Type.valueOf(jsonObj.get("type").getAsString());
+
+      return new DefaultTwillSpecification.DefaultOrder(names, type);
+    }
+  }
+
+  static final class EventHandlerSpecificationCoder implements JsonSerializer<EventHandlerSpecification>,
+                                                               JsonDeserializer<EventHandlerSpecification> {
+
+    @Override
+    public JsonElement serialize(EventHandlerSpecification src, Type typeOfSrc, JsonSerializationContext context) {
+      JsonObject json = new JsonObject();
+      json.addProperty("classname", src.getClassName());
+      json.add("configs", context.serialize(src.getConfigs(), new TypeToken<Map<String, String>>(){}.getType()));
+      return json;
+    }
+
+    @Override
+    public EventHandlerSpecification deserialize(JsonElement json, Type typeOfT,
+                                                 JsonDeserializationContext context) throws JsonParseException {
+      JsonObject jsonObj = json.getAsJsonObject();
+      String className = jsonObj.get("classname").getAsString();
+      Map<String, String> configs = context.deserialize(jsonObj.get("configs"),
+                                                        new TypeToken<Map<String, String>>() {
+                                                        }.getType());
+
+      return new DefaultEventHandlerSpecification(className, configs);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java b/core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java
new file mode 100644
index 0000000..14dfc70
--- /dev/null
+++ b/core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ *
+ */
+public final class EmbeddedKafkaServer extends AbstractIdleService {
+
+  private static final String KAFAK_CONFIG_CLASS = "kafka.server.KafkaConfig";
+  private static final String KAFKA_SERVER_CLASS = "kafka.server.KafkaServerStartable";
+
+  private final Object server;
+
+  public EmbeddedKafkaServer(File kafkaDir, Properties properties) {
+    this(createClassLoader(kafkaDir), properties);
+  }
+
+  public EmbeddedKafkaServer(ClassLoader classLoader, Properties properties) {
+    try {
+      Class<?> configClass = classLoader.loadClass(KAFAK_CONFIG_CLASS);
+      Object config = configClass.getConstructor(Properties.class).newInstance(properties);
+
+      Class<?> serverClass = classLoader.loadClass(KAFKA_SERVER_CLASS);
+      server = serverClass.getConstructor(configClass).newInstance(config);
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    server.getClass().getMethod("startup").invoke(server);
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    server.getClass().getMethod("shutdown").invoke(server);
+    server.getClass().getMethod("awaitShutdown").invoke(server);
+  }
+
+  private static ClassLoader createClassLoader(File kafkaDir) {
+    ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+    ClassLoader thisClassLoader = EmbeddedKafkaServer.class.getClassLoader();
+    ClassLoader parent = contextClassLoader != null
+                            ? contextClassLoader
+                            : thisClassLoader != null
+                                ? thisClassLoader : ClassLoader.getSystemClassLoader();
+
+    return new URLClassLoader(findJars(kafkaDir, Lists.<URL>newArrayList()).toArray(new URL[0]), parent);
+  }
+
+  private static List<URL> findJars(File dir, List<URL> urls) {
+    try {
+      for (File file : dir.listFiles()) {
+        if (file.isDirectory()) {
+          findJars(file, urls);
+        } else if (file.getName().endsWith(".jar")) {
+          urls.add(file.toURI().toURL());
+        }
+      }
+      return urls;
+    } catch (MalformedURLException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/core/src/main/java/org/apache/twill/internal/kafka/client/AbstractCompressedMessageSetEncoder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/kafka/client/AbstractCompressedMessageSetEncoder.java b/core/src/main/java/org/apache/twill/internal/kafka/client/AbstractCompressedMessageSetEncoder.java
new file mode 100644
index 0000000..a9c3381
--- /dev/null
+++ b/core/src/main/java/org/apache/twill/internal/kafka/client/AbstractCompressedMessageSetEncoder.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import com.google.common.base.Throwables;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferOutputStream;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A base implementation of {@link MessageSetEncoder} that do message compression.
+ */
+abstract class AbstractCompressedMessageSetEncoder extends AbstractMessageSetEncoder {
+
+  private final Compression compression;
+  private ChannelBufferOutputStream os;
+  private OutputStream compressedOutput;
+
+
+  protected AbstractCompressedMessageSetEncoder(Compression compression) {
+    this.compression = compression;
+    try {
+      this.os = new ChannelBufferOutputStream(ChannelBuffers.dynamicBuffer());
+      this.compressedOutput = createCompressedStream(os);
+    } catch (IOException e) {
+      // Should never happen
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public final MessageSetEncoder add(ChannelBuffer payload) {
+    try {
+      ChannelBuffer encoded = encodePayload(payload);
+      encoded.readBytes(compressedOutput, encoded.readableBytes());
+    } catch (IOException e) {
+      throw Throwables.propagate(e);
+    }
+    return this;
+
+  }
+
+  @Override
+  public final ChannelBuffer finish() {
+    try {
+      compressedOutput.close();
+      ChannelBuffer buf = prefixLength(encodePayload(os.buffer(), compression));
+      compressedOutput = createCompressedStream(os);
+      os.buffer().clear();
+
+      return buf;
+
+    } catch (IOException e) {
+      throw Throwables.propagate(e);
+    }
+
+  }
+
+  protected abstract OutputStream createCompressedStream(OutputStream os) throws IOException;
+}