You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2014/10/24 20:09:05 UTC

[1/2] (TWILL-105) Remove ZKDecoratorService and some pom file fixes

Repository: incubator-twill
Updated Branches:
  refs/heads/branch-0.4.0 d49f794dc -> 7da6e6a42


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/7da6e6a4/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnTwillService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnTwillService.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnTwillService.java
new file mode 100644
index 0000000..4704345
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnTwillService.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.twill.api.RunId;
+import org.apache.twill.filesystem.Location;
+import org.apache.twill.internal.AbstractTwillService;
+import org.apache.twill.internal.Constants;
+import org.apache.twill.internal.state.Message;
+import org.apache.twill.internal.state.SystemMessages;
+import org.apache.twill.zookeeper.ZKClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+/**
+ * Abstract class for implementing {@link com.google.common.util.concurrent.Service} that runs in
+ * YARN container which provides methods to handle secure token updates.
+ */
+public abstract class AbstractYarnTwillService extends AbstractTwillService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractYarnTwillService.class);
+  protected final Location applicationLocation;
+  protected volatile Credentials credentials;
+
+  protected AbstractYarnTwillService(ZKClient zkClient, RunId runId, Location applicationLocation) {
+    super(zkClient, runId);
+    this.applicationLocation = applicationLocation;
+  }
+
+  /**
+   * Returns the location of the secure store, or {@code null} if either not running in secure mode or an error
+   * occurs when trying to acquire the location.
+   */
+  protected final Location getSecureStoreLocation() {
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      return null;
+    }
+    try {
+      return applicationLocation.append(Constants.Files.CREDENTIALS);
+    } catch (IOException e) {
+      LOG.error("Failed to create secure store location.", e);
+      return null;
+    }
+  }
+
+  /**
+   * Attempts to handle secure store update.
+   *
+   * @param message The message received
+   * @return {@code true} if the message requests for secure store update, {@code false} otherwise.
+   */
+  protected final boolean handleSecureStoreUpdate(Message message) {
+    if (!SystemMessages.SECURE_STORE_UPDATED.equals(message)) {
+      return false;
+    }
+
+    // If not in secure mode, simply ignore the message.
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      return true;
+    }
+
+    try {
+      Credentials credentials = new Credentials();
+      Location location = getSecureStoreLocation();
+      DataInputStream input = new DataInputStream(new BufferedInputStream(location.getInputStream()));
+      try {
+        credentials.readTokenStorageStream(input);
+      } finally {
+        input.close();
+      }
+
+      UserGroupInformation.getCurrentUser().addCredentials(credentials);
+      this.credentials = credentials;
+
+      LOG.info("Secure store updated from {}.", location.toURI());
+
+    } catch (Throwable t) {
+      LOG.error("Failed to update secure store.", t);
+    }
+
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/7da6e6a4/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
index 5afd679..45b8e0c 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
@@ -33,7 +33,6 @@ import org.apache.twill.internal.AbstractTwillController;
 import org.apache.twill.internal.Constants;
 import org.apache.twill.internal.ProcessController;
 import org.apache.twill.internal.appmaster.TrackerService;
-import org.apache.twill.internal.state.StateNode;
 import org.apache.twill.internal.state.SystemMessages;
 import org.apache.twill.internal.yarn.YarnApplicationReport;
 import org.apache.twill.zookeeper.NodeData;
@@ -215,11 +214,6 @@ final class YarnTwillController extends AbstractTwillController implements Twill
     startPollStatus(report.getApplicationId());
   }
 
