You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/13 23:27:41 UTC

[25/28] [TWILL-14] Bootstrapping for the site generation. Reorganization of the source tree happens:

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/ProcessLauncher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/ProcessLauncher.java b/core/src/main/java/org/apache/twill/internal/ProcessLauncher.java
deleted file mode 100644
index e48a226..0000000
--- a/core/src/main/java/org/apache/twill/internal/ProcessLauncher.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import org.apache.twill.api.LocalFile;
-
-import java.util.Map;
-
-/**
- * Interface for launching a container process.
- *
- * @param <T> Type of the object that contains information about the container that the process is going to launch.
- */
-public interface ProcessLauncher<T> {
-
-  /**
-   * Returns information about the container that this launcher would launch the process in.
-   */
-  T getContainerInfo();
-
-  /**
-   * Returns a preparer with the given default set of environments, resources and credentials.
-   */
-  <C> PrepareLaunchContext prepareLaunch(Map<String, String> environments,
-                                         Iterable<LocalFile> resources, C credentials);
-
-  /**
-   * Fluent builder for setting up the launch: resources first, then environment, then commands.
-   */
-  interface PrepareLaunchContext {
-
-    ResourcesAdder withResources();
-
-    AfterResources noResources();
-
-    interface ResourcesAdder {
-      MoreResources add(LocalFile localFile);
-    }
-
-    interface AfterResources {
-      EnvironmentAdder withEnvironment();
-
-      AfterEnvironment noEnvironment();
-    }
-
-    interface EnvironmentAdder {
-      <V> MoreEnvironment add(String key, V value);
-    }
-
-    interface MoreEnvironment extends EnvironmentAdder, AfterEnvironment {
-    }
-
-    interface AfterEnvironment {
-      CommandAdder withCommands();
-    }
-
-    interface MoreResources extends ResourcesAdder, AfterResources { }
-
-    interface CommandAdder {
-      StdOutSetter add(String cmd, String...args);
-    }
-
-    interface StdOutSetter {
-      StdErrSetter redirectOutput(String stdout);
-
-      StdErrSetter noOutput();
-    }
-
-    interface StdErrSetter {
-      MoreCommand redirectError(String stderr);
-
-      MoreCommand noError();
-    }
-
-    interface MoreCommand extends CommandAdder {
-      <R> ProcessController<R> launch();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/SingleRunnableApplication.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/SingleRunnableApplication.java b/core/src/main/java/org/apache/twill/internal/SingleRunnableApplication.java
deleted file mode 100644
index a52afe1..0000000
--- a/core/src/main/java/org/apache/twill/internal/SingleRunnableApplication.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import org.apache.twill.api.ResourceSpecification;
-import org.apache.twill.api.TwillApplication;
-import org.apache.twill.api.TwillRunnable;
-import org.apache.twill.api.TwillRunnableSpecification;
-import org.apache.twill.api.TwillSpecification;
-
-/**
- * A {@link org.apache.twill.api.TwillApplication} wrapping a single {@link org.apache.twill.api.TwillRunnable}, named after it.
- */
-public class SingleRunnableApplication implements TwillApplication {
-
-  private final TwillRunnable runnable;
-  private final ResourceSpecification resourceSpec;
-
-  public SingleRunnableApplication(TwillRunnable runnable, ResourceSpecification resourceSpec) {
-    this.runnable = runnable;
-    this.resourceSpec = resourceSpec;
-  }
-
-  @Override
-  public TwillSpecification configure() {
-    TwillRunnableSpecification runnableSpec = runnable.configure();
-    return TwillSpecification.Builder.with()
-      .setName(runnableSpec.getName())
-      .withRunnable().add(runnableSpec.getName(), runnable, resourceSpec)
-      .noLocalFiles()
-      .anyOrder()
-      .build();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/TwillContainerController.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/TwillContainerController.java b/core/src/main/java/org/apache/twill/internal/TwillContainerController.java
deleted file mode 100644
index 8b090bd..0000000
--- a/core/src/main/java/org/apache/twill/internal/TwillContainerController.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import org.apache.twill.api.ServiceController;
-import org.apache.twill.internal.state.Message;
-import com.google.common.util.concurrent.ListenableFuture;
-
-/**
- * A {@link ServiceController} that allows sending a message directly. Internal use only.
- */
-public interface TwillContainerController extends ServiceController {
-
-  ListenableFuture<Message> sendMessage(Message message);
-
-  /**
-   * Called to indicate that the container that this controller is associated with has completed.
-   * Any resources it holds will be released and all pending futures will be cancelled.
-   */
-  void completed(int exitStatus);
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java b/core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
deleted file mode 100644
index 63f8732..0000000
--- a/core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import org.apache.twill.api.LocalFile;
-import org.apache.twill.api.RunId;
-import org.apache.twill.api.RuntimeSpecification;
-import org.apache.twill.filesystem.Location;
-import org.apache.twill.internal.state.Message;
-import org.apache.twill.internal.state.StateNode;
-import org.apache.twill.launcher.TwillLauncher;
-import org.apache.twill.zookeeper.NodeData;
-import org.apache.twill.zookeeper.ZKClient;
-import com.google.common.util.concurrent.ListenableFuture;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * Helper that launches a container process for a runnable and returns a controller for it.
- */
-public final class TwillContainerLauncher {
-
-  private static final Logger LOG = LoggerFactory.getLogger(TwillContainerLauncher.class);
-
-  private static final double HEAP_MIN_RATIO = 0.7d;
-
-  private final RuntimeSpecification runtimeSpec;
-  private final ProcessLauncher.PrepareLaunchContext launchContext;
-  private final ZKClient zkClient;
-  private final int instanceCount;
-  private final String jvmOpts;
-  private final int reservedMemory;
-  private final Location secureStoreLocation;
-
-  public TwillContainerLauncher(RuntimeSpecification runtimeSpec, ProcessLauncher.PrepareLaunchContext launchContext,
-                                ZKClient zkClient, int instanceCount, String jvmOpts, int reservedMemory,
-                                Location secureStoreLocation) {
-    this.runtimeSpec = runtimeSpec;
-    this.launchContext = launchContext;
-    this.zkClient = zkClient;
-    this.instanceCount = instanceCount;
-    this.jvmOpts = jvmOpts;
-    this.reservedMemory = reservedMemory;
-    this.secureStoreLocation = secureStoreLocation;
-  }
-
-  public TwillContainerController start(RunId runId, int instanceId, Class<?> mainClass, String classPath) {
-    ProcessLauncher.PrepareLaunchContext.AfterResources afterResources = null;
-    ProcessLauncher.PrepareLaunchContext.ResourcesAdder resourcesAdder = null;
-
-    // Adds all files to be localized to the container
-    if (!runtimeSpec.getLocalFiles().isEmpty()) {
-      resourcesAdder = launchContext.withResources();
-
-      for (LocalFile localFile : runtimeSpec.getLocalFiles()) {
-        afterResources = resourcesAdder.add(localFile);
-      }
-    }
-
-    // Optionally localizes the secure store, if a location was given and it exists.
-    try {
-      if (secureStoreLocation != null && secureStoreLocation.exists()) {
-        if (resourcesAdder == null) {
-          resourcesAdder = launchContext.withResources();
-        }
-        afterResources = resourcesAdder.add(new DefaultLocalFile(Constants.Files.CREDENTIALS,
-                                                                 secureStoreLocation.toURI(),
-                                                                 secureStoreLocation.lastModified(),
-                                                                 secureStoreLocation.length(), false, null));
-      }
-    } catch (IOException e) {
-      LOG.warn("Failed to launch container with secure store {}.", secureStoreLocation.toURI());
-    }
-
-    if (afterResources == null) {
-      afterResources = launchContext.noResources();
-    }
-
-    int memory = runtimeSpec.getResourceSpecification().getMemorySize();
-    if (((double) (memory - reservedMemory) / memory) >= HEAP_MIN_RATIO) {
-      // Reduce -Xmx by the reserved memory size.
-      memory = runtimeSpec.getResourceSpecification().getMemorySize() - reservedMemory;
-    } else {
-      // For a small container, set -Xmx to HEAP_MIN_RATIO of the container memory instead.
-      memory = (int) Math.ceil(memory * HEAP_MIN_RATIO);
-    }
-
-    // Currently no reporting is supported for runnable containers
-    ProcessController<Void> processController = afterResources
-      .withEnvironment()
-      .add(EnvKeys.TWILL_RUN_ID, runId.getId())
-      .add(EnvKeys.TWILL_RUNNABLE_NAME, runtimeSpec.getName())
-      .add(EnvKeys.TWILL_INSTANCE_ID, Integer.toString(instanceId))
-      .add(EnvKeys.TWILL_INSTANCE_COUNT, Integer.toString(instanceCount))
-      .withCommands()
-      .add("java",
-           "-Djava.io.tmpdir=tmp",
-           "-Dyarn.container=$" + EnvKeys.YARN_CONTAINER_ID,
-           "-Dtwill.runnable=$" + EnvKeys.TWILL_APP_NAME + ".$" + EnvKeys.TWILL_RUNNABLE_NAME,
-           "-cp", Constants.Files.LAUNCHER_JAR + ":" + classPath,
-           "-Xmx" + memory + "m",
-           jvmOpts,
-           TwillLauncher.class.getName(),
-           Constants.Files.CONTAINER_JAR,
-           mainClass.getName(),
-           Boolean.TRUE.toString())
-      .redirectOutput(Constants.STDOUT).redirectError(Constants.STDERR)
-      .launch();
-
-    TwillContainerControllerImpl controller = new TwillContainerControllerImpl(zkClient, runId, processController);
-    controller.start();
-    return controller;
-  }
-
-  private static final class TwillContainerControllerImpl extends AbstractZKServiceController
-                                                          implements TwillContainerController {
-
-    private final ProcessController<Void> processController;
-
-    protected TwillContainerControllerImpl(ZKClient zkClient, RunId runId,
-                                           ProcessController<Void> processController) {
-      super(runId, zkClient);
-      this.processController = processController;
-    }
-
-    @Override
-    protected void doStartUp() {
-      // No-op
-    }
-
-    @Override
-    protected void doShutDown() {
-      // No-op
-    }
-
-    @Override
-    protected void instanceNodeUpdated(NodeData nodeData) {
-      // No-op
-    }
-
-    @Override
-    protected void stateNodeUpdated(StateNode stateNode) {
-      // No-op
-    }
-
-    @Override
-    public ListenableFuture<Message> sendMessage(Message message) {
-      return sendMessage(message, message);
-    }
-
-    @Override
-    public synchronized void completed(int exitStatus) {
-      if (exitStatus != 0) {  // If a container terminated with exit code != 0, treat it as error
-//        fireStateChange(new StateNode(State.FAILED, new StackTraceElement[0]));
-      }
-      forceShutDown();
-    }
-
-    @Override
-    public void kill() {
-      processController.cancel();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/ZKMessages.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/ZKMessages.java b/core/src/main/java/org/apache/twill/internal/ZKMessages.java
deleted file mode 100644
index 03575dd..0000000
--- a/core/src/main/java/org/apache/twill/internal/ZKMessages.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import org.apache.twill.internal.state.Message;
-import org.apache.twill.internal.state.MessageCodec;
-import org.apache.twill.zookeeper.ZKClient;
-import org.apache.twill.zookeeper.ZKOperations;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import org.apache.zookeeper.CreateMode;
-
-/**
- * Utility methods for sending a {@link Message} by creating a message node in ZooKeeper.
- */
-public final class ZKMessages {
-
-  /**
-   * Creates a message node in zookeeper. The message node created is a PERSISTENT_SEQUENTIAL node.
-   *
-   * @param zkClient The ZooKeeper client for interacting with ZooKeeper.
-   * @param messagePathPrefix ZooKeeper path prefix for the message node.
-   * @param message The {@link Message} object for the content of the message node.
-   * @param completionResult Object to set to the result future when the message is processed.
-   * @param <V> Type of the completion result.
-   * @return A {@link ListenableFuture} that will be completed when the message is consumed, which is indicated
-   *         by deletion of the node. If there is an exception during the process, it will be reflected
-   *         to the future returned.
-   */
-  public static <V> ListenableFuture<V> sendMessage(final ZKClient zkClient, String messagePathPrefix,
-                                                    Message message, final V completionResult) {
-    SettableFuture<V> result = SettableFuture.create();
-    sendMessage(zkClient, messagePathPrefix, message, result, completionResult);
-    return result;
-  }
-
-  /**
-   * Creates a message node in zookeeper. The message node created is a PERSISTENT_SEQUENTIAL node.
-   *
-   * @param zkClient The ZooKeeper client for interacting with ZooKeeper.
-   * @param messagePathPrefix ZooKeeper path prefix for the message node.
-   * @param message The {@link Message} object for the content of the message node.
-   * @param completion A {@link SettableFuture} to reflect the result of message process completion.
-   * @param completionResult Object to set to the result future when the message is processed.
-   * @param <V> Type of the completion result.
-   */
-  public static <V> void sendMessage(final ZKClient zkClient, String messagePathPrefix, Message message,
-                                     final SettableFuture<V> completion, final V completionResult) {
-
-    // Creates the message node and watches for its deletion, which signals completion.
-    Futures.addCallback(zkClient.create(messagePathPrefix, MessageCodec.encode(message),
-                                        CreateMode.PERSISTENT_SEQUENTIAL), new FutureCallback<String>() {
-      @Override
-      public void onSuccess(String path) {
-        Futures.addCallback(ZKOperations.watchDeleted(zkClient, path), new FutureCallback<String>() {
-          @Override
-          public void onSuccess(String result) {
-            completion.set(completionResult);
-          }
-
-          @Override
-          public void onFailure(Throwable t) {
-            completion.setException(t);
-          }
-        });
-      }
-
-      @Override
-      public void onFailure(Throwable t) {
-        completion.setException(t);
-      }
-    });
-  }
-
-  private ZKMessages() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/ZKServiceDecorator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/ZKServiceDecorator.java b/core/src/main/java/org/apache/twill/internal/ZKServiceDecorator.java
deleted file mode 100644
index 7313d33..0000000
--- a/core/src/main/java/org/apache/twill/internal/ZKServiceDecorator.java
+++ /dev/null
@@ -1,482 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import org.apache.twill.api.RunId;
-import org.apache.twill.api.ServiceController;
-import org.apache.twill.common.ServiceListenerAdapter;
-import org.apache.twill.common.Threads;
-import org.apache.twill.internal.json.StackTraceElementCodec;
-import org.apache.twill.internal.json.StateNodeCodec;
-import org.apache.twill.internal.state.Message;
-import org.apache.twill.internal.state.MessageCallback;
-import org.apache.twill.internal.state.MessageCodec;
-import org.apache.twill.internal.state.StateNode;
-import org.apache.twill.internal.state.SystemMessages;
-import org.apache.twill.zookeeper.NodeChildren;
-import org.apache.twill.zookeeper.NodeData;
-import org.apache.twill.zookeeper.OperationFuture;
-import org.apache.twill.zookeeper.ZKClient;
-import org.apache.twill.zookeeper.ZKOperations;
-import com.google.common.base.Charsets;
-import com.google.common.base.Supplier;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.AbstractService;
-import com.google.common.util.concurrent.AsyncFunction;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.Service;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-/**
- * A {@link Service} decorator that wraps another {@link Service} with the service states reflected
- * to ZooKeeper.
- */
-public final class ZKServiceDecorator extends AbstractService {
-
-  private static final Logger LOG = LoggerFactory.getLogger(ZKServiceDecorator.class);
-
-  private final ZKClient zkClient;
-  private final RunId id;
-  private final Supplier<? extends JsonElement> liveNodeData;
-  private final Service decoratedService;
-  private final MessageCallbackCaller messageCallback;
-  private ExecutorService callbackExecutor;
-
-
-  public ZKServiceDecorator(ZKClient zkClient, RunId id, Supplier<? extends JsonElement> liveNodeData,
-                            Service decoratedService) {
-    this(zkClient, id, liveNodeData, decoratedService, null);
-  }
-
-  /**
-   * Creates a ZKServiceDecorator.
-   * @param zkClient ZooKeeper client
-   * @param id The run id of the service
-   * @param liveNodeData A supplier of the data to write to the live node.
-   * @param decoratedService The Service whose state changes are monitored
-   * @param finalizer An optional Runnable to run when this decorator terminates or fails.
-   */
-  public ZKServiceDecorator(ZKClient zkClient, RunId id, Supplier <? extends JsonElement> liveNodeData,
-                            Service decoratedService, @Nullable Runnable finalizer) {
-    this.zkClient = zkClient;
-    this.id = id;
-    this.liveNodeData = liveNodeData;
-    this.decoratedService = decoratedService;
-    if (decoratedService instanceof MessageCallback) {
-      this.messageCallback = new MessageCallbackCaller((MessageCallback) decoratedService, zkClient);
-    } else {
-      this.messageCallback = new MessageCallbackCaller(zkClient);
-    }
-    if (finalizer != null) {
-      addFinalizer(finalizer);
-    }
-  }
-
-  /**
-   * Deletes the given ZK path recursively and then creates the path again with the given data and mode.
-   */
-  private ListenableFuture<String> deleteAndCreate(final String path, final byte[] data, final CreateMode mode) {
-    return Futures.transform(ZKOperations.ignoreError(ZKOperations.recursiveDelete(zkClient, path),
-                                                      KeeperException.NoNodeException.class, null),
-                             new AsyncFunction<String, String>() {
-      @Override
-      public ListenableFuture<String> apply(String input) throws Exception {
-        return zkClient.create(path, data, mode);
-      }
-    }, Threads.SAME_THREAD_EXECUTOR);
-  }
-
-  @Override
-  protected void doStart() {
-    callbackExecutor = Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("message-callback"));
-    Futures.addCallback(createLiveNode(), new FutureCallback<String>() {
-      @Override
-      public void onSuccess(String result) {
-        // Create nodes for states and messaging
-        StateNode stateNode = new StateNode(ServiceController.State.STARTING);
-
-        final ListenableFuture<List<String>> createFuture = Futures.allAsList(
-          deleteAndCreate(getZKPath("messages"), null, CreateMode.PERSISTENT),
-          deleteAndCreate(getZKPath("state"), encodeStateNode(stateNode), CreateMode.PERSISTENT)
-        );
-
-        createFuture.addListener(new Runnable() {
-          @Override
-          public void run() {
-            try {
-              createFuture.get();
-              // Starts the decorated service
-              decoratedService.addListener(createListener(), Threads.SAME_THREAD_EXECUTOR);
-              decoratedService.start();
-            } catch (Exception e) {
-              notifyFailed(e);
-            }
-          }
-        }, Threads.SAME_THREAD_EXECUTOR);
-      }
-
-      @Override
-      public void onFailure(Throwable t) {
-        notifyFailed(t);
-      }
-    });
-  }
-
-  @Override
-  protected void doStop() {
-    // Stops the decorated service
-    decoratedService.stop();
-    callbackExecutor.shutdownNow();
-  }
-
-  private void addFinalizer(final Runnable finalizer) {
-    addListener(new ServiceListenerAdapter() {
-      @Override
-      public void terminated(State from) {
-        try {
-          finalizer.run();
-        } catch (Throwable t) {
-          LOG.warn("Exception when running finalizer.", t);
-        }
-      }
-
-      @Override
-      public void failed(State from, Throwable failure) {
-        try {
-          finalizer.run();
-        } catch (Throwable t) {
-          LOG.warn("Exception when running finalizer.", t);
-        }
-      }
-    }, Threads.SAME_THREAD_EXECUTOR);
-  }
-
-  private OperationFuture<String> createLiveNode() {
-    String liveNode = getLiveNodePath();
-    LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNode);
-
-    JsonObject content = new JsonObject();
-    content.add("data", liveNodeData.get());
-    return ZKOperations.ignoreError(zkClient.create(liveNode, encodeJson(content), CreateMode.EPHEMERAL),
-                                    KeeperException.NodeExistsException.class, liveNode);
-  }
-
-  private OperationFuture<String> removeLiveNode() {
-    String liveNode = getLiveNodePath();
-    LOG.info("Remove live node {}{}", zkClient.getConnectString(), liveNode);
-    return ZKOperations.ignoreError(zkClient.delete(liveNode), KeeperException.NoNodeException.class, liveNode);
-  }
-
-  private OperationFuture<String> removeServiceNode() {
-    String serviceNode = String.format("/%s", id.getId());
-    LOG.info("Remove service node {}{}", zkClient.getConnectString(), serviceNode);
-    return ZKOperations.recursiveDelete(zkClient, serviceNode);
-  }
-
-  private void watchMessages() {
-    final String messagesPath = getZKPath("messages");
-    Futures.addCallback(zkClient.getChildren(messagesPath, new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        // TODO: Do we need to deal with other types of events?
-        if (event.getType() == Event.EventType.NodeChildrenChanged && decoratedService.isRunning()) {
-          watchMessages();
-        }
-      }
-    }), new FutureCallback<NodeChildren>() {
-      @Override
-      public void onSuccess(NodeChildren result) {
-        // Sort by the name, which is the messageId. The assumption is that message ids are ordered by time.
-        List<String> messages = Lists.newArrayList(result.getChildren());
-        Collections.sort(messages);
-        for (String messageId : messages) {
-          processMessage(messagesPath + "/" + messageId, messageId);
-        }
-      }
-
-      @Override
-      public void onFailure(Throwable t) {
-        // TODO: what could be done besides just logging?
-        LOG.error("Failed to watch messages.", t);
-      }
-    });
-  }
-
-  private void processMessage(final String path, final String messageId) {
-    Futures.addCallback(zkClient.getData(path), new FutureCallback<NodeData>() {
-      @Override
-      public void onSuccess(NodeData result) {
-        Message message = MessageCodec.decode(result.getData());
-        if (message == null) {
-          LOG.error("Failed to decode message for " + messageId + " in " + path);
-          listenFailure(zkClient.delete(path, result.getStat().getVersion()));
-          return;
-        }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Message received from " + path + ": " + new String(MessageCodec.encode(message), Charsets.UTF_8));
-        }
-        if (handleStopMessage(message, getDeleteSupplier(path, result.getStat().getVersion()))) {
-          return;
-        }
-        messageCallback.onReceived(callbackExecutor, path, result.getStat().getVersion(), messageId, message);
-      }
-
-      @Override
-      public void onFailure(Throwable t) {
-        LOG.error("Failed to fetch message content.", t);
-      }
-    });
-  }
-
-  private <V> boolean handleStopMessage(Message message, final Supplier<OperationFuture<V>> postHandleSupplier) {
-    if (message.getType() == Message.Type.SYSTEM && SystemMessages.STOP_COMMAND.equals(message.getCommand())) {
-      callbackExecutor.execute(new Runnable() {
-        @Override
-        public void run() {
-          decoratedService.stop().addListener(new Runnable() {
-
-            @Override
-            public void run() {
-              stopServiceOnComplete(postHandleSupplier.get(), ZKServiceDecorator.this);
-            }
-          }, MoreExecutors.sameThreadExecutor());
-        }
-      });
-      return true;
-    }
-    return false;
-  }
-
-
-  private Supplier<OperationFuture<String>> getDeleteSupplier(final String path, final int version) {
-    return new Supplier<OperationFuture<String>>() {
-      @Override
-      public OperationFuture<String> get() {
-        return zkClient.delete(path, version);
-      }
-    };
-  }
-
-  private Listener createListener() {
-    return new DecoratedServiceListener();
-  }
-
-  private <V> byte[] encode(V data, Class<? extends V> clz) {
-    return new GsonBuilder().registerTypeAdapter(StateNode.class, new StateNodeCodec())
-                            .registerTypeAdapter(StackTraceElement.class, new StackTraceElementCodec())
-                            .create()
-      .toJson(data, clz).getBytes(Charsets.UTF_8);
-  }
-
-  private byte[] encodeStateNode(StateNode stateNode) {
-    return encode(stateNode, StateNode.class);
-  }
-
-  private <V extends JsonElement> byte[] encodeJson(V json) {
-    return new Gson().toJson(json).getBytes(Charsets.UTF_8);
-  }
-
-  private String getZKPath(String path) {
-    return String.format("/%s/%s", id, path);
-  }
-
-  private String getLiveNodePath() {
-    return "/instances/" + id;
-  }
-
-  private static <V> OperationFuture<V> listenFailure(final OperationFuture<V> operationFuture) {
-    operationFuture.addListener(new Runnable() {
-
-      @Override
-      public void run() {
-        try {
-          if (!operationFuture.isCancelled()) {
-            operationFuture.get();
-          }
-        } catch (Exception e) {
-          // TODO: what could be done besides just logging?
-          LOG.error("Operation execution failed for " + operationFuture.getRequestPath(), e);
-        }
-      }
-    }, Threads.SAME_THREAD_EXECUTOR);
-    return operationFuture;
-  }
-
-  private static final class MessageCallbackCaller {
-    private final MessageCallback callback;
-    private final ZKClient zkClient;
-
-    private MessageCallbackCaller(ZKClient zkClient) {
-      this(null, zkClient);
-    }
-
-    private MessageCallbackCaller(MessageCallback callback, ZKClient zkClient) {
-      this.callback = callback;
-      this.zkClient = zkClient;
-    }
-
-    public void onReceived(Executor executor, final String path,
-                           final int version, final String id, final Message message) {
-      if (callback == null) {
-        // Simply delete the message
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Ignoring incoming message from " + path + ": " + message);
-        }
-        listenFailure(zkClient.delete(path, version));
-        return;
-      }
-
-      executor.execute(new Runnable() {
-        @Override
-        public void run() {
-          try {
-            // Message process is synchronous for now. Making it async needs more thoughts about race conditions.
-            // The executor is the callbackExecutor which is a single thread executor.
-            callback.onReceived(id, message).get();
-          } catch (Throwable t) {
-            LOG.error("Exception when processing message: {}, {}, {}", id, message, path, t);
-          } finally {
-            listenFailure(zkClient.delete(path, version));
-          }
-        }
-      });
-    }
-  }
-
-  private final class DecoratedServiceListener implements Listener {
-    private volatile boolean zkFailure = false;
-
-    @Override
-    public void starting() {
-      LOG.info("Starting: " + id);
-      saveState(ServiceController.State.STARTING);
-    }
-
-    @Override
-    public void running() {
-      LOG.info("Running: " + id);
-      notifyStarted();
-      watchMessages();
-      saveState(ServiceController.State.RUNNING);
-    }
-
-    @Override
-    public void stopping(State from) {
-      LOG.info("Stopping: " + id);
-      saveState(ServiceController.State.STOPPING);
-    }
-
-    @Override
-    public void terminated(State from) {
-      LOG.info("Terminated: " + from + " " + id);
-      if (zkFailure) {
-        return;
-      }
-
-      ImmutableList<OperationFuture<String>> futures = ImmutableList.of(removeLiveNode(), removeServiceNode());
-      final ListenableFuture<List<String>> future = Futures.allAsList(futures);
-      Futures.successfulAsList(futures).addListener(new Runnable() {
-        @Override
-        public void run() {
-          try {
-            future.get();
-            LOG.info("Service and state node removed");
-            notifyStopped();
-          } catch (Exception e) {
-            LOG.warn("Failed to remove ZK nodes.", e);
-            notifyFailed(e);
-          }
-        }
-      }, Threads.SAME_THREAD_EXECUTOR);
-    }
-
-    @Override
-    public void failed(State from, final Throwable failure) {
-      LOG.info("Failed: {} {}.", from, id, failure);
-      if (zkFailure) {
-        return;
-      }
-
-      ImmutableList<OperationFuture<String>> futures = ImmutableList.of(removeLiveNode(), removeServiceNode());
-      Futures.successfulAsList(futures).addListener(new Runnable() {
-        @Override
-        public void run() {
-          LOG.info("Service and state node removed");
-          notifyFailed(failure);
-        }
-      }, Threads.SAME_THREAD_EXECUTOR);
-    }
-
-    private void saveState(ServiceController.State state) {
-      if (zkFailure) {
-        return;
-      }
-      StateNode stateNode = new StateNode(state);
-      stopOnFailure(zkClient.setData(getZKPath("state"), encodeStateNode(stateNode)));
-    }
-
-    private <V> void stopOnFailure(final OperationFuture<V> future) {
-      future.addListener(new Runnable() {
-        @Override
-        public void run() {
-          try {
-            future.get();
-          } catch (final Exception e) {
-            LOG.error("ZK operation failed", e);
-            zkFailure = true;
-            decoratedService.stop().addListener(new Runnable() {
-              @Override
-              public void run() {
-                notifyFailed(e);
-              }
-            }, Threads.SAME_THREAD_EXECUTOR);
-          }
-        }
-      }, Threads.SAME_THREAD_EXECUTOR);
-    }
-  }
-
-  private <V> ListenableFuture<State> stopServiceOnComplete(ListenableFuture <V> future, final Service service) {
-    return Futures.transform(future, new AsyncFunction<V, State>() {
-      @Override
-      public ListenableFuture<State> apply(V input) throws Exception {
-        return service.stop();
-      }
-    });
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/json/ArgumentsCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/json/ArgumentsCodec.java b/core/src/main/java/org/apache/twill/internal/json/ArgumentsCodec.java
deleted file mode 100644
index 07d4c1d..0000000
--- a/core/src/main/java/org/apache/twill/internal/json/ArgumentsCodec.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.json;
-
-import org.apache.twill.internal.Arguments;
-import com.google.common.collect.ImmutableMultimap;
-import com.google.common.io.InputSupplier;
-import com.google.common.io.OutputSupplier;
-import com.google.common.reflect.TypeToken;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import com.google.gson.JsonSerializationContext;
-import com.google.gson.JsonSerializer;
-
-import java.io.IOException;
-import java.io.Reader;
-import java.io.Writer;
-import java.lang.reflect.Type;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-/**
- *
- */
-public final class ArgumentsCodec implements JsonSerializer<Arguments>, JsonDeserializer<Arguments> {
-
-  private static final Gson GSON = new GsonBuilder().registerTypeAdapter(Arguments.class, new ArgumentsCodec())
-                                                    .create();
-
-  public static void encode(Arguments arguments, OutputSupplier<? extends Writer> writerSupplier) throws IOException {
-    Writer writer = writerSupplier.getOutput();
-    try {
-      GSON.toJson(arguments, writer);
-    } finally {
-      writer.close();
-    }
-  }
-
-
-  public static Arguments decode(InputSupplier<? extends Reader> readerSupplier) throws IOException {
-    Reader reader = readerSupplier.getInput();
-    try {
-      return GSON.fromJson(reader, Arguments.class);
-    } finally {
-      reader.close();
-    }
-  }
-
-  @Override
-  public JsonElement serialize(Arguments src, Type typeOfSrc,
-                               JsonSerializationContext context) {
-    JsonObject json = new JsonObject();
-    json.add("arguments", context.serialize(src.getArguments()));
-    json.add("runnableArguments", context.serialize(src.getRunnableArguments().asMap()));
-
-    return json;
-  }
-
-  @Override
-  public Arguments deserialize(JsonElement json, Type typeOfT,
-                              JsonDeserializationContext context) throws JsonParseException {
-    JsonObject jsonObj = json.getAsJsonObject();
-    List<String> arguments = context.deserialize(jsonObj.get("arguments"), new TypeToken<List<String>>() {}.getType());
-    Map<String, Collection<String>> args = context.deserialize(jsonObj.get("runnableArguments"),
-                                                               new TypeToken<Map<String, Collection<String>>>(){
-                                                               }.getType());
-
-    ImmutableMultimap.Builder<String, String> builder = ImmutableMultimap.builder();
-    for (Map.Entry<String, Collection<String>> entry : args.entrySet()) {
-      builder.putAll(entry.getKey(), entry.getValue());
-    }
-    return new Arguments(arguments, builder.build());
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/json/JsonUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/json/JsonUtils.java b/core/src/main/java/org/apache/twill/internal/json/JsonUtils.java
deleted file mode 100644
index 9556ad8..0000000
--- a/core/src/main/java/org/apache/twill/internal/json/JsonUtils.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.json;
-
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-
-/**
- * Collections of helper functions for json codec.
- */
-public final class JsonUtils {
-
-  private JsonUtils() {
-  }
-
-  /**
-   * Returns a String representation of the given property.
-   */
-  public static String getAsString(JsonObject json, String property) {
-    JsonElement jsonElement = json.get(property);
-    if (jsonElement.isJsonNull()) {
-      return null;
-    }
-    if (jsonElement.isJsonPrimitive()) {
-      return jsonElement.getAsString();
-    }
-    return jsonElement.toString();
-  }
-
-  /**
-   * Returns a long representation of the given property.
-   */
-  public static long getAsLong(JsonObject json, String property, long defaultValue) {
-    try {
-      return json.get(property).getAsLong();
-    } catch (Exception e) {
-      return defaultValue;
-    }
-  }
-
-  /**
-   * Returns a long representation of the given property.
-   */
-  public static int getAsInt(JsonObject json, String property, int defaultValue) {
-    try {
-      return json.get(property).getAsInt();
-    } catch (Exception e) {
-      return defaultValue;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/json/LocalFileCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/json/LocalFileCodec.java b/core/src/main/java/org/apache/twill/internal/json/LocalFileCodec.java
deleted file mode 100644
index 680a36c..0000000
--- a/core/src/main/java/org/apache/twill/internal/json/LocalFileCodec.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.json;
-
-import org.apache.twill.api.LocalFile;
-import org.apache.twill.internal.DefaultLocalFile;
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import com.google.gson.JsonSerializationContext;
-import com.google.gson.JsonSerializer;
-
-import java.lang.reflect.Type;
-import java.net.URI;
-
-/**
- *
- */
-public final class LocalFileCodec implements JsonSerializer<LocalFile>, JsonDeserializer<LocalFile> {
-
-  @Override
-  public JsonElement serialize(LocalFile src, Type typeOfSrc, JsonSerializationContext context) {
-    JsonObject json = new JsonObject();
-
-    json.addProperty("name", src.getName());
-    json.addProperty("uri", src.getURI().toASCIIString());
-    json.addProperty("lastModified", src.getLastModified());
-    json.addProperty("size", src.getSize());
-    json.addProperty("archive", src.isArchive());
-    json.addProperty("pattern", src.getPattern());
-
-    return json;
-  }
-
-  @Override
-  public LocalFile deserialize(JsonElement json, Type typeOfT,
-                               JsonDeserializationContext context) throws JsonParseException {
-    JsonObject jsonObj = json.getAsJsonObject();
-
-    String name = jsonObj.get("name").getAsString();
-    URI uri = URI.create(jsonObj.get("uri").getAsString());
-    long lastModified = jsonObj.get("lastModified").getAsLong();
-    long size = jsonObj.get("size").getAsLong();
-    boolean archive = jsonObj.get("archive").getAsBoolean();
-    JsonElement pattern = jsonObj.get("pattern");
-
-    return new DefaultLocalFile(name, uri, lastModified, size,
-                                archive, (pattern == null || pattern.isJsonNull()) ? null : pattern.getAsString());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/json/ResourceReportAdapter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/json/ResourceReportAdapter.java b/core/src/main/java/org/apache/twill/internal/json/ResourceReportAdapter.java
deleted file mode 100644
index e473fe7..0000000
--- a/core/src/main/java/org/apache/twill/internal/json/ResourceReportAdapter.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.json;
-
-import org.apache.twill.api.ResourceReport;
-import org.apache.twill.api.TwillRunResources;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-
-import java.io.Reader;
-import java.io.Writer;
-
-/**
- * This class provides utility to help encode/decode {@link ResourceReport} to/from Json.
- */
-public final class ResourceReportAdapter {
-
-  private final Gson gson;
-
-  public static ResourceReportAdapter create() {
-    return new ResourceReportAdapter();
-  }
-
-  private ResourceReportAdapter() {
-    gson = new GsonBuilder()
-              .serializeNulls()
-              .registerTypeAdapter(TwillRunResources.class, new TwillRunResourcesCodec())
-              .registerTypeAdapter(ResourceReport.class, new ResourceReportCodec())
-              .create();
-  }
-
-  public String toJson(ResourceReport report) {
-    return gson.toJson(report, ResourceReport.class);
-  }
-
-  public void toJson(ResourceReport report, Writer writer) {
-    gson.toJson(report, ResourceReport.class, writer);
-  }
-
-  public ResourceReport fromJson(String json) {
-    return gson.fromJson(json, ResourceReport.class);
-  }
-
-  public ResourceReport fromJson(Reader reader) {
-    return gson.fromJson(reader, ResourceReport.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/json/ResourceReportCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/json/ResourceReportCodec.java b/core/src/main/java/org/apache/twill/internal/json/ResourceReportCodec.java
deleted file mode 100644
index 884d889..0000000
--- a/core/src/main/java/org/apache/twill/internal/json/ResourceReportCodec.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.json;
-
-import org.apache.twill.api.ResourceReport;
-import org.apache.twill.api.TwillRunResources;
-import org.apache.twill.internal.DefaultResourceReport;
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import com.google.gson.JsonSerializationContext;
-import com.google.gson.JsonSerializer;
-import com.google.gson.reflect.TypeToken;
-
-import java.lang.reflect.Type;
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * Codec for serializing and deserializing a {@link ResourceReport} object using json.
- */
-public final class ResourceReportCodec implements JsonSerializer<ResourceReport>,
-                                           JsonDeserializer<ResourceReport> {
-
-  @Override
-  public JsonElement serialize(ResourceReport src, Type typeOfSrc, JsonSerializationContext context) {
-    JsonObject json = new JsonObject();
-
-    json.addProperty("appMasterId", src.getApplicationId());
-    json.add("appMasterResources", context.serialize(
-      src.getAppMasterResources(), new TypeToken<TwillRunResources>(){}.getType()));
-    json.add("runnableResources", context.serialize(
-      src.getResources(), new TypeToken<Map<String, Collection<TwillRunResources>>>(){}.getType()));
-
-    return json;
-  }
-
-  @Override
-  public ResourceReport deserialize(JsonElement json, Type typeOfT,
-                                           JsonDeserializationContext context) throws JsonParseException {
-    JsonObject jsonObj = json.getAsJsonObject();
-    String appMasterId = jsonObj.get("appMasterId").getAsString();
-    TwillRunResources masterResources = context.deserialize(
-      jsonObj.get("appMasterResources"), TwillRunResources.class);
-    Map<String, Collection<TwillRunResources>> resources = context.deserialize(
-      jsonObj.get("runnableResources"), new TypeToken<Map<String, Collection<TwillRunResources>>>(){}.getType());
-
-    return new DefaultResourceReport(appMasterId, masterResources, resources);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java b/core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java
deleted file mode 100644
index bea73c4..0000000
--- a/core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.json;
-
-import org.apache.twill.api.ResourceSpecification;
-import org.apache.twill.internal.DefaultResourceSpecification;
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import com.google.gson.JsonSerializationContext;
-import com.google.gson.JsonSerializer;
-
-import java.lang.reflect.Type;
-
-/**
- *
- */
-final class ResourceSpecificationCodec implements JsonSerializer<ResourceSpecification>,
-                                                  JsonDeserializer<ResourceSpecification> {
-
-  @Override
-  public JsonElement serialize(ResourceSpecification src, Type typeOfSrc, JsonSerializationContext context) {
-    JsonObject json = new JsonObject();
-
-    json.addProperty("cores", src.getVirtualCores());
-    json.addProperty("memorySize", src.getMemorySize());
-    json.addProperty("instances", src.getInstances());
-    json.addProperty("uplink", src.getUplink());
-    json.addProperty("downlink", src.getDownlink());
-
-    return json;
-  }
-
-  @Override
-  public ResourceSpecification deserialize(JsonElement json, Type typeOfT,
-                                           JsonDeserializationContext context) throws JsonParseException {
-    JsonObject jsonObj = json.getAsJsonObject();
-    return new DefaultResourceSpecification(jsonObj.get("cores").getAsInt(),
-                                            jsonObj.get("memorySize").getAsInt(),
-                                            jsonObj.get("instances").getAsInt(),
-                                            jsonObj.get("uplink").getAsInt(),
-                                            jsonObj.get("downlink").getAsInt());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/json/RuntimeSpecificationCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/json/RuntimeSpecificationCodec.java b/core/src/main/java/org/apache/twill/internal/json/RuntimeSpecificationCodec.java
deleted file mode 100644
index 867f4a8..0000000
--- a/core/src/main/java/org/apache/twill/internal/json/RuntimeSpecificationCodec.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.json;
-
-import org.apache.twill.api.LocalFile;
-import org.apache.twill.api.ResourceSpecification;
-import org.apache.twill.api.RuntimeSpecification;
-import org.apache.twill.api.TwillRunnableSpecification;
-import org.apache.twill.internal.DefaultRuntimeSpecification;
-import com.google.common.reflect.TypeToken;
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import com.google.gson.JsonSerializationContext;
-import com.google.gson.JsonSerializer;
-
-import java.lang.reflect.Type;
-import java.util.Collection;
-
-/**
- *
- */
-final class RuntimeSpecificationCodec implements JsonSerializer<RuntimeSpecification>,
-                                                 JsonDeserializer<RuntimeSpecification> {
-
-  @Override
-  public JsonElement serialize(RuntimeSpecification src, Type typeOfSrc, JsonSerializationContext context) {
-    JsonObject json = new JsonObject();
-    json.addProperty("name", src.getName());
-    json.add("runnable", context.serialize(src.getRunnableSpecification(), TwillRunnableSpecification.class));
-    json.add("resources", context.serialize(src.getResourceSpecification(), ResourceSpecification.class));
-    json.add("files", context.serialize(src.getLocalFiles(), new TypeToken<Collection<LocalFile>>(){}.getType()));
-
-    return json;
-  }
-
-  @Override
-  public RuntimeSpecification deserialize(JsonElement json, Type typeOfT,
-                                          JsonDeserializationContext context) throws JsonParseException {
-    JsonObject jsonObj = json.getAsJsonObject();
-
-    String name = jsonObj.get("name").getAsString();
-    TwillRunnableSpecification runnable = context.deserialize(jsonObj.get("runnable"),
-                                                               TwillRunnableSpecification.class);
-    ResourceSpecification resources = context.deserialize(jsonObj.get("resources"),
-                                                          ResourceSpecification.class);
-    Collection<LocalFile> files = context.deserialize(jsonObj.get("files"),
-                                                      new TypeToken<Collection<LocalFile>>(){}.getType());
-
-    return new DefaultRuntimeSpecification(name, runnable, resources, files);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/json/StackTraceElementCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/json/StackTraceElementCodec.java b/core/src/main/java/org/apache/twill/internal/json/StackTraceElementCodec.java
deleted file mode 100644
index 9a57b46..0000000
--- a/core/src/main/java/org/apache/twill/internal/json/StackTraceElementCodec.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.json;
-
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import com.google.gson.JsonSerializationContext;
-import com.google.gson.JsonSerializer;
-
-import java.lang.reflect.Type;
-
-/**
- *
- */
-public final class StackTraceElementCodec implements JsonSerializer<StackTraceElement>,
-                                                     JsonDeserializer<StackTraceElement> {
-
-  @Override
-  public StackTraceElement deserialize(JsonElement json, Type typeOfT,
-                                       JsonDeserializationContext context) throws JsonParseException {
-    JsonObject jsonObj = json.getAsJsonObject();
-    return new StackTraceElement(JsonUtils.getAsString(jsonObj, "className"),
-                                 JsonUtils.getAsString(jsonObj, "method"),
-                                 JsonUtils.getAsString(jsonObj, "file"),
-                                 JsonUtils.getAsInt(jsonObj, "line", -1));
-  }
-
-  @Override
-  public JsonElement serialize(StackTraceElement src, Type typeOfSrc, JsonSerializationContext context) {
-    JsonObject jsonObj = new JsonObject();
-    jsonObj.addProperty("className", src.getClassName());
-    jsonObj.addProperty("method", src.getMethodName());
-    jsonObj.addProperty("file", src.getFileName());
-    jsonObj.addProperty("line", src.getLineNumber());
-
-    return jsonObj;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/json/StateNodeCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/json/StateNodeCodec.java b/core/src/main/java/org/apache/twill/internal/json/StateNodeCodec.java
deleted file mode 100644
index c1e9d1c..0000000
--- a/core/src/main/java/org/apache/twill/internal/json/StateNodeCodec.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.json;
-
-import org.apache.twill.api.ServiceController;
-import org.apache.twill.internal.state.StateNode;
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import com.google.gson.JsonSerializationContext;
-import com.google.gson.JsonSerializer;
-
-import java.lang.reflect.Type;
-
-/**
- * Gson codec for {@link StateNode}: a "state" name plus optional "errorMessage" and "stackTraces" members.
- */
-public final class StateNodeCodec implements JsonSerializer<StateNode>, JsonDeserializer<StateNode> {
-  /** Reads "state" as a {@link ServiceController.State} name; "errorMessage" is null when the member is absent. */
-  @Override
-  public StateNode deserialize(JsonElement json, Type typeOfT,
-                               JsonDeserializationContext context) throws JsonParseException {
-    JsonObject node = json.getAsJsonObject();
-    ServiceController.State state = ServiceController.State.valueOf(node.get("state").getAsString());
-    String message = !node.has("errorMessage") ? null : node.get("errorMessage").getAsString();
-    StackTraceElement[] traces = context.deserialize(node.get("stackTraces"), StackTraceElement[].class);
-    return new StateNode(state, message, traces);
-  }
-
-  /** Always writes "state"; "errorMessage" and "stackTraces" are written only when they are non-null. */
-  @Override
-  public JsonElement serialize(StateNode src, Type typeOfSrc, JsonSerializationContext context) {
-    JsonObject node = new JsonObject();
-    node.addProperty("state", src.getState().name());
-    if (src.getErrorMessage() != null) {
-      node.addProperty("errorMessage", src.getErrorMessage());
-    }
-    if (src.getStackTraces() != null) {
-      node.add("stackTraces", context.serialize(src.getStackTraces(), StackTraceElement[].class));
-    }
-    return node;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java b/core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
deleted file mode 100644
index 8951173..0000000
--- a/core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.json;
-
-import org.apache.twill.api.TwillRunResources;
-import org.apache.twill.internal.DefaultTwillRunResources;
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import com.google.gson.JsonSerializationContext;
-import com.google.gson.JsonSerializer;
-
-import java.lang.reflect.Type;
-
-/**
- * Gson serializer and deserializer that maps a {@link org.apache.twill.api.TwillRunResources} to a JSON object.
- */
-public final class TwillRunResourcesCodec implements JsonSerializer<TwillRunResources>,
-                                              JsonDeserializer<TwillRunResources> {
-  /** Writes "containerId", "instanceId", "host", "memoryMB" and "virtualCores" from the matching getters. */
-  @Override
-  public JsonElement serialize(TwillRunResources src, Type typeOfSrc, JsonSerializationContext context) {
-    JsonObject resources = new JsonObject();
-    resources.addProperty("containerId", src.getContainerId());
-    resources.addProperty("instanceId", src.getInstanceId());
-    resources.addProperty("host", src.getHost());
-    resources.addProperty("memoryMB", src.getMemoryMB());
-    resources.addProperty("virtualCores", src.getVirtualCores());
-    return resources;
-  }
-
-  /** Reads the five members written by serialize and passes them to {@link DefaultTwillRunResources}. */
-  @Override
-  public TwillRunResources deserialize(JsonElement json, Type typeOfT,
-                                           JsonDeserializationContext context) throws JsonParseException {
-    JsonObject resources = json.getAsJsonObject();
-    int instanceId = resources.get("instanceId").getAsInt();
-    String containerId = resources.get("containerId").getAsString();
-    int virtualCores = resources.get("virtualCores").getAsInt();
-    int memoryMB = resources.get("memoryMB").getAsInt();
-    String host = resources.get("host").getAsString();
-    return new DefaultTwillRunResources(instanceId, containerId, virtualCores, memoryMB, host);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/json/TwillRunnableSpecificationCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/json/TwillRunnableSpecificationCodec.java b/core/src/main/java/org/apache/twill/internal/json/TwillRunnableSpecificationCodec.java
deleted file mode 100644
index f37c1e8..0000000
--- a/core/src/main/java/org/apache/twill/internal/json/TwillRunnableSpecificationCodec.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.json;
-
-import org.apache.twill.api.TwillRunnableSpecification;
-import org.apache.twill.internal.DefaultTwillRunnableSpecification;
-import com.google.common.reflect.TypeToken;
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import com.google.gson.JsonSerializationContext;
-import com.google.gson.JsonSerializer;
-
-import java.lang.reflect.Type;
-import java.util.Map;
-
-/**
- * Gson codec for {@link TwillRunnableSpecification}: class name, name and a string-to-string map of configs.
- */
-final class TwillRunnableSpecificationCodec implements JsonSerializer<TwillRunnableSpecification>,
-                                                       JsonDeserializer<TwillRunnableSpecification> {
-  /** Writes "classname" and "name" as strings and the configs map under "arguments". */
-  @Override
-  public JsonElement serialize(TwillRunnableSpecification src, Type typeOfSrc, JsonSerializationContext context) {
-    JsonObject spec = new JsonObject();
-    spec.addProperty("classname", src.getClassName());
-    spec.addProperty("name", src.getName());
-    Type argumentsType = new TypeToken<Map<String, String>>(){}.getType();
-    spec.add("arguments", context.serialize(src.getConfigs(), argumentsType));
-
-    return spec;
-  }
-
-  /** Reads "classname", "name" and "arguments" and hands them to {@link DefaultTwillRunnableSpecification}. */
-  @Override
-  public TwillRunnableSpecification deserialize(JsonElement json, Type typeOfT,
-                                                JsonDeserializationContext context) throws JsonParseException {
-    JsonObject spec = json.getAsJsonObject();
-    String className = spec.get("classname").getAsString();
-    String name = spec.get("name").getAsString();
-    Type argumentsType = new TypeToken<Map<String, String>>(){}.getType();
-    Map<String, String> arguments = context.deserialize(spec.get("arguments"), argumentsType);
-
-    return new DefaultTwillRunnableSpecification(className, name, arguments);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java b/core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java
deleted file mode 100644
index 67c15a2..0000000
--- a/core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.json;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.Maps;
-import com.google.common.io.Files;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.TypeAdapter;
-import com.google.gson.TypeAdapterFactory;
-import com.google.gson.reflect.TypeToken;
-import com.google.gson.stream.JsonReader;
-import com.google.gson.stream.JsonToken;
-import com.google.gson.stream.JsonWriter;
-import org.apache.twill.api.EventHandlerSpecification;
-import org.apache.twill.api.LocalFile;
-import org.apache.twill.api.ResourceSpecification;
-import org.apache.twill.api.RuntimeSpecification;
-import org.apache.twill.api.TwillRunnableSpecification;
-import org.apache.twill.api.TwillSpecification;
-import org.apache.twill.internal.json.TwillSpecificationCodec.EventHandlerSpecificationCoder;
-import org.apache.twill.internal.json.TwillSpecificationCodec.TwillSpecificationOrderCoder;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.Reader;
-import java.io.Writer;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.util.Map;
-
-/**
- * Converts {@link TwillSpecification} to and from JSON using a {@link Gson} configured with the Twill codecs.
- */
-public final class TwillSpecificationAdapter {
-  private final Gson gson;
-
-  /** Creates a new adapter; every instance builds its own {@link Gson}. */
-  public static TwillSpecificationAdapter create() {
-    return new TwillSpecificationAdapter();
-  }
-
-  private TwillSpecificationAdapter() {
-    gson = new GsonBuilder()
-              .serializeNulls()
-              .registerTypeAdapter(TwillSpecification.class, new TwillSpecificationCodec())
-              .registerTypeAdapter(TwillSpecification.Order.class, new TwillSpecificationOrderCoder())
-              .registerTypeAdapter(EventHandlerSpecification.class, new EventHandlerSpecificationCoder())
-              .registerTypeAdapter(RuntimeSpecification.class, new RuntimeSpecificationCodec())
-              .registerTypeAdapter(TwillRunnableSpecification.class, new TwillRunnableSpecificationCodec())
-              .registerTypeAdapter(ResourceSpecification.class, new ResourceSpecificationCodec())
-              .registerTypeAdapter(LocalFile.class, new LocalFileCodec())
-              .registerTypeAdapterFactory(new TwillSpecificationTypeAdapterFactory())
-              .create();
-  }
-
-  /** Returns the JSON text for the given specification. */
-  public String toJson(TwillSpecification spec) {
-    return gson.toJson(spec, TwillSpecification.class);
-  }
-
-  /** Writes the JSON for the given specification to {@code writer}; this method does not close the writer. */
-  public void toJson(TwillSpecification spec, Writer writer) {
-    gson.toJson(spec, TwillSpecification.class, writer);
-  }
-
-  /** Writes the JSON for the given specification to {@code file} as UTF-8 and closes the file afterwards. */
-  public void toJson(TwillSpecification spec, File file) throws IOException {
-    Writer writer = Files.newWriter(file, Charsets.UTF_8);
-    try {
-      toJson(spec, writer);
-    } finally {
-      writer.close();
-    }
-  }
-
-  /** Parses a specification from JSON text. */
-  public TwillSpecification fromJson(String json) {
-    return gson.fromJson(json, TwillSpecification.class);
-  }
-
-  /** Parses a specification from {@code reader}; this method does not close the reader. */
-  public TwillSpecification fromJson(Reader reader) {
-    return gson.fromJson(reader, TwillSpecification.class);
-  }
-
-  /** Parses a specification from {@code file}, read as UTF-8, and closes the file afterwards. */
-  public TwillSpecification fromJson(File file) throws IOException {
-    Reader reader = Files.newReader(file, Charsets.UTF_8);
-    try {
-      return fromJson(reader);
-    } finally {
-      reader.close();
-    }
-  }
-
-  // This is to get around gson ignoring of inner class
-  private static final class TwillSpecificationTypeAdapterFactory implements TypeAdapterFactory {
-    @Override
-    @SuppressWarnings("unchecked") // only reached for a parameterized Map whose key type argument is String
-    public <T> TypeAdapter<T> create(Gson gson, TypeToken<T> type) {
-      // A raw Map or a non-generic Map subclass has no type arguments to inspect; returning null declines the type.
-      if (!Map.class.isAssignableFrom(type.getRawType()) || !(type.getType() instanceof ParameterizedType)) {
-        return null;
-      }
-      Type[] typeArgs = ((ParameterizedType) type.getType()).getActualTypeArguments();
-      if (typeArgs.length != 2 || TypeToken.get(typeArgs[0]).getRawType() != String.class) {
-        return null;
-      }
-      return (TypeAdapter<T>) mapAdapter(gson, TypeToken.get(typeArgs[1]));
-    }
-    /** Builds an adapter that maps a JSON object's member names to values handled by the value type's adapter. */
-    private <V> TypeAdapter<Map<String, V>> mapAdapter(Gson gson, TypeToken<V> valueType) {
-      final TypeAdapter<V> valueAdapter = gson.getAdapter(valueType);
-      return new TypeAdapter<Map<String, V>>() {
-        @Override
-        public void write(JsonWriter writer, Map<String, V> map) throws IOException {
-          if (map == null) {
-            writer.nullValue();
-            return;
-          }
-          writer.beginObject();
-          for (Map.Entry<String, V> entry : map.entrySet()) {
-            writer.name(entry.getKey());
-            valueAdapter.write(writer, entry.getValue());
-          }
-          writer.endObject();
-        }
-        @Override
-        public Map<String, V> read(JsonReader reader) throws IOException {
-          if (reader.peek() != JsonToken.BEGIN_OBJECT) {
-            // Consume the whole value (null, primitive or array) so the reader stays in sync, then yield null.
-            reader.skipValue();
-            return null;
-          }
-          Map<String, V> map = Maps.newHashMap();
-          reader.beginObject();
-          while (reader.hasNext()) {
-            map.put(reader.nextName(), valueAdapter.read(reader));
-          }
-          reader.endObject();
-          return map;
-        }
-      };
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/json/TwillSpecificationCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/json/TwillSpecificationCodec.java b/core/src/main/java/org/apache/twill/internal/json/TwillSpecificationCodec.java
deleted file mode 100644
index 5d88350..0000000
--- a/core/src/main/java/org/apache/twill/internal/json/TwillSpecificationCodec.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.json;
-
-import org.apache.twill.api.EventHandlerSpecification;
-import org.apache.twill.api.RuntimeSpecification;
-import org.apache.twill.api.TwillSpecification;
-import org.apache.twill.internal.DefaultEventHandlerSpecification;
-import org.apache.twill.internal.DefaultTwillSpecification;
-import com.google.common.reflect.TypeToken;
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import com.google.gson.JsonSerializationContext;
-import com.google.gson.JsonSerializer;
-
-import java.lang.reflect.Type;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Gson serializer/deserializer for {@link org.apache.twill.api.TwillSpecification} and two of its nested types.
- */
-final class TwillSpecificationCodec implements JsonSerializer<TwillSpecification>,
-                                               JsonDeserializer<TwillSpecification> {
-
-  /** Emits "name", "runnables" and "orders", plus "handler" when the event handler is non-null. */
-  @Override
-  public JsonElement serialize(TwillSpecification src, Type typeOfSrc, JsonSerializationContext context) {
-    Type runnablesType = new TypeToken<Map<String, RuntimeSpecification>>(){}.getType();
-    Type ordersType = new TypeToken<List<TwillSpecification.Order>>(){}.getType();
-    JsonObject spec = new JsonObject();
-    spec.addProperty("name", src.getName());
-    spec.add("runnables", context.serialize(src.getRunnables(), runnablesType));
-    spec.add("orders", context.serialize(src.getOrders(), ordersType));
-    EventHandlerSpecification eventHandler = src.getEventHandler();
-    if (eventHandler != null) {
-      spec.add("handler", context.serialize(eventHandler, EventHandlerSpecification.class));
-    }
-    return spec;
-  }
-
-  /** Reads the members written by serialize; the event handler is null when "handler" is absent or JSON null. */
-  @Override
-  public TwillSpecification deserialize(JsonElement json, Type typeOfT,
-                                        JsonDeserializationContext context) throws JsonParseException {
-    JsonObject spec = json.getAsJsonObject();
-    Type runnablesType = new TypeToken<Map<String, RuntimeSpecification>>(){}.getType();
-    Type ordersType = new TypeToken<List<TwillSpecification.Order>>(){}.getType();
-
-    String name = spec.get("name").getAsString();
-    Map<String, RuntimeSpecification> runnables = context.deserialize(spec.get("runnables"), runnablesType);
-    List<TwillSpecification.Order> orders = context.deserialize(spec.get("orders"), ordersType);
-
-    JsonElement handler = spec.get("handler");
-    EventHandlerSpecification eventHandler = null;
-    if (!(handler == null || handler.isJsonNull())) {
-      eventHandler = context.deserialize(handler, EventHandlerSpecification.class);
-    }
-    return new DefaultTwillSpecification(name, runnables, orders, eventHandler);
-  }
-
-  static final class TwillSpecificationOrderCoder implements JsonSerializer<TwillSpecification.Order>,
-                                                             JsonDeserializer<TwillSpecification.Order> {
-    /** Writes the order's names under "names" and the enum name of its type under "type". */
-    @Override
-    public JsonElement serialize(TwillSpecification.Order src, Type typeOfSrc, JsonSerializationContext context) {
-      JsonObject order = new JsonObject();
-      order.add("names", context.serialize(src.getNames(), new TypeToken<Set<String>>(){}.getType()));
-      order.addProperty("type", src.getType().name());
-      return order;
-    }
-
-    /** Reads "names" as a set of strings and resolves "type" with {@code Order.Type.valueOf}. */
-    @Override
-    public TwillSpecification.Order deserialize(JsonElement json, Type typeOfT,
-                                                JsonDeserializationContext context) throws JsonParseException {
-      JsonObject order = json.getAsJsonObject();
-      Type namesType = new TypeToken<Set<String>>(){}.getType();
-      Set<String> names = context.deserialize(order.get("names"), namesType);
-      String typeName = order.get("type").getAsString();
-      return new DefaultTwillSpecification.DefaultOrder(names, TwillSpecification.Order.Type.valueOf(typeName));
-    }
-  }
-
-  static final class EventHandlerSpecificationCoder implements JsonSerializer<EventHandlerSpecification>,
-                                                               JsonDeserializer<EventHandlerSpecification> {
-    /** Writes the handler's class name under "classname" and its configs map under "configs". */
-    @Override
-    public JsonElement serialize(EventHandlerSpecification src, Type typeOfSrc, JsonSerializationContext context) {
-      JsonObject handler = new JsonObject();
-      Type configsType = new TypeToken<Map<String, String>>(){}.getType();
-      handler.addProperty("classname", src.getClassName());
-      handler.add("configs", context.serialize(src.getConfigs(), configsType));
-      return handler;
-    }
-
-    /** Reads "classname" and "configs" and hands them to {@link DefaultEventHandlerSpecification}. */
-    @Override
-    public EventHandlerSpecification deserialize(JsonElement json, Type typeOfT,
-                                                 JsonDeserializationContext context) throws JsonParseException {
-      JsonObject handler = json.getAsJsonObject();
-      String className = handler.get("classname").getAsString();
-      Type configsType = new TypeToken<Map<String, String>>(){}.getType();
-      Map<String, String> configs = context.deserialize(handler.get("configs"), configsType);
-      return new DefaultEventHandlerSpecification(className, configs);
-    }
-  }
-}