-  @Override
-  protected void stateNodeUpdated(StateNode stateNode) {
-
-  }
-
   private synchronized void startPollStatus(ApplicationId appId) {
     if (statusPollingThread == null) {
       statusPollingThread = new Thread(createStatusPollingRunnable(),

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/7da6e6a4/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
index 2634e78..6a33131 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
@@ -447,7 +447,10 @@ public final class YarnTwillRunnerService extends AbstractIdleService implements
 
       @Override
       public Iterator<LiveInfo> iterator() {
-        Map<String, Map<RunId, YarnTwillController>> controllerMap = ImmutableTable.copyOf(controllers).rowMap();
+        Map<String, Map<RunId, YarnTwillController>> controllerMap;
+        synchronized (YarnTwillRunnerService.this) {
+          controllerMap = ImmutableTable.copyOf(controllers).rowMap();
+        }
         return Iterators.transform(controllerMap.entrySet().iterator(),
                                    new Function<Map.Entry<String, Map<RunId, YarnTwillController>>, LiveInfo>() {
           @Override

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/7da6e6a4/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java b/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java
index 304c490..f041be4 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java
@@ -17,9 +17,14 @@
  */
 package org.apache.twill.yarn;
 
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunner;
+import org.junit.After;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
@@ -27,11 +32,29 @@ import java.io.IOException;
  * Base class for all YARN tests.
  */
 public abstract class BaseYarnTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BaseYarnTest.class);
+
   @ClassRule
   public static TemporaryFolder tmpFolder = new TemporaryFolder();
 
   @BeforeClass
-  public static final void init() throws IOException {
+  public static void init() throws IOException {
     YarnTestUtils.initOnce();
   }
+
+  @After
+  public final void cleanupTest() {
+    // Make sure all applications are stopped after a test case is executed, even if it failed.
+    TwillRunner twillRunner = YarnTestUtils.getTwillRunner();
+    for (TwillRunner.LiveInfo liveInfo : twillRunner.lookupLive()) {
+      for (TwillController controller : liveInfo.getControllers()) {
+        try {
+          controller.stopAndWait();
+        } catch (Throwable t) {
+          LOG.error("Failed to stop application {}", liveInfo.getApplicationName(), t);
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/7da6e6a4/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java
index 683b452..691fc30 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java
@@ -17,6 +17,7 @@
  */
 package org.apache.twill.yarn;
 
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.twill.api.AbstractTwillRunnable;
 import org.apache.twill.api.TwillController;
 import org.apache.twill.api.TwillRunner;
@@ -91,7 +92,7 @@ public class LogHandlerTestRun extends BaseYarnTest {
     LogThrowable t = throwables.poll();
     Assert.assertEquals(RuntimeException.class.getName(), t.getClassName());
     Assert.assertNotNull(t.getCause());
-    Assert.assertEquals(4, t.getStackTraces().length);
+    Assert.assertEquals(5, t.getStackTraces().length);
 
     t = t.getCause();
     Assert.assertEquals(Exception.class.getName(), t.getClassName());
@@ -120,11 +121,7 @@ public class LogHandlerTestRun extends BaseYarnTest {
         LOG.error("Got exception", t);
       }
 
-      try {
-        stopLatch.await();
-      } catch (InterruptedException e) {
-        LOG.error("Interrupted", e);
-      }
+      Uninterruptibles.awaitUninterruptibly(stopLatch);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/7da6e6a4/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java
index 3a82272..c7a0f86 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java
@@ -188,7 +188,7 @@ public final class ResourceReportTestRun extends BaseYarnTest {
       count++;
       TimeUnit.SECONDS.sleep(1);
     }
-    Assert.assertTrue("Still has 2 contains running after 20 seconds", count < 20);
+    Assert.assertTrue("Still has 2 contains running after 100 seconds", count < 100);
 
     controller.stop().get(100, TimeUnit.SECONDS);
     // Sleep a bit before exiting.


[2/2] git commit: (TWILL-105) Remove ZKDecoratorService and some pom file fixes

Posted by ch...@apache.org.
(TWILL-105) Remove ZKDecoratorService and some pom file fixes

- The ZKDecoratorService is complicated and not necessary, and it has complicated race conditions inside
- Most of the functionalities in ZKDecoratorService are moved into AbstractTwillService
  - It uses AbstractExecutionThreadService from guava that simplifies a lot of service+zk related interactions
  - Remove use of StateNode and updating of Service states in ZK as no one is actually using it
- The Hadoop secure store related code in AbstractTwillService is moved into AbstractYarnTwillService
- Fixes in the pom.xml
  - There should be one Hadoop version profile that is activeByDefault
  - Increase memory size for unit-test
- Fix YARN test cases
  - Increase test timeout for PolicyPlacementTest
  - Correct assertion in ResourceReportTestRun.testResourceReportWithFailingContainers

Signed-off-by: Terence Yim <ch...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/7da6e6a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/7da6e6a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/7da6e6a4

Branch: refs/heads/branch-0.4.0
Commit: 7da6e6a426545779fdc22b1b6a1eddb931288760
Parents: d49f794
Author: Terence Yim <ch...@apache.org>
Authored: Tue Oct 21 18:18:40 2014 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Fri Oct 24 11:08:46 2014 -0700

----------------------------------------------------------------------
 pom.xml                                         |   8 +-
 .../twill/internal/AbstractTwillService.java    | 377 ++++++++++++++
 .../internal/AbstractZKServiceController.java   |  72 +--
 .../twill/internal/TwillContainerLauncher.java  |   9 -
 .../twill/internal/ZKServiceDecorator.java      | 497 -------------------
 .../twill/internal/json/StateNodeCodec.java     |  61 ---
 .../apache/twill/internal/state/StateNode.java  |  84 ----
 .../apache/twill/internal/ControllerTest.java   |  32 +-
 .../internal/state/ZKServiceDecoratorTest.java  | 157 ------
 .../twill/internal/AbstractTwillService.java    | 141 ------
 .../appmaster/ApplicationMasterService.java     | 188 +++----
 .../container/TwillContainerService.java        | 129 ++---
 .../internal/yarn/AbstractYarnTwillService.java | 104 ++++
 .../apache/twill/yarn/YarnTwillController.java  |   6 -
 .../twill/yarn/YarnTwillRunnerService.java      |   5 +-
 .../org/apache/twill/yarn/BaseYarnTest.java     |  25 +-
 .../apache/twill/yarn/LogHandlerTestRun.java    |   9 +-
 .../twill/yarn/ResourceReportTestRun.java       |   2 +-
 18 files changed, 650 insertions(+), 1256 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/7da6e6a4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 30467eb..e244e3d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -339,7 +339,7 @@
                 <artifactId>maven-surefire-plugin</artifactId>
                 <version>2.14.1</version>
                 <configuration>
-                    <argLine>-Xmx512m -Djava.awt.headless=true</argLine>
+                    <argLine>-Xmx1024m -XX:MaxPermSize=256m -Djava.awt.headless=true</argLine>
                     <redirectTestOutputToFile>${surefire.redirectTestOutputToFile}</redirectTestOutputToFile>
                     <systemPropertyVariables>
                         <java.io.tmpdir>${project.build.directory}</java.io.tmpdir>
@@ -579,9 +579,6 @@
             <properties>
                 <hadoop.version>2.4.1</hadoop.version>
             </properties>
-            <activation>
-                <activeByDefault>true</activeByDefault>
-            </activation>
             <build>
                 <plugins>
                     <plugin>
@@ -624,9 +621,6 @@
             <properties>
                 <hadoop.version>2.5.1</hadoop.version>
             </properties>
-            <activation>
-                <activeByDefault>true</activeByDefault>
-            </activation>
             <build>
                 <plugins>
                     <plugin>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/7da6e6a4/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
new file mode 100644
index 0000000..a6939e2
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.AbstractExecutionThreadService;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Service;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import org.apache.twill.api.RunId;
+import org.apache.twill.common.Threads;
+import org.apache.twill.internal.state.Message;
+import org.apache.twill.internal.state.MessageCallback;
+import org.apache.twill.internal.state.MessageCodec;
+import org.apache.twill.internal.state.SystemMessages;
+import org.apache.twill.zookeeper.NodeChildren;
+import org.apache.twill.zookeeper.NodeData;
+import org.apache.twill.zookeeper.OperationFuture;
+import org.apache.twill.zookeeper.ZKClient;
+import org.apache.twill.zookeeper.ZKOperations;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A base implementation of {@link Service} that uses ZooKeeper to transmit states and messages. It uses
+ * the following directory structure in ZK:
+ *
+ * <pre>
+ * /instances
+ *     |- [runId_1]
+ *     |- [runId_2]
+ *     |- ...
+ * /[runId_1]
+ *     |- messages
+ *          |- [messageId_1]
+ *          |- [messageId_2]
+ *          |- ....
+ * /[runId_2]
+ *     |- messages
+ * </pre>
+ *
+ * It assumes that the zk root node is already namespaced
+ * (either with applicationId for AM or runnableId for containers).
+ * <p/>
+ * The ephemeral nodes under {@code /instances} are the {@code liveNode} for each running instance. It can carry data
+ * about that service, which is set by the corresponding implementation.
+ * <p/>
+ * Each running instance also has its own node named by the runId. Under that node, it has a {@code messages} node for
+ * receiving messages from the controller. New message is created by creating a sequence node under the {@code messages}
+ * node, with the node data carrying the message content. The message node will be removed once the message
+ * is being processed by the service.
+ */
+public abstract class AbstractTwillService extends AbstractExecutionThreadService implements MessageCallback {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractTwillService.class);
+  private static final Gson GSON = new Gson();
+
+  protected final ZKClient zkClient;
+  protected final RunId runId;
+  private ExecutorService messageCallbackExecutor;
+
+  protected AbstractTwillService(final ZKClient zkClient, RunId runId) {
+    this.zkClient = zkClient;
+    this.runId = runId;
+  }
+
+  /**
+   * Override to perform any work during service start.
+   */
+  protected void doStart() throws Exception {
+    // Default no-op
+  }
+
+  /**
+   * Override to execute service work. When this method returns, this Service will stop.
+   */
+  protected void doRun() throws Exception {
+    // Default no-op
+  }
+
+  /**
+   * Overrides to perform any work during service shutdown.
+   */
+  protected void doStop() throws Exception {
+    // Default no-op
+  }
+
+  /**
+   * Returns an Object to be stored in the live node. The object returned will be Gson serialized. If {@code null}
+   * is returned, no data will be stored to the live node.
+   */
+  protected Object getLiveNodeData() {
+    return null;
+  }
+
+  /**
+   * Handles message by simply logging it. Child class should override this method for custom handling of message.
+   *
+   * @see org.apache.twill.internal.state.MessageCallback
+   */
+  @Override
+  public ListenableFuture<String> onReceived(String messageId, Message message) {
+    LOG.info("Message received: {}", message);
+    return Futures.immediateCheckedFuture(messageId);
+  }
+
+  @Override
+  protected final void startUp() throws Exception {
+    // Single thread executor that will discard task silently if it is already terminated, which only
+    // happens when this service is shutting down.
+    messageCallbackExecutor = new ThreadPoolExecutor(1, 1,
+                                                     0L, TimeUnit.MILLISECONDS,
+                                                     new LinkedBlockingQueue<Runnable>(),
+                                                     Threads.createDaemonThreadFactory("message-callback"),
+                                                     new ThreadPoolExecutor.DiscardPolicy());
+
+    // Create the live node, if succeeded, start the service, otherwise fail out.
+    createLiveNode().get();
+
+    // Create node for messaging
+    ZKOperations.ignoreError(zkClient.create(getZKPath("messages"), null, CreateMode.PERSISTENT),
+                             KeeperException.NodeExistsException.class, null).get();
+
+    // Watch for session expiration, recreate the live node if reconnected after expiration.
+    zkClient.addConnectionWatcher(new Watcher() {
+      private boolean expired = false;
+
+      @Override
+      public void process(WatchedEvent event) {
+        if (event.getState() == Event.KeeperState.Expired) {
+          LOG.warn("ZK Session expired for service {} with runId {}.", getServiceName(), runId.getId());
+          expired = true;
+        } else if (event.getState() == Event.KeeperState.SyncConnected && expired) {
+          LOG.info("Reconnected after expiration for service {} with runId {}", getServiceName(), runId.getId());
+          expired = false;
+          logIfFailed(createLiveNode());
+        }
+      }
+    });
+
+    doStart();
+
+    // Starts watching for messages
+    watchMessages();
+  }
+
+  @Override
+  protected final void run() throws Exception {
+    doRun();
+  }
+
+  @Override
+  protected final void shutDown() throws Exception {
+    messageCallbackExecutor.shutdownNow();
+    try {
+      doStop();
+    } finally {
+      ListenableFuture<List<String>> removeCompletion = Futures.successfulAsList(ImmutableList.of(removeServiceNode(),
+                                                                                                  removeLiveNode()));
+      // Given at most 5 seconds to cleanup ZK nodes
+      removeCompletion.get(5, TimeUnit.SECONDS);
+      LOG.info("Service {} with runId {} shutdown completed", getServiceName(), runId.getId());
+    }
+  }
+
+  private OperationFuture<String> createLiveNode() {
+    String liveNode = getLiveNodePath();
+    LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNode);
+
+    JsonObject content = new JsonObject();
+    Object liveNodeData = getLiveNodeData();
+    if (liveNodeData != null) {
+      content.add("data", GSON.toJsonTree(liveNodeData));
+    }
+    return ZKOperations.ignoreError(zkClient.create(liveNode, toJson(content), CreateMode.EPHEMERAL),
+                                    KeeperException.NodeExistsException.class, liveNode);
+  }
+
+  private OperationFuture<String> removeLiveNode() {
+    String liveNode = getLiveNodePath();
+    LOG.info("Remove live node {}{}", zkClient.getConnectString(), liveNode);
+    return ZKOperations.ignoreError(zkClient.delete(liveNode), KeeperException.NoNodeException.class, liveNode);
+  }
+
+  private OperationFuture<String> removeServiceNode() {
+    String serviceNode = String.format("/%s", runId.getId());
+    LOG.info("Remove service node {}{}", zkClient.getConnectString(), serviceNode);
+    return ZKOperations.recursiveDelete(zkClient, serviceNode);
+  }
+
+  /**
+   * Watches for messages that are sent through ZK messages node.
+   */
+  private void watchMessages() {
+    final String messagesPath = getZKPath("messages");
+    Futures.addCallback(zkClient.getChildren(messagesPath, new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        if (event.getType() == Event.EventType.NodeChildrenChanged && isRunning()) {
+          watchMessages();
+        }
+      }
+    }), new FutureCallback<NodeChildren>() {
+      @Override
+      public void onSuccess(NodeChildren result) {
+        // Sort by the name, which is the messageId. Assumption is that message ids are ordered by time.
+        List<String> messages = Lists.newArrayList(result.getChildren());
+        Collections.sort(messages);
+        for (String messageId : messages) {
+          processMessage(messagesPath + "/" + messageId, messageId);
+        }
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        // TODO: what could be done besides just logging?
+        LOG.error("Failed to watch messages.", t);
+      }
+    }, Threads.SAME_THREAD_EXECUTOR);
+  }
+
+  private void processMessage(final String path, final String messageId) {
+    Futures.addCallback(zkClient.getData(path), new FutureCallback<NodeData>() {
+      @Override
+      public void onSuccess(NodeData result) {
+        Runnable messageRemover = createMessageRemover(path, result.getStat().getVersion());
+
+        Message message = MessageCodec.decode(result.getData());
+        if (message == null) {
+          LOG.error("Failed to decode message for {} in {}", messageId, path);
+          messageRemover.run();
+          return;
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Message received from {}: {}", path, new String(MessageCodec.encode(message), Charsets.UTF_8));
+        }
+
+        // Handle the stop message
+        if (handleStopMessage(message, messageRemover)) {
+          return;
+        }
+        // Otherwise, delegate to the child class to handle the message
+        handleMessage(messageId, message, messageRemover);
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        LOG.error("Failed to fetch message content from {}", path, t);
+      }
+    }, messageCallbackExecutor);
+  }
+
+  /**
+   * Handles {@link SystemMessages#STOP_COMMAND} if the given message is a stop command. After this service is stopped,
+   * the message node will be removed.
+   *
+   * @param message Message to process
+   * @param messageRemover Runnable to remove the message node when this service is stopped
+   * @return {@code true} if the given message is a stop command, {@code false} otherwise
+   */
+  private boolean handleStopMessage(Message message, final Runnable messageRemover) {
+    if (message.getType() != Message.Type.SYSTEM || !SystemMessages.STOP_COMMAND.equals(message.getCommand())) {
+      return false;
+    }
+
+    // Stop this service.
+    Futures.addCallback(stop(), new FutureCallback<State>() {
+      @Override
+      public void onSuccess(State result) {
+        messageRemover.run();
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        LOG.error("Stop service failed upon STOP command", t);
+        messageRemover.run();
+      }
+    }, Threads.SAME_THREAD_EXECUTOR);
+    return true;
+  }
+
+
+  /**
+   * Handles the given message by calling {@link #onReceived(java.lang.String, org.apache.twill.internal.state.Message)}
+   * method.
+   *
+   * @param messageId Id of the message
+   * @param message The message
+   * @param messageRemover Runnable to remove the message node when the handling of the message is completed
+   */
+  private void handleMessage(String messageId, final Message message, final Runnable messageRemover) {
+    Futures.addCallback(onReceived(messageId, message), new FutureCallback<String>() {
+      @Override
+      public void onSuccess(String result) {
+        messageRemover.run();
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        LOG.error("Failed to handle message {}", message, t);
+        messageRemover.run();
+      }
+    }, Threads.SAME_THREAD_EXECUTOR);
+  }
+
+  /**
+   * Creates a {@link Runnable} that encapsulates the action to remove a particular message node.
+   */
+  private Runnable createMessageRemover(final String path, final int version) {
+    return new Runnable() {
+      @Override
+      public void run() {
+        logIfFailed(zkClient.delete(path, version));
+      }
+    };
+  }
+
+  /**
+   * Logs if the given future failed.
+   */
+  private <T> void logIfFailed(ListenableFuture<T> future) {
+    Futures.addCallback(future, new FutureCallback<T>() {
+      @Override
+      public void onSuccess(T result) {
+        // All-good
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        LOG.error("Operation failed for service {} with runId {}", getServiceName(), runId, t);
+      }
+    }, Threads.SAME_THREAD_EXECUTOR);
+  }
+
+  private String getZKPath(String path) {
+    return String.format("/%s/%s", runId.getId(), path);
+  }
+
+  private String getLiveNodePath() {
+    return "/instances/" + runId.getId();
+  }
+
+  private <T> byte[] toJson(T obj) {
+    return GSON.toJson(obj).getBytes(Charsets.UTF_8);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/7da6e6a4/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
index 80932ea..6d95009 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
@@ -17,21 +17,16 @@
  */
 package org.apache.twill.internal;
 
-import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.gson.GsonBuilder;
 import org.apache.twill.api.Command;
 import org.apache.twill.api.RunId;
 import org.apache.twill.api.ServiceController;
 import org.apache.twill.common.Threads;
-import org.apache.twill.internal.json.StackTraceElementCodec;
-import org.apache.twill.internal.json.StateNodeCodec;
 import org.apache.twill.internal.state.Message;
 import org.apache.twill.internal.state.Messages;
-import org.apache.twill.internal.state.StateNode;
 import org.apache.twill.internal.state.SystemMessages;
 import org.apache.twill.zookeeper.NodeData;
 import org.apache.twill.zookeeper.ZKClient;
@@ -47,7 +42,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * An abstract base class for implementing a {@link ServiceController} using ZooKeeper as a means for
- * communicating with the remote service. This is designed to work in pair with the {@link ZKServiceDecorator}.
+ * communicating with the remote service. This class is designed to work in tandem with {@link AbstractTwillService}.
  */
 public abstract class AbstractZKServiceController extends AbstractExecutionServiceController {
 
@@ -55,7 +50,6 @@ public abstract class AbstractZKServiceController extends AbstractExecutionServi
 
   protected final ZKClient zkClient;
   private final InstanceNodeDataCallback instanceNodeDataCallback;
-  private final StateNodeDataCallback stateNodeDataCallback;
   private final List<ListenableFuture<?>> messageFutures;
   private ListenableFuture<State> stopMessageFuture;
 
@@ -63,7 +57,6 @@ public abstract class AbstractZKServiceController extends AbstractExecutionServi
     super(runId);
     this.zkClient = zkClient;
     this.instanceNodeDataCallback = new InstanceNodeDataCallback();
-    this.stateNodeDataCallback = new StateNodeDataCallback();
     this.messageFutures = Lists.newLinkedList();
   }
 
@@ -88,14 +81,6 @@ public abstract class AbstractZKServiceController extends AbstractExecutionServi
         watchInstanceNode();
       }
     });
-
-    // Watch for state node data
-    actOnExists(getZKPath("state"), new Runnable() {
-      @Override
-      public void run() {
-        watchStateNode();
-      }
-    });
   }
 
   @Override
@@ -170,12 +155,6 @@ public abstract class AbstractZKServiceController extends AbstractExecutionServi
    */
   protected abstract void instanceNodeFailed(Throwable cause);
 
-  /**
-   * Called when an update on the state node is detected.
-   * @param stateNode The update state node data or {@code null} if there is an error when fetching the node data.
-   */
-  protected abstract void stateNodeUpdated(StateNode stateNode);
-
   protected synchronized void forceShutDown() {
     if (stopMessageFuture == null) {
       // In force shutdown, don't send message.
@@ -240,27 +219,6 @@ public abstract class AbstractZKServiceController extends AbstractExecutionServi
     }), instanceNodeDataCallback, Threads.SAME_THREAD_EXECUTOR);
   }
 
-  private void watchStateNode() {
-    if (!shouldProcessZKEvent()) {
-      return;
-    }
-    Futures.addCallback(zkClient.getData(getZKPath("state"), new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        if (!shouldProcessZKEvent()) {
-          return;
-        }
-        switch (event.getType()) {
-          case NodeDataChanged:
-            watchStateNode();
-            break;
-          default:
-            LOG.debug("Ignore ZK event for state node: {}", event);
-        }
-      }
-    }), stateNodeDataCallback, Threads.SAME_THREAD_EXECUTOR);
-  }
-
   /**
    * Returns true if ZK events needs to be processed.
    */
@@ -304,32 +262,4 @@ public abstract class AbstractZKServiceController extends AbstractExecutionServi
       }
     }
   }
-
-  private final class StateNodeDataCallback implements FutureCallback<NodeData> {
-
-    @Override
-    public void onSuccess(NodeData result) {
-      if (shouldProcessZKEvent()) {
-        byte[] data = result.getData();
-        if (data == null) {
-          stateNodeUpdated(null);
-          return;
-        }
-        StateNode stateNode = new GsonBuilder().registerTypeAdapter(StateNode.class, new StateNodeCodec())
-          .registerTypeAdapter(StackTraceElement.class, new StackTraceElementCodec())
-          .create()
-          .fromJson(new String(data, Charsets.UTF_8), StateNode.class);
-
-        stateNodeUpdated(stateNode);
-      }
-    }
-
-    @Override
-    public void onFailure(Throwable t) {
-      if (shouldProcessZKEvent()) {
-        LOG.error("Failed in fetching state node data.", t);
-        stateNodeUpdated(null);
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/7da6e6a4/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
index 14bac7a..a891de6 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
@@ -28,7 +28,6 @@ import org.apache.twill.api.RunId;
 import org.apache.twill.api.RuntimeSpecification;
 import org.apache.twill.filesystem.Location;
 import org.apache.twill.internal.state.Message;
-import org.apache.twill.internal.state.StateNode;
 import org.apache.twill.launcher.FindFreePort;
 import org.apache.twill.launcher.TwillLauncher;
 import org.apache.twill.zookeeper.NodeData;
@@ -217,20 +216,12 @@ public final class TwillContainerLauncher {
     }
 
     @Override
-    protected void stateNodeUpdated(StateNode stateNode) {
-      // No-op
-    }
-
-    @Override
     public ListenableFuture<Message> sendMessage(Message message) {
       return sendMessage(message, message);
     }
 
     @Override
     public synchronized void completed(int exitStatus) {
-      if (exitStatus != 0) {  // If a container terminated with exit code != 0, treat it as error
-//        fireStateChange(new StateNode(State.FAILED, new StackTraceElement[0]));
-      }
       forceShutDown();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/7da6e6a4/twill-core/src/main/java/org/apache/twill/internal/ZKServiceDecorator.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/ZKServiceDecorator.java b/twill-core/src/main/java/org/apache/twill/internal/ZKServiceDecorator.java
deleted file mode 100644
index efefe36..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/ZKServiceDecorator.java
+++ /dev/null
@@ -1,497 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import com.google.common.base.Charsets;
-import com.google.common.base.Supplier;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.AbstractService;
-import com.google.common.util.concurrent.AsyncFunction;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.Service;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import org.apache.twill.api.RunId;
-import org.apache.twill.api.ServiceController;
-import org.apache.twill.common.ServiceListenerAdapter;
-import org.apache.twill.common.Threads;
-import org.apache.twill.internal.json.StackTraceElementCodec;
-import org.apache.twill.internal.json.StateNodeCodec;
-import org.apache.twill.internal.state.Message;
-import org.apache.twill.internal.state.MessageCallback;
-import org.apache.twill.internal.state.MessageCodec;
-import org.apache.twill.internal.state.StateNode;
-import org.apache.twill.internal.state.SystemMessages;
-import org.apache.twill.zookeeper.NodeChildren;
-import org.apache.twill.zookeeper.NodeData;
-import org.apache.twill.zookeeper.OperationFuture;
-import org.apache.twill.zookeeper.ZKClient;
-import org.apache.twill.zookeeper.ZKOperations;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import javax.annotation.Nullable;
-
-/**
- * A {@link Service} decorator that wrap another {@link Service} with the service states reflected
- * to ZooKeeper.
- */
-public final class ZKServiceDecorator extends AbstractService {
-
-  private static final Logger LOG = LoggerFactory.getLogger(ZKServiceDecorator.class);
-
-  private final ZKClient zkClient;
-  private final RunId id;
-  private final Supplier<? extends JsonElement> liveNodeData;
-  private final Service decoratedService;
-  private final MessageCallbackCaller messageCallback;
-  private ExecutorService callbackExecutor;
-
-
-  public ZKServiceDecorator(ZKClient zkClient, RunId id, Supplier<? extends JsonElement> liveNodeData,
-                            Service decoratedService) {
-    this(zkClient, id, liveNodeData, decoratedService, null);
-  }
-
-  /**
-   * Creates a ZKServiceDecorator.
-   * @param zkClient ZooKeeper client
-   * @param id The run id of the service
-   * @param liveNodeData A supplier for providing information writing to live node.
-   * @param decoratedService The Service for monitoring state changes
-   * @param finalizer An optional Runnable to run when this decorator terminated.
-   */
-  public ZKServiceDecorator(ZKClient zkClient, RunId id, Supplier <? extends JsonElement> liveNodeData,
-                            Service decoratedService, @Nullable Runnable finalizer) {
-    this.zkClient = zkClient;
-    this.id = id;
-    this.liveNodeData = liveNodeData;
-    this.decoratedService = decoratedService;
-    if (decoratedService instanceof MessageCallback) {
-      this.messageCallback = new MessageCallbackCaller((MessageCallback) decoratedService, zkClient);
-    } else {
-      this.messageCallback = new MessageCallbackCaller(zkClient);
-    }
-    if (finalizer != null) {
-      addFinalizer(finalizer);
-    }
-  }
-
-  @Override
-  protected void doStart() {
-    callbackExecutor = Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("message-callback"));
-    // Create the live node, if succeeded, start the decorated service, otherwise fail out.
-    Futures.addCallback(createLiveNode(), new FutureCallback<String>() {
-      @Override
-      public void onSuccess(String result) {
-        // Create nodes for states and messaging
-        StateNode stateNode = new StateNode(ServiceController.State.STARTING);
-
-        final ListenableFuture<List<String>> createFuture = Futures.allAsList(
-          ZKOperations.ignoreError(zkClient.create(getZKPath("messages"), null, CreateMode.PERSISTENT),
-                                   KeeperException.NodeExistsException.class, null),
-          zkClient.create(getZKPath("state"), encodeStateNode(stateNode), CreateMode.PERSISTENT)
-        );
-
-        createFuture.addListener(new Runnable() {
-          @Override
-          public void run() {
-            try {
-              createFuture.get();
-              // Starts the decorated service
-              decoratedService.addListener(createListener(), Threads.SAME_THREAD_EXECUTOR);
-              decoratedService.start();
-            } catch (Exception e) {
-              notifyFailed(e);
-            }
-          }
-        }, Threads.SAME_THREAD_EXECUTOR);
-      }
-
-      @Override
-      public void onFailure(Throwable t) {
-        notifyFailed(t);
-      }
-    });
-
-    // Watch for session expiration, recreate the live node if reconnected after expiration.
-    zkClient.addConnectionWatcher(new Watcher() {
-      private boolean expired = false;
-
-      @Override
-      public void process(WatchedEvent event) {
-        if (event.getState() == Event.KeeperState.Expired) {
-          LOG.warn("ZK Session expired for service {} with runId {}.", decoratedService, id.getId());
-          expired = true;
-        } else if (event.getState() == Event.KeeperState.SyncConnected && expired) {
-          LOG.info("Reconnected after expiration for service {} with runId {}", decoratedService, id.getId());
-          expired = false;
-          Futures.addCallback(createLiveNode(), new FutureCallback<String>() {
-            @Override
-            public void onSuccess(String result) {
-              // All good, no-op
-            }
-
-            @Override
-            public void onFailure(Throwable t) {
-              notifyFailed(t);
-            }
-          }, Threads.SAME_THREAD_EXECUTOR);
-        }
-      }
-    });
-  }
-
-  @Override
-  protected void doStop() {
-    // Stops the decorated service
-    decoratedService.stop();
-    callbackExecutor.shutdownNow();
-  }
-
-  private void addFinalizer(final Runnable finalizer) {
-    addListener(new ServiceListenerAdapter() {
-      @Override
-      public void terminated(State from) {
-        try {
-          finalizer.run();
-        } catch (Throwable t) {
-          LOG.warn("Exception when running finalizer.", t);
-        }
-      }
-
-      @Override
-      public void failed(State from, Throwable failure) {
-        try {
-          finalizer.run();
-        } catch (Throwable t) {
-          LOG.warn("Exception when running finalizer.", t);
-        }
-      }
-    }, Threads.SAME_THREAD_EXECUTOR);
-  }
-
-  private OperationFuture<String> createLiveNode() {
-    String liveNode = getLiveNodePath();
-    LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNode);
-
-    JsonObject content = new JsonObject();
-    content.add("data", liveNodeData.get());
-    return ZKOperations.ignoreError(zkClient.create(liveNode, encodeJson(content), CreateMode.EPHEMERAL),
-                                    KeeperException.NodeExistsException.class, liveNode);
-  }
-
-  private OperationFuture<String> removeLiveNode() {
-    String liveNode = getLiveNodePath();
-    LOG.info("Remove live node {}{}", zkClient.getConnectString(), liveNode);
-    return ZKOperations.ignoreError(zkClient.delete(liveNode), KeeperException.NoNodeException.class, liveNode);
-  }
-
-  private OperationFuture<String> removeServiceNode() {
-    String serviceNode = String.format("/%s", id.getId());
-    LOG.info("Remove service node {}{}", zkClient.getConnectString(), serviceNode);
-    return ZKOperations.recursiveDelete(zkClient, serviceNode);
-  }
-
-  private void watchMessages() {
-    final String messagesPath = getZKPath("messages");
-    Futures.addCallback(zkClient.getChildren(messagesPath, new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        // TODO: Do we need to deal with other type of events?
-        if (event.getType() == Event.EventType.NodeChildrenChanged && decoratedService.isRunning()) {
-          watchMessages();
-        }
-      }
-    }), new FutureCallback<NodeChildren>() {
-      @Override
-      public void onSuccess(NodeChildren result) {
-        // Sort by the name, which is the messageId. Assumption is that message ids is ordered by time.
-        List<String> messages = Lists.newArrayList(result.getChildren());
-        Collections.sort(messages);
-        for (String messageId : messages) {
-          processMessage(messagesPath + "/" + messageId, messageId);
-        }
-      }
-
-      @Override
-      public void onFailure(Throwable t) {
-        // TODO: what could be done besides just logging?
-        LOG.error("Failed to watch messages.", t);
-      }
-    });
-  }
-
-  private void processMessage(final String path, final String messageId) {
-    Futures.addCallback(zkClient.getData(path), new FutureCallback<NodeData>() {
-      @Override
-      public void onSuccess(NodeData result) {
-        Message message = MessageCodec.decode(result.getData());
-        if (message == null) {
-          LOG.error("Failed to decode message for " + messageId + " in " + path);
-          listenFailure(zkClient.delete(path, result.getStat().getVersion()));
-          return;
-        }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Message received from " + path + ": " + new String(MessageCodec.encode(message), Charsets.UTF_8));
-        }
-        if (handleStopMessage(message, getDeleteSupplier(path, result.getStat().getVersion()))) {
-          return;
-        }
-        messageCallback.onReceived(callbackExecutor, path, result.getStat().getVersion(), messageId, message);
-      }
-
-      @Override
-      public void onFailure(Throwable t) {
-        LOG.error("Failed to fetch message content.", t);
-      }
-    });
-  }
-
-  private <V> boolean handleStopMessage(Message message, final Supplier<OperationFuture<V>> postHandleSupplier) {
-    if (message.getType() == Message.Type.SYSTEM && SystemMessages.STOP_COMMAND.equals(message.getCommand())) {
-      callbackExecutor.execute(new Runnable() {
-        @Override
-        public void run() {
-          decoratedService.stop().addListener(new Runnable() {
-
-            @Override
-            public void run() {
-              stopServiceOnComplete(postHandleSupplier.get(), ZKServiceDecorator.this);
-            }
-          }, MoreExecutors.sameThreadExecutor());
-        }
-      });
-      return true;
-    }
-    return false;
-  }
-
-
-  private Supplier<OperationFuture<String>> getDeleteSupplier(final String path, final int version) {
-    return new Supplier<OperationFuture<String>>() {
-      @Override
-      public OperationFuture<String> get() {
-        return zkClient.delete(path, version);
-      }
-    };
-  }
-
-  private Listener createListener() {
-    return new DecoratedServiceListener();
-  }
-
-  private <V> byte[] encode(V data, Class<? extends V> clz) {
-    return new GsonBuilder().registerTypeAdapter(StateNode.class, new StateNodeCodec())
-                            .registerTypeAdapter(StackTraceElement.class, new StackTraceElementCodec())
-                            .create()
-      .toJson(data, clz).getBytes(Charsets.UTF_8);
-  }
-
-  private byte[] encodeStateNode(StateNode stateNode) {
-    return encode(stateNode, StateNode.class);
-  }
-
-  private <V extends JsonElement> byte[] encodeJson(V json) {
-    return new Gson().toJson(json).getBytes(Charsets.UTF_8);
-  }
-
-  private String getZKPath(String path) {
-    return String.format("/%s/%s", id, path);
-  }
-
-  private String getLiveNodePath() {
-    return "/instances/" + id;
-  }
-
-  private static <V> OperationFuture<V> listenFailure(final OperationFuture<V> operationFuture) {
-    operationFuture.addListener(new Runnable() {
-
-      @Override
-      public void run() {
-        try {
-          if (!operationFuture.isCancelled()) {
-            operationFuture.get();
-          }
-        } catch (Exception e) {
-          // TODO: what could be done besides just logging?
-          LOG.error("Operation execution failed for " + operationFuture.getRequestPath(), e);
-        }
-      }
-    }, Threads.SAME_THREAD_EXECUTOR);
-    return operationFuture;
-  }
-
-  private static final class MessageCallbackCaller {
-    private final MessageCallback callback;
-    private final ZKClient zkClient;
-
-    private MessageCallbackCaller(ZKClient zkClient) {
-      this(null, zkClient);
-    }
-
-    private MessageCallbackCaller(MessageCallback callback, ZKClient zkClient) {
-      this.callback = callback;
-      this.zkClient = zkClient;
-    }
-
-    public void onReceived(Executor executor, final String path,
-                           final int version, final String id, final Message message) {
-      if (callback == null) {
-        // Simply delete the message
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Ignoring incoming message from " + path + ": " + message);
-        }
-        listenFailure(zkClient.delete(path, version));
-        return;
-      }
-
-      executor.execute(new Runnable() {
-        @Override
-        public void run() {
-          try {
-            // Message process is synchronous for now. Making it async needs more thoughts about race conditions.
-            // The executor is the callbackExecutor which is a single thread executor.
-            callback.onReceived(id, message).get();
-          } catch (Throwable t) {
-            LOG.error("Exception when processing message: {}, {}, {}", id, message, path, t);
-          } finally {
-            listenFailure(zkClient.delete(path, version));
-          }
-        }
-      });
-    }
-  }
-
-  private final class DecoratedServiceListener implements Listener {
-    private volatile boolean zkFailure = false;
-
-    @Override
-    public void starting() {
-      LOG.info("Starting: " + id);
-      saveState(ServiceController.State.STARTING);
-    }
-
-    @Override
-    public void running() {
-      LOG.info("Running: " + id);
-      notifyStarted();
-      watchMessages();
-      saveState(ServiceController.State.RUNNING);
-    }
-
-    @Override
-    public void stopping(State from) {
-      LOG.info("Stopping: " + id);
-      saveState(ServiceController.State.STOPPING);
-    }
-
-    @Override
-    public void terminated(State from) {
-      LOG.info("Terminated: " + from + " " + id);
-      if (zkFailure) {
-        return;
-      }
-
-      ImmutableList<OperationFuture<String>> futures = ImmutableList.of(removeLiveNode(), removeServiceNode());
-      final ListenableFuture<List<String>> future = Futures.allAsList(futures);
-      Futures.successfulAsList(futures).addListener(new Runnable() {
-        @Override
-        public void run() {
-          try {
-            future.get();
-            LOG.info("Service and state node removed");
-            notifyStopped();
-          } catch (Exception e) {
-            LOG.warn("Failed to remove ZK nodes.", e);
-            notifyFailed(e);
-          }
-        }
-      }, Threads.SAME_THREAD_EXECUTOR);
-    }
-
-    @Override
-    public void failed(State from, final Throwable failure) {
-      LOG.info("Failed: {} {}.", from, id, failure);
-      if (zkFailure) {
-        return;
-      }
-
-      ImmutableList<OperationFuture<String>> futures = ImmutableList.of(removeLiveNode(), removeServiceNode());
-      Futures.successfulAsList(futures).addListener(new Runnable() {
-        @Override
-        public void run() {
-          LOG.info("Service and state node removed");
-          notifyFailed(failure);
-        }
-      }, Threads.SAME_THREAD_EXECUTOR);
-    }
-
-    private void saveState(ServiceController.State state) {
-      if (zkFailure) {
-        return;
-      }
-      StateNode stateNode = new StateNode(state);
-      stopOnFailure(zkClient.setData(getZKPath("state"), encodeStateNode(stateNode)));
-    }
-
-    private <V> void stopOnFailure(final OperationFuture<V> future) {
-      future.addListener(new Runnable() {
-        @Override
-        public void run() {
-          try {
-            future.get();
-          } catch (final Exception e) {
-            LOG.error("ZK operation failed", e);
-            zkFailure = true;
-            decoratedService.stop().addListener(new Runnable() {
-              @Override
-              public void run() {
-                notifyFailed(e);
-              }
-            }, Threads.SAME_THREAD_EXECUTOR);
-          }
-        }
-      }, Threads.SAME_THREAD_EXECUTOR);
-    }
-  }
-
-  private <V> ListenableFuture<State> stopServiceOnComplete(ListenableFuture <V> future, final Service service) {
-    return Futures.transform(future, new AsyncFunction<V, State>() {
-      @Override
-      public ListenableFuture<State> apply(V input) throws Exception {
-        return service.stop();
-      }
-    });
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/7da6e6a4/twill-core/src/main/java/org/apache/twill/internal/json/StateNodeCodec.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/StateNodeCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/StateNodeCodec.java
deleted file mode 100644
index f620231..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/json/StateNodeCodec.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.json;
-
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import com.google.gson.JsonSerializationContext;
-import com.google.gson.JsonSerializer;
-import org.apache.twill.api.ServiceController;
-import org.apache.twill.internal.state.StateNode;
-
-import java.lang.reflect.Type;
-
-/**
- * Gson codec for {@link StateNode}.
- */
-public final class StateNodeCodec implements JsonSerializer<StateNode>, JsonDeserializer<StateNode> {
-
-  @Override
-  public StateNode deserialize(JsonElement json, Type typeOfT,
-                               JsonDeserializationContext context) throws JsonParseException {
-    JsonObject jsonObj = json.getAsJsonObject();
-    ServiceController.State state = ServiceController.State.valueOf(jsonObj.get("state").getAsString());
-    String errorMessage = jsonObj.has("errorMessage") ? jsonObj.get("errorMessage").getAsString() : null;
-
-    return new StateNode(state, errorMessage,
-                         context.<StackTraceElement[]>deserialize(jsonObj.get("stackTraces"),
-                                                                  StackTraceElement[].class));
-  }
-
-  @Override
-  public JsonElement serialize(StateNode src, Type typeOfSrc, JsonSerializationContext context) {
-    JsonObject jsonObj = new JsonObject();
-    jsonObj.addProperty("state", src.getState().name());
-    if (src.getErrorMessage() != null) {
-      jsonObj.addProperty("errorMessage", src.getErrorMessage());
-    }
-    if (src.getStackTraces() != null) {
-      jsonObj.add("stackTraces", context.serialize(src.getStackTraces(), StackTraceElement[].class));
-    }
-    return jsonObj;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/7da6e6a4/twill-core/src/main/java/org/apache/twill/internal/state/StateNode.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/state/StateNode.java b/twill-core/src/main/java/org/apache/twill/internal/state/StateNode.java
deleted file mode 100644
index 6cdab48..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/state/StateNode.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.state;
-
-import com.google.common.util.concurrent.Service;
-import org.apache.twill.api.ServiceController;
-
-/**
- *
- */
-public final class StateNode {
-
-  private final ServiceController.State state;
-  private final String errorMessage;
-  private final StackTraceElement[] stackTraces;
-
-  /**
-   * Constructs a StateNode with the given state.
-   */
-  public StateNode(ServiceController.State state) {
-    this(state, null, null);
-  }
-
-  /**
-   * Constructs a StateNode with {@link ServiceController.State#FAILED} caused by the given error.
-   */
-  public StateNode(Throwable error) {
-    this(Service.State.FAILED, error.getMessage(), error.getStackTrace());
-  }
-
-  /**
-   * Constructs a StateNode with the given state, error and stacktraces.
-   * This constructor should only be used by the StateNodeCodec.
-   */
-  public StateNode(ServiceController.State state, String errorMessage, StackTraceElement[] stackTraces) {
-    this.state = state;
-    this.errorMessage = errorMessage;
-    this.stackTraces = stackTraces;
-  }
-
-  public ServiceController.State getState() {
-    return state;
-  }
-
-  public String getErrorMessage() {
-    return errorMessage;
-  }
-
-  public StackTraceElement[] getStackTraces() {
-    return stackTraces;
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder builder = new StringBuilder("state=").append(state);
-
-    if (errorMessage != null) {
-      builder.append("\n").append("error=").append(errorMessage);
-    }
-    if (stackTraces != null) {
-      builder.append("\n");
-      for (StackTraceElement stackTrace : stackTraces) {
-        builder.append("\tat ").append(stackTrace.toString()).append("\n");
-      }
-    }
-    return builder.toString();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/7da6e6a4/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java b/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java
index 4d968bc..62fdd49 100644
--- a/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java
+++ b/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java
@@ -17,11 +17,8 @@
  */
 package org.apache.twill.internal;
 
-import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.common.util.concurrent.Service;
-import com.google.gson.JsonObject;
 import org.apache.twill.api.Command;
 import org.apache.twill.api.ResourceReport;
 import org.apache.twill.api.RunId;
@@ -30,7 +27,6 @@ import org.apache.twill.api.TwillController;
 import org.apache.twill.api.logging.LogHandler;
 import org.apache.twill.common.ServiceListenerAdapter;
 import org.apache.twill.common.Threads;
-import org.apache.twill.internal.state.StateNode;
 import org.apache.twill.internal.zookeeper.InMemoryZKServer;
 import org.apache.twill.zookeeper.NodeData;
 import org.apache.twill.zookeeper.ZKClient;
@@ -126,7 +122,7 @@ public class ControllerTest {
 
       service.stop();
 
-      Assert.assertTrue(stopLatch.await(2, TimeUnit.SECONDS));
+      Assert.assertTrue(stopLatch.await(20, TimeUnit.SECONDS)); // bounded so a hung stop fails fast
 
     } finally {
       zkServer.stopAndWait();
@@ -168,19 +164,30 @@ public class ControllerTest {
   }
 
   private Service createService(ZKClient zkClient, RunId runId) {
-    return new ZKServiceDecorator(
-      zkClient, runId, Suppliers.ofInstance(new JsonObject()), new AbstractIdleService() {
+    return new AbstractTwillService(zkClient, runId) {
+
+      private final CountDownLatch stopLatch = new CountDownLatch(1);
 
       @Override
-      protected void startUp() throws Exception {
+      protected void doStart() throws Exception {
         LOG.info("Start");
       }
 
       @Override
-      protected void shutDown() throws Exception {
+      protected void doRun() throws Exception {
+        stopLatch.await(); // block until triggerShutdown() counts the latch down
+      }
+
+      @Override
+      protected void doStop() throws Exception {
         LOG.info("Stop");
       }
-    });
+
+      @Override
+      protected void triggerShutdown() {
+        stopLatch.countDown(); // releases the await() in doRun()
+      }
+    };
   }
 
   private TwillController getController(ZKClient zkClient, RunId runId) {
@@ -205,11 +212,6 @@ public class ControllerTest {
       }
 
       @Override
-      protected void stateNodeUpdated(StateNode stateNode) {
-        // No-op
-      }
-
-      @Override
       public ResourceReport getResourceReport() {
         return null;
       }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/7da6e6a4/twill-core/src/test/java/org/apache/twill/internal/state/ZKServiceDecoratorTest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/test/java/org/apache/twill/internal/state/ZKServiceDecoratorTest.java b/twill-core/src/test/java/org/apache/twill/internal/state/ZKServiceDecoratorTest.java
deleted file mode 100644
index 401d6e3..0000000
--- a/twill-core/src/test/java/org/apache/twill/internal/state/ZKServiceDecoratorTest.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.state;
-
-import com.google.common.base.Charsets;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Suppliers;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.Service;
-import com.google.gson.Gson;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import org.apache.twill.api.RunId;
-import org.apache.twill.internal.RunIds;
-import org.apache.twill.internal.ZKServiceDecorator;
-import org.apache.twill.internal.zookeeper.InMemoryZKServer;
-import org.apache.twill.zookeeper.NodeData;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.apache.twill.zookeeper.ZKClients;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- *
- */
-public class ZKServiceDecoratorTest {
-
-  private static final Logger LOG = LoggerFactory.getLogger(ZKServiceDecoratorTest.class);
-
-  @Test
-  public void testStateTransition() throws InterruptedException, ExecutionException, TimeoutException {
-    InMemoryZKServer zkServer = InMemoryZKServer.builder().build();
-    zkServer.startAndWait();
-
-    try {
-      final String namespace = Joiner.on('/').join("/twill", RunIds.generate(), "runnables", "Runner1");
-
-      final ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
-      zkClient.startAndWait();
-      zkClient.create(namespace, null, CreateMode.PERSISTENT).get();
-
-      try {
-        JsonObject content = new JsonObject();
-        content.addProperty("containerId", "container-123");
-        content.addProperty("host", "localhost");
-
-        RunId runId = RunIds.generate();
-        final Semaphore semaphore = new Semaphore(0);
-        ZKServiceDecorator service = new ZKServiceDecorator(ZKClients.namespace(zkClient, namespace),
-                                                            runId, Suppliers.ofInstance(content),
-                                                            new AbstractIdleService() {
-          @Override
-          protected void startUp() throws Exception {
-            Preconditions.checkArgument(semaphore.tryAcquire(5, TimeUnit.SECONDS), "Fail to start");
-          }
-
-          @Override
-          protected void shutDown() throws Exception {
-            Preconditions.checkArgument(semaphore.tryAcquire(5, TimeUnit.SECONDS), "Fail to stop");
-          }
-        });
-
-        final String runnablePath = namespace + "/" + runId.getId();
-        final AtomicReference<String> stateMatch = new AtomicReference<String>("STARTING");
-        watchDataChange(zkClient, runnablePath + "/state", semaphore, stateMatch);
-        Assert.assertEquals(Service.State.RUNNING, service.start().get(5, TimeUnit.SECONDS));
-
-        stateMatch.set("STOPPING");
-        Assert.assertEquals(Service.State.TERMINATED, service.stop().get(5, TimeUnit.SECONDS));
-
-      } finally {
-        zkClient.stopAndWait();
-      }
-    } finally {
-      zkServer.stopAndWait();
-    }
-  }
-
-  private void watchDataChange(final ZKClientService zkClient, final String path,
-                               final Semaphore semaphore, final AtomicReference<String> stateMatch) {
-    Futures.addCallback(zkClient.getData(path, new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        if (event.getType() == Event.EventType.NodeDataChanged) {
-          watchDataChange(zkClient, path, semaphore, stateMatch);
-        }
-      }
-    }), new FutureCallback<NodeData>() {
-      @Override
-      public void onSuccess(NodeData result) {
-        String content = new String(result.getData(), Charsets.UTF_8);
-        JsonObject json = new Gson().fromJson(content, JsonElement.class).getAsJsonObject();
-        if (stateMatch.get().equals(json.get("state").getAsString())) {
-          semaphore.release();
-        }
-      }
-
-      @Override
-      public void onFailure(Throwable t) {
-        exists();
-      }
-
-      private void exists() {
-        Futures.addCallback(zkClient.exists(path, new Watcher() {
-          @Override
-          public void process(WatchedEvent event) {
-            if (event.getType() == Event.EventType.NodeCreated) {
-              watchDataChange(zkClient, path, semaphore, stateMatch);
-            }
-          }
-        }), new FutureCallback<Stat>() {
-          @Override
-          public void onSuccess(Stat result) {
-            if (result != null) {
-              watchDataChange(zkClient, path, semaphore, stateMatch);
-            }
-          }
-
-          @Override
-          public void onFailure(Throwable t) {
-            LOG.error(t.getMessage(), t);
-          }
-        });
-      }
-    });
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/7da6e6a4/twill-yarn/src/main/java/org/apache/twill/internal/AbstractTwillService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/AbstractTwillService.java b/twill-yarn/src/main/java/org/apache/twill/internal/AbstractTwillService.java
deleted file mode 100644
index 706039d..0000000
--- a/twill-yarn/src/main/java/org/apache/twill/internal/AbstractTwillService.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.Service;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.twill.filesystem.Location;
-import org.apache.twill.internal.state.Message;
-import org.apache.twill.internal.state.SystemMessages;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.concurrent.Executor;
-
-/**
- * A base implementation of {@link Service} handle secure token update.
- */
-public abstract class AbstractTwillService implements Service {
-
-  private static final Logger LOG = LoggerFactory.getLogger(AbstractTwillService.class);
-
-  protected final Location applicationLocation;
-
-  protected volatile Credentials credentials;
-
-  protected AbstractTwillService(Location applicationLocation) {
-    this.applicationLocation = applicationLocation;
-  }
-
-  protected abstract Service getServiceDelegate();
-
-  /**
-   * Returns the location of the secure store, or {@code null} if either not running in secure mode or an error
-   * occur when trying to acquire the location.
-   */
-  protected final Location getSecureStoreLocation() {
-    if (!UserGroupInformation.isSecurityEnabled()) {
-      return null;
-    }
-    try {
-      return applicationLocation.append(Constants.Files.CREDENTIALS);
-    } catch (IOException e) {
-      LOG.error("Failed to create secure store location.", e);
-      return null;
-    }
-  }
-
-  /**
-   * Attempts to handle secure store update.
-   *
-   * @param message The message received
-   * @return {@code true} if the message requests for secure store update, {@code false} otherwise.
-   */
-  protected final boolean handleSecureStoreUpdate(Message message) {
-    if (!SystemMessages.SECURE_STORE_UPDATED.equals(message)) {
-      return false;
-    }
-
-    // If not in secure mode, simply ignore the message.
-    if (!UserGroupInformation.isSecurityEnabled()) {
-      return true;
-    }
-
-    try {
-      Credentials credentials = new Credentials();
-      Location location = getSecureStoreLocation();
-      DataInputStream input = new DataInputStream(new BufferedInputStream(location.getInputStream()));
-      try {
-        credentials.readTokenStorageStream(input);
-      } finally {
-        input.close();
-      }
-
-      UserGroupInformation.getCurrentUser().addCredentials(credentials);
-      this.credentials = credentials;
-
-      LOG.info("Secure store updated from {}.", location.toURI());
-
-    } catch (Throwable t) {
-      LOG.error("Failed to update secure store.", t);
-    }
-
-    return true;
-  }
-
-  @Override
-  public final ListenableFuture<State> start() {
-    return getServiceDelegate().start();
-  }
-
-  @Override
-  public final State startAndWait() {
-    return Futures.getUnchecked(start());
-  }
-
-  @Override
-  public final boolean isRunning() {
-    return getServiceDelegate().isRunning();
-  }
-
-  @Override
-  public final State state() {
-    return getServiceDelegate().state();
-  }
-
-  @Override
-  public final ListenableFuture<State> stop() {
-    return getServiceDelegate().stop();
-  }
-
-  @Override
-  public final State stopAndWait() {
-    return Futures.getUnchecked(stop());
-  }
-
-  @Override
-  public final void addListener(Listener listener, Executor executor) {
-    getServiceDelegate().addListener(listener, executor);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/7da6e6a4/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index b4f9997..d8659dd 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -33,14 +33,10 @@ import com.google.common.collect.Sets;
 import com.google.common.io.Files;
 import com.google.common.io.InputSupplier;
 import com.google.common.reflect.TypeToken;
-import com.google.common.util.concurrent.AbstractExecutionThreadService;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.Service;
 import com.google.common.util.concurrent.SettableFuture;
-import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
-import com.google.gson.JsonElement;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -60,7 +56,6 @@ import org.apache.twill.api.TwillRunResources;
 import org.apache.twill.api.TwillSpecification;
 import org.apache.twill.common.Threads;
 import org.apache.twill.filesystem.Location;
-import org.apache.twill.internal.AbstractTwillService;
 import org.apache.twill.internal.Configs;
 import org.apache.twill.internal.Constants;
 import org.apache.twill.internal.ContainerInfo;
@@ -69,16 +64,15 @@ import org.apache.twill.internal.EnvKeys;
 import org.apache.twill.internal.JvmOptions;
 import org.apache.twill.internal.ProcessLauncher;
 import org.apache.twill.internal.TwillContainerLauncher;
-import org.apache.twill.internal.ZKServiceDecorator;
 import org.apache.twill.internal.json.JvmOptionsCodec;
 import org.apache.twill.internal.json.LocalFileCodec;
 import org.apache.twill.internal.json.TwillSpecificationAdapter;
 import org.apache.twill.internal.kafka.EmbeddedKafkaServer;
 import org.apache.twill.internal.logging.Loggings;
 import org.apache.twill.internal.state.Message;
-import org.apache.twill.internal.state.MessageCallback;
 import org.apache.twill.internal.utils.Instances;
 import org.apache.twill.internal.utils.Networks;
+import org.apache.twill.internal.yarn.AbstractYarnTwillService;
 import org.apache.twill.internal.yarn.YarnAMClient;
 import org.apache.twill.internal.yarn.YarnAMClientFactory;
 import org.apache.twill.internal.yarn.YarnContainerInfo;
@@ -109,7 +103,7 @@ import java.util.concurrent.TimeUnit;
 /**
  *
  */
-public final class ApplicationMasterService extends AbstractTwillService {
+public final class ApplicationMasterService extends AbstractYarnTwillService {
 
   private static final Logger LOG = LoggerFactory.getLogger(ApplicationMasterService.class);
 
@@ -120,7 +114,6 @@ public final class ApplicationMasterService extends AbstractTwillService {
   private final ZKClient zkClient;
   private final TwillSpecification twillSpec;
   private final ApplicationMasterLiveNodeData amLiveNode;
-  private final ZKServiceDecorator serviceDelegate;
   private final RunningContainers runningContainers;
   private final ExpectedContainers expectedContainers;
   private final TrackerService trackerService;
@@ -131,13 +124,14 @@ public final class ApplicationMasterService extends AbstractTwillService {
   private final Location applicationLocation;
   private final PlacementPolicyManager placementPolicyManager;
 
+  private volatile boolean stopped;
   private EmbeddedKafkaServer kafkaServer;
   private Queue<RunnableContainerRequest> runnableContainerRequests;
   private ExecutorService instanceChangeExecutor;
 
   public ApplicationMasterService(RunId runId, ZKClient zkClient, File twillSpecFile,
                                   YarnAMClientFactory amClientFactory, Location applicationLocation) throws Exception {
-    super(applicationLocation);
+    super(zkClient, runId, applicationLocation);
 
     this.runId = runId;
     this.twillSpec = TwillSpecificationAdapter.create().fromJson(twillSpecFile);
@@ -149,17 +143,10 @@ public final class ApplicationMasterService extends AbstractTwillService {
     this.reservedMemory = getReservedMemory();
     this.placementPolicyManager = new PlacementPolicyManager(twillSpec.getPlacementPolicies());
 
-    amLiveNode = new ApplicationMasterLiveNodeData(Integer.parseInt(System.getenv(EnvKeys.YARN_APP_ID)),
-                                                   Long.parseLong(System.getenv(EnvKeys.YARN_APP_ID_CLUSTER_TIME)),
-                                                   amClient.getContainerId().toString());
+    this.amLiveNode = new ApplicationMasterLiveNodeData(Integer.parseInt(System.getenv(EnvKeys.YARN_APP_ID)),
+                                                        Long.parseLong(System.getenv(EnvKeys.YARN_APP_ID_CLUSTER_TIME)),
+                                                        amClient.getContainerId().toString());
 
-    serviceDelegate = new ZKServiceDecorator(zkClient, runId, createLiveNodeDataSupplier(),
-                                             new ServiceDelegate(), new Runnable() {
-      @Override
-      public void run() {
-        amClient.stopAndWait();
-      }
-    });
     expectedContainers = initExpectedContainers(twillSpec);
     runningContainers = initRunningContainers(amClient.getContainerId(), amClient.getHost());
     trackerService = new TrackerService(new Supplier<ResourceReport>() {
@@ -211,15 +198,6 @@ public final class ApplicationMasterService extends AbstractTwillService {
     }
   }
 
-  private Supplier<? extends JsonElement> createLiveNodeDataSupplier() {
-    return new Supplier<JsonElement>() {
-      @Override
-      public JsonElement get() {
-        return new Gson().toJsonTree(amLiveNode);
-      }
-    };
-  }
-
   private RunningContainers initRunningContainers(ContainerId appMasterContainerId,
                                                   String appMasterHost) throws Exception {
     TwillRunResources appMasterResources = new DefaultTwillRunResources(
@@ -240,7 +218,8 @@ public final class ApplicationMasterService extends AbstractTwillService {
     return new ExpectedContainers(expectedCounts);
   }
 
-  private void doStart() throws Exception {
+  @Override
+  protected void doStart() throws Exception {
     LOG.info("Start application master with spec: " + TwillSpecificationAdapter.create().toJson(twillSpec));
 
     // initialize the event handler, if it fails, it will fail the application.
@@ -276,7 +255,8 @@ public final class ApplicationMasterService extends AbstractTwillService {
     runnableContainerRequests = initContainerRequests();
   }
 
-  private void doStop() throws Exception {
+  @Override
+  protected void doStop() throws Exception {
     Thread.interrupted();     // This is just to clear the interrupt flag
 
     LOG.info("Stop application master with spec: {}", TwillSpecificationAdapter.create().toJson(twillSpec));
@@ -323,18 +303,65 @@ public final class ApplicationMasterService extends AbstractTwillService {
       LOG.error("Failed to stop tracker service.", e);
     } finally {
       try {
-        // App location cleanup
-        cleanupDir();
-        Loggings.forceFlush();
-        // Sleep a short while to let kafka clients to have chance to fetch the log
-        TimeUnit.SECONDS.sleep(1);
+        try {
+          // App location cleanup
+          cleanupDir();
+          Loggings.forceFlush();
+          // Sleep a short while to let kafka clients to have chance to fetch the log
+          TimeUnit.SECONDS.sleep(1);
+        } finally {
+          kafkaServer.stopAndWait();
+          LOG.info("Kafka server stopped");
+        }
       } finally {
-        kafkaServer.stopAndWait();
-        LOG.info("Kafka server stopped");
+        // Stops the AMClient
+        amClient.stopAndWait();
       }
     }
   }
 
+  @Override
+  protected Object getLiveNodeData() {
+    return amLiveNode;
+  }
+
+  @Override
+  public ListenableFuture<String> onReceived(String messageId, Message message) {
+    LOG.debug("Message received: {} {}.", messageId, message);
+
+    SettableFuture<String> result = SettableFuture.create();
+    Runnable completion = getMessageCompletion(messageId, result);
+    // A message recognized as a secure store update is also forwarded to every container.
+    if (handleSecureStoreUpdate(message)) {
+      runningContainers.sendToAll(message, completion);
+      return result;
+    }
+    // Instance-count changes go to handleSetInstances, which is handed the completion callback.
+    if (handleSetInstances(message, completion)) {
+      return result;
+    }
+
+    // Scope ALL_RUNNABLE: replicate the message to all runnables.
+    if (message.getScope() == Message.Scope.ALL_RUNNABLE) {
+      runningContainers.sendToAll(message, completion);
+      return result;
+    }
+
+    // Scope RUNNABLE: deliver the message only to the runnable named in the message.
+    if (message.getScope() == Message.Scope.RUNNABLE) {
+      runningContainers.sendToRunnable(message.getRunnableName(), message, completion);
+      return result;
+    }
+    // No branch matched: log it and return an already-completed future carrying the message id.
+    LOG.info("Message ignored. {}", message);
+    return Futures.immediateFuture(messageId);
+  }
+
+  @Override
+  protected void triggerShutdown() {
+    stopped = true; // volatile flag checked by the while (!stopped) loop in doRun()
+  }
+
   private void cleanupDir() {
     try {
       if (applicationLocation.delete(true)) {
@@ -347,8 +374,8 @@ public final class ApplicationMasterService extends AbstractTwillService {
     }
   }
 
-
-  private void doRun() throws Exception {
+  @Override
+  protected void doRun() throws Exception {
     // The main loop
     Map.Entry<AllocationSpecification, ? extends Collection<RuntimeSpecification>> currentRequest = null;
     final Queue<ProvisionRequest> provisioning = Lists.newLinkedList();
@@ -368,7 +395,7 @@ public final class ApplicationMasterService extends AbstractTwillService {
     long requestStartTime = 0;
     boolean isRequestRelaxed = false;
     long nextTimeoutCheck = System.currentTimeMillis() + Constants.PROVISION_TIMEOUT;
-    while (isRunning()) {
+    while (!stopped) {
       // Call allocate. It has to be made at first in order to be able to get cluster resource availability.
       amClient.allocate(0.0f, allocateHandler);
 
@@ -730,37 +757,6 @@ public final class ApplicationMasterService extends AbstractTwillService {
     return prop;
   }
 
-  private ListenableFuture<String> processMessage(final String messageId, Message message) {
-    LOG.debug("Message received: {} {}.", messageId, message);
-
-    SettableFuture<String> result = SettableFuture.create();
-    Runnable completion = getMessageCompletion(messageId, result);
-
-    if (handleSecureStoreUpdate(message)) {
-      runningContainers.sendToAll(message, completion);
-      return result;
-    }
-
-    if (handleSetInstances(message, completion)) {
-      return result;
-    }
-
-    // Replicate messages to all runnables
-    if (message.getScope() == Message.Scope.ALL_RUNNABLE) {
-      runningContainers.sendToAll(message, completion);
-      return result;
-    }
-
-    // Replicate message to a particular runnable.
-    if (message.getScope() == Message.Scope.RUNNABLE) {
-      runningContainers.sendToRunnable(message.getRunnableName(), message, completion);
-      return result;
-    }
-
-    LOG.info("Message ignored. {}", message);
-    return Futures.immediateFuture(messageId);
-  }
-
   /**
    * Attempts to change the number of running instances.
    * @return {@code true} if the message does requests for changes in number of running instances of a runnable,
@@ -888,52 +884,4 @@ public final class ApplicationMasterService extends AbstractTwillService {
     capability.setMemory(resourceSpec.getMemorySize());
     return capability;
   }
-
-  @Override
-  protected Service getServiceDelegate() {
-    return serviceDelegate;
-  }
-
-  /**
-   * A private class for service lifecycle. It's done this way so that we can have {@link ZKServiceDecorator} to
-   * wrap around this to reflect status in ZK.
-   */
-  private final class ServiceDelegate extends AbstractExecutionThreadService implements MessageCallback {
-
-    private volatile Thread runThread;
-
-    @Override
-    protected void run() throws Exception {
-      runThread = Thread.currentThread();
-      try {
-        doRun();
-      } catch (InterruptedException e) {
-        // It's ok to get interrupted exception, as it's a signal to stop
-        Thread.currentThread().interrupt();
-      }
-    }
-
-    @Override
-    protected void startUp() throws Exception {
-      doStart();
-    }
-
-    @Override
-    protected void shutDown() throws Exception {
-      doStop();
-    }
-
-    @Override
-    protected void triggerShutdown() {
-      Thread runThread = this.runThread;
-      if (runThread != null) {
-        runThread.interrupt();
-      }
-    }
-
-    @Override
-    public ListenableFuture<String> onReceived(String messageId, Message message) {
-      return processMessage(messageId, message);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/7da6e6a4/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
index a476bc1..dc3761f 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
@@ -18,29 +18,22 @@
 package org.apache.twill.internal.container;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import com.google.common.util.concurrent.AbstractExecutionThreadService;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.Service;
 import com.google.common.util.concurrent.SettableFuture;
-import com.google.gson.Gson;
-import com.google.gson.JsonElement;
 import org.apache.twill.api.Command;
 import org.apache.twill.api.RunId;
 import org.apache.twill.api.TwillRunnable;
 import org.apache.twill.api.TwillRunnableSpecification;
 import org.apache.twill.common.Threads;
 import org.apache.twill.filesystem.Location;
-import org.apache.twill.internal.AbstractTwillService;
 import org.apache.twill.internal.BasicTwillContext;
 import org.apache.twill.internal.ContainerInfo;
 import org.apache.twill.internal.ContainerLiveNodeData;
-import org.apache.twill.internal.ZKServiceDecorator;
 import org.apache.twill.internal.logging.Loggings;
 import org.apache.twill.internal.state.Message;
-import org.apache.twill.internal.state.MessageCallback;
 import org.apache.twill.internal.utils.Instances;
+import org.apache.twill.internal.yarn.AbstractYarnTwillService;
 import org.apache.twill.zookeeper.ZKClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,31 +44,46 @@ import java.util.concurrent.Executors;
 /**
  * This class act as a yarn container and run a {@link org.apache.twill.api.TwillRunnable}.
  */
-public final class TwillContainerService extends AbstractTwillService {
+public final class TwillContainerService extends AbstractYarnTwillService {
 
   private static final Logger LOG = LoggerFactory.getLogger(TwillContainerService.class);
 
   private final TwillRunnableSpecification specification;
   private final ClassLoader classLoader;
   private final BasicTwillContext context;
-  private final ZKServiceDecorator serviceDelegate;
+  private final ContainerLiveNodeData containerLiveNodeData;
   private ExecutorService commandExecutor;
   private TwillRunnable runnable;
 
   public TwillContainerService(BasicTwillContext context, ContainerInfo containerInfo, ZKClient zkClient,
                                RunId runId, TwillRunnableSpecification specification, ClassLoader classLoader,
                                Location applicationLocation) {
-    super(applicationLocation);
+    super(zkClient, runId, applicationLocation);
 
     this.specification = specification;
     this.classLoader = classLoader;
-    this.serviceDelegate = new ZKServiceDecorator(zkClient, runId,
-                                                  createLiveNodeSupplier(createLiveNodeData(containerInfo)),
-                                                  new ServiceDelegate());
+    this.containerLiveNodeData = createLiveNodeData(containerInfo);
     this.context = context;
   }
 
-  private ListenableFuture<String> processMessage(final String messageId, final Message message) {
+  private ContainerLiveNodeData createLiveNodeData(ContainerInfo containerInfo) {
+    // Log the debug port when twill.debug.port is set; the value (possibly null) goes into the live node data.
+    String debugPort = System.getProperty("twill.debug.port");
+    if (debugPort != null) {
+      LOG.info("JVM is listening for debugger on port {}", debugPort);
+    }
+    return new ContainerLiveNodeData(containerInfo.getId(),
+                                     containerInfo.getHost().getCanonicalHostName(),
+                                     debugPort);
+  }
+
+  @Override
+  protected Object getLiveNodeData() {
+    return containerLiveNodeData;
+  }
+
+  @Override
+  public ListenableFuture<String> onReceived(final String messageId, final Message message) {
     LOG.debug("Message received: {} {}.", messageId, message);
 
     if (handleSecureStoreUpdate(message)) {
@@ -85,7 +93,7 @@ public final class TwillContainerService extends AbstractTwillService {
     final SettableFuture<String> result = SettableFuture.create();
     Command command = message.getCommand();
     if (message.getType() == Message.Type.SYSTEM
-          && "instances".equals(command.getCommand()) && command.getOptions().containsKey("count")) {
+      && "instances".equals(command.getCommand()) && command.getOptions().containsKey("count")) {
       context.setInstanceCount(Integer.parseInt(command.getOptions().get("count")));
     }
 
@@ -104,76 +112,39 @@ public final class TwillContainerService extends AbstractTwillService {
     return result;
   }
 
-  private ContainerLiveNodeData createLiveNodeData(ContainerInfo containerInfo) {
-    // if debugging is enabled, log the port and register it in service discovery.
-    String debugPort = System.getProperty("twill.debug.port");
-    if (debugPort != null) {
-      LOG.info("JVM is listening for debugger on port {}", debugPort);
-    }
-    return new ContainerLiveNodeData(containerInfo.getId(),
-                                     containerInfo.getHost().getCanonicalHostName(),
-                                     debugPort);
-  }
+  @SuppressWarnings("unchecked")
+  @Override
+  protected void doStart() throws Exception {
+    commandExecutor = Executors.newSingleThreadExecutor(
+      Threads.createDaemonThreadFactory("runnable-command-executor"));
 
-  private Supplier<? extends JsonElement> createLiveNodeSupplier(final ContainerLiveNodeData data) {
-    return new Supplier<JsonElement>() {
-      @Override
-      public JsonElement get() {
-        return new Gson().toJsonTree(data);
-      }
-    };
+    Class<?> runnableClass = classLoader.loadClass(specification.getClassName());
+    Preconditions.checkArgument(TwillRunnable.class.isAssignableFrom(runnableClass),
+                                "Class %s is not instance of TwillRunnable.", specification.getClassName());
+
+    runnable = Instances.newInstance((Class<TwillRunnable>) runnableClass);
+    runnable.initialize(context);
   }
 
   @Override
-  protected Service getServiceDelegate() {
-    return serviceDelegate;
+  protected void doRun() throws Exception {
+    runnable.run();
   }
 
-  private final class ServiceDelegate extends AbstractExecutionThreadService implements MessageCallback {
-
-    @Override
-    protected void startUp() throws Exception {
-
-      commandExecutor = Executors.newSingleThreadExecutor(
-        Threads.createDaemonThreadFactory("runnable-command-executor"));
-
-      Class<?> runnableClass = classLoader.loadClass(specification.getClassName());
-      Preconditions.checkArgument(TwillRunnable.class.isAssignableFrom(runnableClass),
-                                  "Class %s is not instance of TwillRunnable.", specification.getClassName());
-
-      runnable = Instances.newInstance((Class<TwillRunnable>) runnableClass);
-      runnable.initialize(context);
-    }
-
-    @Override
-    protected void triggerShutdown() {
-      try {
-        runnable.stop();
-      } catch (Throwable t) {
-        LOG.error("Exception when stopping runnable.", t);
-      }
-    }
-
-    @Override
-    protected void shutDown() throws Exception {
-      commandExecutor.shutdownNow();
-      runnable.destroy();
-      context.stop();
-      Loggings.forceFlush();
-    }
-
-    @Override
-    protected void run() throws Exception {
-      runnable.run();
-    }
+  @Override
+  protected void doStop() throws Exception {
+    commandExecutor.shutdownNow();
+    runnable.destroy();
+    context.stop();
+    Loggings.forceFlush();
+  }
 
-    @Override
-    public ListenableFuture<String> onReceived(String messageId, Message message) {
-      if (state() == State.RUNNING) {
-        // Only process message if the service is still alive
-        return processMessage(messageId, message);
-      }
-      return Futures.immediateFuture(messageId);
+  @Override
+  protected void triggerShutdown() {
+    try {
+      runnable.stop();
+    } catch (Throwable t) {
+      LOG.error("Exception when stopping runnable.", t);
     }
   }
 }