You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2015/01/26 23:32:21 UTC

[3/5] incubator-twill git commit: (TWILL-80) Perform log force flush correctly. Also there are refactoring around service starts/stops for container (both AM and runnable) to have a cleaner view on the service dependencies. The help making sure unregistr

(TWILL-80) Perform log force flush correctly. Also, there is refactoring around
service starts/stops for containers (both AM and runnable) to give a cleaner view
of the service dependencies. This helps make sure unregistration from the RM is done
last on the AM, providing a more accurate view for the RM.


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/5dfe40d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/5dfe40d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/5dfe40d3

Branch: refs/heads/feature/improve-log
Commit: 5dfe40d37dfbaedd13e9597ece54ecbe821d2c4e
Parents: 73ecf8e
Author: Terence Yim <ch...@apache.org>
Authored: Fri Dec 12 14:47:58 2014 -0800
Committer: Terence Yim <ch...@apache.org>
Committed: Mon Dec 15 15:11:18 2014 -0800

----------------------------------------------------------------------
 pom.xml                                         |   2 +-
 twill-common/pom.xml                            |   1 -
 .../apache/twill/common/CompositeService.java   | 109 ++++++++++++++
 .../apache/twill/filesystem/LocalLocation.java  |   5 +
 .../twill/common/CompositeServiceTest.java      | 145 ++++++++++++++++++
 .../twill/internal/AbstractTwillService.java    |  25 +---
 .../twill/internal/ApplicationBundler.java      |  17 +--
 .../twill/internal/TwillContainerLauncher.java  |   2 +-
 .../twill/internal/logging/KafkaAppender.java   |   6 +-
 .../apache/twill/filesystem/HDFSLocation.java   |   5 +
 .../org/apache/twill/internal/ServiceMain.java  |  91 ++++++++++--
 .../appmaster/ApplicationMasterMain.java        | 146 +++++++++++++++++--
 .../appmaster/ApplicationMasterService.java     | 128 ++--------------
 .../internal/appmaster/TrackerService.java      |  37 +++--
 .../internal/container/TwillContainerMain.java  |  36 +++--
 .../container/TwillContainerService.java        |  13 +-
 .../internal/yarn/AbstractYarnTwillService.java |   2 +-
 .../apache/twill/yarn/YarnTwillPreparer.java    |   2 +-
 .../twill/yarn/YarnTwillRunnerService.java      |   2 +-
 .../twill/yarn/InitializeFailTestRun.java       |  33 ++++-
 .../apache/twill/yarn/SessionExpireTestRun.java |   4 +-
 .../zookeeper/DefaultZKClientService.java       |   5 +-
 22 files changed, 608 insertions(+), 208 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index be184dd..3b8dfed 100644
--- a/pom.xml
+++ b/pom.xml
@@ -339,7 +339,7 @@
                 <artifactId>maven-surefire-plugin</artifactId>
                 <version>2.14.1</version>
                 <configuration>
-                    <argLine>-Xmx1024m -XX:MaxPermSize=256m -Djava.awt.headless=true</argLine>
+                    <argLine>-Xmx2048m -XX:MaxPermSize=256m -Djava.awt.headless=true</argLine>
                     <redirectTestOutputToFile>${surefire.redirectTestOutputToFile}</redirectTestOutputToFile>
                     <systemPropertyVariables>
                         <java.io.tmpdir>${project.build.directory}</java.io.tmpdir>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-common/pom.xml
----------------------------------------------------------------------
diff --git a/twill-common/pom.xml b/twill-common/pom.xml
index e57bad9..cf4c2c0 100644
--- a/twill-common/pom.xml
+++ b/twill-common/pom.xml
@@ -41,7 +41,6 @@
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
-            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>junit</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-common/src/main/java/org/apache/twill/common/CompositeService.java
----------------------------------------------------------------------
diff --git a/twill-common/src/main/java/org/apache/twill/common/CompositeService.java b/twill-common/src/main/java/org/apache/twill/common/CompositeService.java
new file mode 100644
index 0000000..2817988
--- /dev/null
+++ b/twill-common/src/main/java/org/apache/twill/common/CompositeService.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.common;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.Service;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Iterator;
+
+/**
+ * A {@link Service} that starts/stops list of services in order.
+ */
+public final class CompositeService extends AbstractIdleService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CompositeService.class);
+  private final Deque<Service> services;
+
+  public CompositeService(Service...services) {
+    this(ImmutableList.copyOf(services));
+  }
+
+  public CompositeService(Iterable<? extends Service> services) {
+    this.services = new ArrayDeque<Service>();
+    Iterables.addAll(this.services, services);
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    Throwable failureCause = null;
+
+    for (Service service : services) {
+      try {
+        service.startAndWait();
+      } catch (UncheckedExecutionException e) {
+        failureCause = e.getCause();
+        break;
+      }
+    }
+
+    if (failureCause != null) {
+      // Stop all running services and then throw the failure exception
+      try {
+        stopAll();
+      } catch (Throwable t) {
+        // Ignore the stop error. Just log.
+        LOG.warn("Failed when stopping all services on start failure", t);
+      }
+
+      Throwables.propagateIfPossible(failureCause, Exception.class);
+      throw new RuntimeException(failureCause);
+    }
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    stopAll();
+  }
+
+  private void stopAll() throws Exception {
+    Throwable failureCause = null;
+
+    // Stop services in reverse order.
+    Iterator<Service> itor = services.descendingIterator();
+    while (itor.hasNext()) {
+      Service service = itor.next();
+      try {
+        if (service.isRunning() || service.state() == State.STARTING) {
+          service.stopAndWait();
+        }
+      } catch (UncheckedExecutionException e) {
+        // Just catch as we want all services stopped
+        if (failureCause == null) {
+          failureCause = e.getCause();
+        } else {
+          // Log for sub-sequence service shutdown error, as only the first failure cause will be thrown.
+          LOG.warn("Failed to stop service {}", service, e);
+        }
+      }
+    }
+
+    if (failureCause != null) {
+      Throwables.propagateIfPossible(failureCause, Exception.class);
+      throw new RuntimeException(failureCause);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocation.java
----------------------------------------------------------------------
diff --git a/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocation.java b/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocation.java
index 6dec09c..3b01e61 100644
--- a/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocation.java
+++ b/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocation.java
@@ -251,4 +251,9 @@ final class LocalLocation implements Location {
   public int hashCode() {
     return file.hashCode();
   }
+
+  @Override
+  public String toString() {
+    return file.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-common/src/test/java/org/apache/twill/common/CompositeServiceTest.java
----------------------------------------------------------------------
diff --git a/twill-common/src/test/java/org/apache/twill/common/CompositeServiceTest.java b/twill-common/src/test/java/org/apache/twill/common/CompositeServiceTest.java
new file mode 100644
index 0000000..1c45892
--- /dev/null
+++ b/twill-common/src/test/java/org/apache/twill/common/CompositeServiceTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.common;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.Service;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Tests for {@link CompositeService}.
+ */
+public class CompositeServiceTest {
+
+  @Test
+  public void testOrder() throws InterruptedException, ExecutionException, TimeoutException {
+    List<Service> services = Lists.newArrayList();
+
+    // Start 10 services and check their start sequence is ordered.
+    Semaphore semaphore = new Semaphore(0);
+    for (int i = 0; i < 10; i++) {
+      services.add(new TestService(semaphore, i));
+    }
+
+    Service service = new CompositeService(services);
+    service.start().get(5, TimeUnit.SECONDS);
+
+    // There should be 10 permits after all 10 services started
+    Assert.assertTrue(semaphore.tryAcquire(10, 5, TimeUnit.SECONDS));
+
+    // Check all services are running
+    Assert.assertTrue(Iterables.all(services, serviceStatePredicate(Service.State.RUNNING)));
+
+    // Release 10 permits for the stop sequence to start
+    semaphore.release(10);
+    service.stop().get(5, TimeUnit.SECONDS);
+
+    // There should be no permit left after all 10 services stopped
+    Assert.assertFalse(semaphore.tryAcquire(10));
+
+    // Check all services are stopped
+    Assert.assertTrue(Iterables.all(services, serviceStatePredicate(Service.State.TERMINATED)));
+  }
+
+  @Test
+  public void testErrorStart() throws InterruptedException {
+    List<Service> services = Lists.newArrayList();
+
+    // Create 5 services. The forth one will got a start failure.
+    Semaphore semaphore = new Semaphore(0);
+    for (int i = 0; i < 5; i++) {
+      services.add(new TestService(semaphore, i, i == 3));
+    }
+
+    Service service = new CompositeService(services);
+    try {
+      service.start().get();
+      Assert.fail();
+    } catch (ExecutionException e) {
+      // Expected
+    }
+
+    // Verify all services are not in running state
+    Assert.assertTrue(Iterables.all(services, Predicates.not(serviceStatePredicate(Service.State.RUNNING))));
+
+    // There should be one service in error state
+    Assert.assertTrue(Iterables.removeIf(services, serviceStatePredicate(Service.State.FAILED)));
+
+    for (Service s : services) {
+      Assert.assertNotEquals(3, ((TestService) s).getOrder());
+    }
+  }
+
+  private Predicate<Service> serviceStatePredicate(final Service.State state) {
+    return new Predicate<Service>() {
+      @Override
+      public boolean apply(Service service) {
+        return service.state() == state;
+      }
+    };
+  }
+
+  private static final class TestService extends AbstractIdleService {
+
+    private final Semaphore semaphore;
+    private final int order;
+    private final boolean startFail;
+
+    private TestService(Semaphore semaphore, int order) {
+      this(semaphore, order, false);
+    }
+
+    private TestService(Semaphore semaphore, int order, boolean startFail) {
+      this.semaphore = semaphore;
+      this.order = order;
+      this.startFail = startFail;
+    }
+
+    public int getOrder() {
+      return order;
+    }
+
+    @Override
+    protected void startUp() throws Exception {
+      Preconditions.checkState(!startFail, "Fail to start service of order %s", order);
+
+      // Acquire all permits emitted by previous service
+      semaphore.acquire(order);
+      // Emit enough permits for next service
+      semaphore.release(order + 1);
+    }
+
+    @Override
+    protected void shutDown() throws Exception {
+      semaphore.acquire(order + 1);
+      semaphore.release(order);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
index a6939e2..2f95e0e 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
@@ -18,7 +18,6 @@
 package org.apache.twill.internal;
 
 import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.AbstractExecutionThreadService;
 import com.google.common.util.concurrent.FutureCallback;
@@ -145,13 +144,6 @@ public abstract class AbstractTwillService extends AbstractExecutionThreadServic
                                                      Threads.createDaemonThreadFactory("message-callback"),
                                                      new ThreadPoolExecutor.DiscardPolicy());
 
-    // Create the live node, if succeeded, start the service, otherwise fail out.
-    createLiveNode().get();
-
-    // Create node for messaging
-    ZKOperations.ignoreError(zkClient.create(getZKPath("messages"), null, CreateMode.PERSISTENT),
-                             KeeperException.NodeExistsException.class, null).get();
-
     // Watch for session expiration, recreate the live node if reconnected after expiration.
     zkClient.addConnectionWatcher(new Watcher() {
       private boolean expired = false;
@@ -169,6 +161,13 @@ public abstract class AbstractTwillService extends AbstractExecutionThreadServic
       }
     });
 
+    // Create the live node, if succeeded, start the service, otherwise fail out.
+    createLiveNode().get();
+
+    // Create node for messaging
+    ZKOperations.ignoreError(zkClient.create(getZKPath("messages"), null, CreateMode.PERSISTENT),
+                             KeeperException.NodeExistsException.class, null).get();
+
     doStart();
 
     // Starts watching for messages
@@ -186,10 +185,8 @@ public abstract class AbstractTwillService extends AbstractExecutionThreadServic
     try {
       doStop();
     } finally {
-      ListenableFuture<List<String>> removeCompletion = Futures.successfulAsList(ImmutableList.of(removeServiceNode(),
-                                                                                                  removeLiveNode()));
       // Given at most 5 seconds to cleanup ZK nodes
-      removeCompletion.get(5, TimeUnit.SECONDS);
+      removeLiveNode().get(5, TimeUnit.SECONDS);
       LOG.info("Service {} with runId {} shutdown completed", getServiceName(), runId.getId());
     }
   }
@@ -213,12 +210,6 @@ public abstract class AbstractTwillService extends AbstractExecutionThreadServic
     return ZKOperations.ignoreError(zkClient.delete(liveNode), KeeperException.NoNodeException.class, liveNode);
   }
 
-  private OperationFuture<String> removeServiceNode() {
-    String serviceNode = String.format("/%s", runId.getId());
-    LOG.info("Remove service node {}{}", zkClient.getConnectString(), serviceNode);
-    return ZKOperations.recursiveDelete(zkClient, serviceNode);
-  }
-
   /**
    * Watches for messages that are sent through ZK messages node.
    */

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-core/src/main/java/org/apache/twill/internal/ApplicationBundler.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/ApplicationBundler.java b/twill-core/src/main/java/org/apache/twill/internal/ApplicationBundler.java
index 3f2d073..43268b7 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/ApplicationBundler.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/ApplicationBundler.java
@@ -142,20 +142,19 @@ public final class ApplicationBundler {
       } finally {
         jarOut.close();
       }
-      LOG.debug("copying temporary bundle to destination {} ({} bytes)", target.toURI(), tmpJar.length());
+      LOG.debug("copying temporary bundle to destination {} ({} bytes)", target, tmpJar.length());
       // Copy the tmp jar into destination.
-      OutputStream os = null; 
       try {
-        os = new BufferedOutputStream(target.getOutputStream());
-        Files.copy(tmpJar, os);
-      } catch (IOException e) {
-        throw new IOException("failed to copy bundle from " + tmpJar.toURI() + " to " + target.toURI(), e);
-      } finally {
-        if (os != null) {
+        OutputStream os = new BufferedOutputStream(target.getOutputStream());
+        try {
+          Files.copy(tmpJar, os);
+        } finally {
           os.close();
         }
+      } catch (IOException e) {
+        throw new IOException("failed to copy bundle from " + tmpJar.toURI() + " to " + target, e);
       }
-      LOG.debug("finished creating bundle at {}", target.toURI());
+      LOG.debug("finished creating bundle at {}", target);
     } finally {
       tmpJar.delete();
       LOG.debug("cleaned up local temporary for bundle {}", tmpJar.toURI());

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
index a891de6..1688073 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
@@ -98,7 +98,7 @@ public final class TwillContainerLauncher {
                                                                  secureStoreLocation.length(), false, null));
       }
     } catch (IOException e) {
-      LOG.warn("Failed to launch container with secure store {}.", secureStoreLocation.toURI());
+      LOG.warn("Failed to launch container with secure store {}.", secureStoreLocation);
     }
 
     if (afterResources == null) {

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java b/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
index 174c25d..c8becd3 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
@@ -185,9 +185,9 @@ public final class KafkaAppender extends AppenderBase<ILoggingEvent> {
 
   public void forceFlush() {
     try {
-      publishLogs(2, TimeUnit.SECONDS);
+      scheduler.submit(flushTask).get(2, TimeUnit.SECONDS);
     } catch (Exception e) {
-      LOG.error("Failed to publish last batch of log.", e);
+      LOG.error("Failed to force log flush in 2 seconds.", e);
     }
   }
 
@@ -279,7 +279,7 @@ public final class KafkaAppender extends AppenderBase<ILoggingEvent> {
         try {
           int published = publishLogs(2L, TimeUnit.SECONDS);
           if (LOG.isDebugEnabled()) {
-            LOG.info("Published {} log messages to Kafka.", published);
+            LOG.debug("Published {} log messages to Kafka.", published);
           }
         } catch (Exception e) {
           LOG.error("Failed to push logs to Kafka. Log entries dropped.", e);

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java b/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
index ed86cb8..2ab97db 100644
--- a/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
+++ b/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
@@ -238,4 +238,9 @@ final class HDFSLocation implements Location {
   public int hashCode() {
     return path.hashCode();
   }
+
+  @Override
+  public String toString() {
+    return path.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
index ae86f42..fc6cb3c 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
@@ -22,18 +22,30 @@ import ch.qos.logback.classic.joran.JoranConfigurator;
 import ch.qos.logback.classic.util.ContextInitializer;
 import ch.qos.logback.core.joran.spi.JoranException;
 import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.AbstractService;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Service;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.twill.api.RunId;
+import org.apache.twill.common.CompositeService;
 import org.apache.twill.common.Services;
 import org.apache.twill.filesystem.HDFSLocationFactory;
 import org.apache.twill.filesystem.LocalLocationFactory;
 import org.apache.twill.filesystem.Location;
 import org.apache.twill.internal.logging.KafkaAppender;
+import org.apache.twill.internal.logging.Loggings;
+import org.apache.twill.zookeeper.RetryStrategies;
+import org.apache.twill.zookeeper.ZKClient;
 import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.twill.zookeeper.ZKClientServices;
+import org.apache.twill.zookeeper.ZKClients;
+import org.apache.twill.zookeeper.ZKOperations;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.ILoggerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,6 +55,7 @@ import java.io.File;
 import java.io.StringReader;
 import java.net.URI;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Class for main method that starts a service.
@@ -58,45 +71,54 @@ public abstract class ServiceMain {
     }
   }
 
-  protected final void doMain(final ZKClientService zkClientService,
-                              final Service service) throws ExecutionException, InterruptedException {
+  protected final void doMain(final Service mainService,
+                              Service...prerequisites) throws ExecutionException, InterruptedException {
     configureLogger();
 
-    final String serviceName = service.toString();
+    Service requiredServices = new CompositeService(prerequisites);
     Runtime.getRuntime().addShutdownHook(new Thread() {
       @Override
       public void run() {
-        Services.chainStop(service, zkClientService);
+        mainService.stopAndWait();
       }
     });
 
     // Listener for state changes of the service
-    ListenableFuture<Service.State> completion = Services.getCompletionFuture(service);
+    ListenableFuture<Service.State> completion = Services.getCompletionFuture(mainService);
+    Throwable initFailure = null;
 
     try {
       try {
         // Starts the service
-        LOG.info("Starting service {}.", serviceName);
-        Futures.allAsList(Services.chainStart(zkClientService, service).get()).get();
-        LOG.info("Service {} started.", serviceName);
+        LOG.info("Starting service {}.", mainService);
+        Futures.allAsList(Services.chainStart(requiredServices, mainService).get()).get();
+        LOG.info("Service {} started.", mainService);
       } catch (Throwable t) {
-        LOG.error("Exception when starting service {}.", serviceName, t);
-        // Exit with the init fail exit code.
-        System.exit(ContainerExitCodes.INIT_FAILED);
+        LOG.error("Exception when starting service {}.", mainService, t);
+        initFailure = t;
       }
 
       try {
-        completion.get();
-        LOG.info("Service {} completed.", serviceName);
+        if (initFailure == null) {
+          completion.get();
+          LOG.info("Service {} completed.", mainService);
+        }
       } catch (Throwable t) {
-        LOG.error("Exception thrown from service {}.", serviceName, t);
+        LOG.error("Exception thrown from service {}.", mainService, t);
         throw Throwables.propagate(t);
       }
     } finally {
+      requiredServices.stopAndWait();
+
       ILoggerFactory loggerFactory = LoggerFactory.getILoggerFactory();
       if (loggerFactory instanceof LoggerContext) {
         ((LoggerContext) loggerFactory).stop();
       }
+
+      if (initFailure != null) {
+        // Exit with the init fail exit code.
+        System.exit(ContainerExitCodes.INIT_FAILED);
+      }
     }
   }
 
@@ -139,6 +161,17 @@ public abstract class ServiceMain {
     }
   }
 
+  /**
+   * Creates a {@link ZKClientService}.
+   */
+  protected static ZKClientService createZKClient(String zkConnectStr) {
+    return ZKClientServices.delegate(
+      ZKClients.reWatchOnExpire(
+        ZKClients.retryOnFailure(
+          ZKClientService.Builder.of(zkConnectStr).build(),
+          RetryStrategies.fixDelay(1, TimeUnit.SECONDS))));
+  }
+
   private void configureLogger() {
     // Check if SLF4J is bound to logback in the current environment
     ILoggerFactory loggerFactory = LoggerFactory.getILoggerFactory();
@@ -220,4 +253,34 @@ public abstract class ServiceMain {
     }
     return "OFF";
   }
+
+  /**
+   * A simple service for creating/remove ZK paths needed for {@link AbstractTwillService}.
+   */
+  protected static final class TwillZKPathService extends AbstractIdleService {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TwillZKPathService.class);
+    private static final long TIMEOUT_SECONDS = 5L;
+
+    private final ZKClient zkClient;
+    private final String path;
+
+    public TwillZKPathService(ZKClient zkClient, RunId runId) {
+      this.zkClient = zkClient;
+      this.path = "/" + runId.getId();
+    }
+
+    @Override
+    protected void startUp() throws Exception {
+      LOG.info("Creating container ZK path: {}{}", zkClient.getConnectString(), path);
+      ZKOperations.ignoreError(zkClient.create(path, null, CreateMode.PERSISTENT),
+                               KeeperException.NodeExistsException.class, null).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    }
+
+    @Override
+    protected void shutDown() throws Exception {
+      LOG.info("Removing container ZK path: {}{}", zkClient.getConnectString(), path);
+      ZKOperations.recursiveDelete(zkClient, path).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
index 914f13f..561a68b 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
@@ -17,7 +17,8 @@
  */
 package org.apache.twill.internal.appmaster;
 
-import com.google.common.util.concurrent.Service;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AbstractIdleService;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -26,15 +27,23 @@ import org.apache.twill.internal.Constants;
 import org.apache.twill.internal.EnvKeys;
 import org.apache.twill.internal.RunIds;
 import org.apache.twill.internal.ServiceMain;
+import org.apache.twill.internal.kafka.EmbeddedKafkaServer;
+import org.apache.twill.internal.logging.Loggings;
+import org.apache.twill.internal.utils.Networks;
 import org.apache.twill.internal.yarn.VersionDetectYarnAMClientFactory;
-import org.apache.twill.zookeeper.RetryStrategies;
+import org.apache.twill.internal.yarn.YarnAMClient;
+import org.apache.twill.zookeeper.ZKClient;
 import org.apache.twill.zookeeper.ZKClientService;
-import org.apache.twill.zookeeper.ZKClientServices;
-import org.apache.twill.zookeeper.ZKClients;
+import org.apache.twill.zookeeper.ZKOperations;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -56,18 +65,23 @@ public final class ApplicationMasterMain extends ServiceMain {
     File twillSpec = new File(Constants.Files.TWILL_SPEC);
     RunId runId = RunIds.fromString(System.getenv(EnvKeys.TWILL_RUN_ID));
 
-    ZKClientService zkClientService =
-      ZKClientServices.delegate(
-        ZKClients.reWatchOnExpire(
-          ZKClients.retryOnFailure(
-            ZKClientService.Builder.of(zkConnect).build(),
-            RetryStrategies.fixDelay(1, TimeUnit.SECONDS))));
-
+    ZKClientService zkClientService = createZKClient(zkConnect);
     Configuration conf = new YarnConfiguration(new HdfsConfiguration(new Configuration()));
     setRMSchedulerAddress(conf);
-    Service service = new ApplicationMasterService(runId, zkClientService, twillSpec,
-                                                   new VersionDetectYarnAMClientFactory(conf), createAppLocation(conf));
-    new ApplicationMasterMain(String.format("%s/%s/kafka", zkConnect, runId.getId())).doMain(zkClientService, service);
+
+    final YarnAMClient amClient = new VersionDetectYarnAMClientFactory(conf).create();
+    ApplicationMasterService service = new ApplicationMasterService(runId, zkClientService,
+                                                                    twillSpec, amClient, createAppLocation(conf));
+    TrackerService trackerService = new TrackerService(service);
+
+    new ApplicationMasterMain(String.format("%s/%s/kafka", zkConnect, runId.getId()))
+      .doMain(
+        service,
+        new YarnAMClientService(amClient, trackerService),
+        zkClientService,
+        new TwillZKPathService(zkClientService, runId),
+        new ApplicationKafkaService(zkClientService, runId)
+      );
   }
 
   /**
@@ -105,4 +119,108 @@ public final class ApplicationMasterMain extends ServiceMain {
   protected String getRunnableName() {
     return System.getenv(EnvKeys.TWILL_RUNNABLE_NAME);
   }
+
+
+  /**
+   * A service wrapper for starting/stopping {@link EmbeddedKafkaServer} that makes sure the ZK path for
+   * Kafka exists before starting the Kafka server.
+   */
+  private static final class ApplicationKafkaService extends AbstractIdleService {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ApplicationKafkaService.class);
+
+    private final ZKClient zkClient;
+    private final String kafkaZKPath;
+    private final EmbeddedKafkaServer kafkaServer;
+
+    private ApplicationKafkaService(ZKClient zkClient, RunId runId) {
+      this.zkClient = zkClient;
+      this.kafkaZKPath = "/" + runId.getId() + "/kafka";
+      this.kafkaServer = new EmbeddedKafkaServer(generateKafkaConfig(zkClient.getConnectString() + kafkaZKPath));
+    }
+
+    @Override
+    protected void startUp() throws Exception {
+      ZKOperations.ignoreError(
+        zkClient.create(kafkaZKPath, null, CreateMode.PERSISTENT),
+        KeeperException.NodeExistsException.class, kafkaZKPath).get();
+      kafkaServer.startAndWait();
+    }
+
+    @Override
+    protected void shutDown() throws Exception {
+      // Flush all logs before shutting down Kafka server
+      Loggings.forceFlush();
+      // Delay for 2 seconds to give clients a chance to poll the last batch of log messages.
+      try {
+        TimeUnit.SECONDS.sleep(2);
+      } catch (InterruptedException e) {
+        // Ignore
+        LOG.info("Kafka shutdown delay interrupted", e);
+      } finally {
+        kafkaServer.stopAndWait();
+      }
+    }
+
+    private Properties generateKafkaConfig(String kafkaZKConnect) {
+      int port = Networks.getRandomPort();
+      Preconditions.checkState(port > 0, "Failed to get random port.");
+
+      Properties prop = new Properties();
+      prop.setProperty("log.dir", new File("kafka-logs").getAbsolutePath());
+      prop.setProperty("port", Integer.toString(port));
+      prop.setProperty("broker.id", "1");
+      prop.setProperty("socket.send.buffer.bytes", "1048576");
+      prop.setProperty("socket.receive.buffer.bytes", "1048576");
+      prop.setProperty("socket.request.max.bytes", "104857600");
+      prop.setProperty("num.partitions", "1");
+      prop.setProperty("log.retention.hours", "24");
+      prop.setProperty("log.flush.interval.messages", "10000");
+      prop.setProperty("log.flush.interval.ms", "1000");
+      prop.setProperty("log.segment.bytes", "536870912");
+      prop.setProperty("zookeeper.connect", kafkaZKConnect);
+      prop.setProperty("zookeeper.connection.timeout.ms", "1000000");
+      prop.setProperty("default.replication.factor", "1");
+      return prop;
+    }
+  }
+
+
+  /**
+   * A Service wrapper that starts {@link TrackerService} and {@link YarnAMClient}. It is needed because
+   * the tracker host and url need to be provided to {@link YarnAMClient} before {@link YarnAMClient} is started.
+   */
+  private static final class YarnAMClientService extends AbstractIdleService {
+
+    private final YarnAMClient yarnAMClient;
+    private final TrackerService trackerService;
+
+    private YarnAMClientService(YarnAMClient yarnAMClient, TrackerService trackerService) {
+      this.yarnAMClient = yarnAMClient;
+      this.trackerService = trackerService;
+    }
+
+    @Override
+    protected void startUp() throws Exception {
+      trackerService.setHost(yarnAMClient.getHost());
+      trackerService.startAndWait();
+
+      yarnAMClient.setTracker(trackerService.getBindAddress(), trackerService.getUrl());
+      try {
+        yarnAMClient.startAndWait();
+      } catch (Exception e) {
+        trackerService.stopAndWait();
+        throw e;
+      }
+    }
+
+    @Override
+    protected void shutDown() throws Exception {
+      try {
+        yarnAMClient.stopAndWait();
+      } finally {
+        trackerService.stopAndWait();
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index d8659dd..8f8952b 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -23,7 +23,6 @@ import com.google.common.base.Predicate;
 import com.google.common.base.Supplier;
 import com.google.common.base.Throwables;
 import com.google.common.collect.HashMultiset;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
@@ -67,14 +66,10 @@ import org.apache.twill.internal.TwillContainerLauncher;
 import org.apache.twill.internal.json.JvmOptionsCodec;
 import org.apache.twill.internal.json.LocalFileCodec;
 import org.apache.twill.internal.json.TwillSpecificationAdapter;
-import org.apache.twill.internal.kafka.EmbeddedKafkaServer;
-import org.apache.twill.internal.logging.Loggings;
 import org.apache.twill.internal.state.Message;
 import org.apache.twill.internal.utils.Instances;
-import org.apache.twill.internal.utils.Networks;
 import org.apache.twill.internal.yarn.AbstractYarnTwillService;
 import org.apache.twill.internal.yarn.YarnAMClient;
-import org.apache.twill.internal.yarn.YarnAMClientFactory;
 import org.apache.twill.internal.yarn.YarnContainerInfo;
 import org.apache.twill.internal.yarn.YarnContainerStatus;
 import org.apache.twill.internal.yarn.YarnUtils;
@@ -88,12 +83,10 @@ import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
 import java.io.Reader;
-import java.net.URL;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
@@ -103,7 +96,7 @@ import java.util.concurrent.TimeUnit;
 /**
  *
  */
-public final class ApplicationMasterService extends AbstractYarnTwillService {
+public final class ApplicationMasterService extends AbstractYarnTwillService implements Supplier<ResourceReport> {
 
   private static final Logger LOG = LoggerFactory.getLogger(ApplicationMasterService.class);
 
@@ -116,7 +109,6 @@ public final class ApplicationMasterService extends AbstractYarnTwillService {
   private final ApplicationMasterLiveNodeData amLiveNode;
   private final RunningContainers runningContainers;
   private final ExpectedContainers expectedContainers;
-  private final TrackerService trackerService;
   private final YarnAMClient amClient;
   private final JvmOptions jvmOpts;
   private final int reservedMemory;
@@ -125,19 +117,18 @@ public final class ApplicationMasterService extends AbstractYarnTwillService {
   private final PlacementPolicyManager placementPolicyManager;
 
   private volatile boolean stopped;
-  private EmbeddedKafkaServer kafkaServer;
   private Queue<RunnableContainerRequest> runnableContainerRequests;
   private ExecutorService instanceChangeExecutor;
 
   public ApplicationMasterService(RunId runId, ZKClient zkClient, File twillSpecFile,
-                                  YarnAMClientFactory amClientFactory, Location applicationLocation) throws Exception {
+                                  YarnAMClient amClient, Location applicationLocation) throws Exception {
     super(zkClient, runId, applicationLocation);
 
     this.runId = runId;
     this.twillSpec = TwillSpecificationAdapter.create().fromJson(twillSpecFile);
     this.zkClient = zkClient;
     this.applicationLocation = applicationLocation;
-    this.amClient = amClientFactory.create();
+    this.amClient = amClient;
     this.credentials = createCredentials();
     this.jvmOpts = loadJvmOptions();
     this.reservedMemory = getReservedMemory();
@@ -149,12 +140,6 @@ public final class ApplicationMasterService extends AbstractYarnTwillService {
 
     expectedContainers = initExpectedContainers(twillSpec);
     runningContainers = initRunningContainers(amClient.getContainerId(), amClient.getHost());
-    trackerService = new TrackerService(new Supplier<ResourceReport>() {
-      @Override
-      public ResourceReport get() {
-        return runningContainers.getResourceReport();
-      }
-    }, amClient.getHost());
     eventHandler = createEventHandler(twillSpec);
   }
 
@@ -183,6 +168,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService {
     }
   }
 
+  @SuppressWarnings("unchecked")
   private EventHandler createEventHandler(TwillSpecification twillSpec) {
     try {
       // Should be able to load by this class ClassLoader, as they packaged in the same jar.
@@ -219,6 +205,11 @@ public final class ApplicationMasterService extends AbstractYarnTwillService {
   }
 
   @Override
+  public ResourceReport get() {
+    return runningContainers.getResourceReport();
+  }
+
+  @Override
   protected void doStart() throws Exception {
     LOG.info("Start application master with spec: " + TwillSpecificationAdapter.create().toJson(twillSpec));
 
@@ -227,31 +218,9 @@ public final class ApplicationMasterService extends AbstractYarnTwillService {
 
     instanceChangeExecutor = Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("instanceChanger"));
 
-    kafkaServer = new EmbeddedKafkaServer(generateKafkaConfig());
-
-    // Must start tracker before start AMClient
-    LOG.info("Starting application master tracker server");
-    trackerService.startAndWait();
-    URL trackerUrl = trackerService.getUrl();
-    LOG.info("Started application master tracker server on " + trackerUrl);
-
-    amClient.setTracker(trackerService.getBindAddress(), trackerUrl);
-    amClient.startAndWait();
-
-    // Creates ZK path for runnable and kafka logging service
-    Futures.allAsList(ImmutableList.of(
-      zkClient.create("/" + runId.getId() + "/runnables", null, CreateMode.PERSISTENT),
-      zkClient.create("/" + runId.getId() + "/kafka", null, CreateMode.PERSISTENT))
-    ).get();
-
+    // Creates ZK path for runnable
+    zkClient.create("/" + runId.getId() + "/runnables", null, CreateMode.PERSISTENT).get();
     runningContainers.addWatcher("/discoverable");
-
-    // Starts kafka server
-    LOG.info("Starting kafka server");
-
-    kafkaServer.startAndWait();
-    LOG.info("Kafka server started");
-
     runnableContainerRequests = initContainerRequests();
   }
 
@@ -295,29 +264,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService {
       TimeUnit.SECONDS.sleep(1);
     }
 
-    LOG.info("Stopping application master tracker server");
-    try {
-      trackerService.stopAndWait();
-      LOG.info("Stopped application master tracker server");
-    } catch (Exception e) {
-      LOG.error("Failed to stop tracker service.", e);
-    } finally {
-      try {
-        try {
-          // App location cleanup
-          cleanupDir();
-          Loggings.forceFlush();
-          // Sleep a short while to let kafka clients to have chance to fetch the log
-          TimeUnit.SECONDS.sleep(1);
-        } finally {
-          kafkaServer.stopAndWait();
-          LOG.info("Kafka server stopped");
-        }
-      } finally {
-        // Stops the AMClient
-        amClient.stopAndWait();
-      }
-    }
+    cleanupDir();
   }
 
   @Override
@@ -431,7 +378,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService {
         (System.currentTimeMillis() - requestStartTime) > Constants.CONSTRAINED_PROVISION_REQUEST_TIMEOUT) {
         LOG.info("Relaxing provisioning constraints for request {}", provisioning.peek().getRequestId());
         // Clear the blacklist for the pending provision request(s).
-        clearBlacklist();
+        amClient.clearBlacklist();
         isRequestRelaxed = true;
       }
 
@@ -447,7 +394,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService {
    * Manage Blacklist for a given request.
    */
   private void manageBlacklist(Map.Entry<AllocationSpecification, ? extends Collection<RuntimeSpecification>> request) {
-    clearBlacklist();
+    amClient.clearBlacklist();
 
     //Check the allocation strategy
     AllocationSpecification currentAllocationSpecification = request.getKey();
@@ -466,8 +413,8 @@ public final class ApplicationMasterService extends AbstractYarnTwillService {
             // Yarn Resource Manager may include port in the node name depending on the setting
             // YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME. It is safe to add both
             // the names (with and without port) to the blacklist.
-            addToBlacklist(containerInfo.getHost().getHostName());
-            addToBlacklist(containerInfo.getHost().getHostName() + ":" + containerInfo.getPort());
+            amClient.addToBlacklist(containerInfo.getHost().getHostName());
+            amClient.addToBlacklist(containerInfo.getHost().getHostName() + ":" + containerInfo.getPort());
           }
         }
       }
@@ -642,27 +589,6 @@ public final class ApplicationMasterService extends AbstractYarnTwillService {
   }
 
   /**
-   * Add a resource to the blacklist.
-   */
-  private void addToBlacklist(String resource) {
-    amClient.addToBlacklist(resource);
-  }
-
-  /**
-   * Remove a resource from the blacklist.
-   */
-  private void removeFromBlacklist(String resource) {
-    amClient.removeFromBlacklist(resource);
-  }
-
-  /**
-   * Clears the resource blacklist.
-   */
-  private void clearBlacklist() {
-    amClient.clearBlacklist();
-  }
-
-  /**
    * Launches runnables in the provisioned containers.
    */
   private void launchRunnable(List<? extends ProcessLauncher<YarnContainerInfo>> launchers,
@@ -735,28 +661,6 @@ public final class ApplicationMasterService extends AbstractYarnTwillService {
     return String.format("%s/%s/kafka", zkClient.getConnectString(), runId.getId());
   }
 
-  private Properties generateKafkaConfig() {
-    int port = Networks.getRandomPort();
-    Preconditions.checkState(port > 0, "Failed to get random port.");
-
-    Properties prop = new Properties();
-    prop.setProperty("log.dir", new File("kafka-logs").getAbsolutePath());
-    prop.setProperty("port", Integer.toString(port));
-    prop.setProperty("broker.id", "1");
-    prop.setProperty("socket.send.buffer.bytes", "1048576");
-    prop.setProperty("socket.receive.buffer.bytes", "1048576");
-    prop.setProperty("socket.request.max.bytes", "104857600");
-    prop.setProperty("num.partitions", "1");
-    prop.setProperty("log.retention.hours", "24");
-    prop.setProperty("log.flush.interval.messages", "10000");
-    prop.setProperty("log.flush.interval.ms", "1000");
-    prop.setProperty("log.segment.bytes", "536870912");
-    prop.setProperty("zookeeper.connect", getKafkaZKConnect());
-    prop.setProperty("zookeeper.connection.timeout.ms", "1000000");
-    prop.setProperty("default.replication.factor", "1");
-    return prop;
-  }
-
   /**
    * Attempts to change the number of running instances.
    * @return {@code true} if the message does requests for changes in number of running instances of a runnable,

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
index 6165c6f..7c09c58 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
@@ -64,6 +64,7 @@ import java.net.URL;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import javax.ws.rs.GET;
 
 /**
  * Webservice that the Application Master will register back to the resource manager
@@ -80,36 +81,43 @@ public final class TrackerService extends AbstractIdleService {
   private static final int CLOSE_CHANNEL_TIMEOUT = 5;
   private static final int MAX_INPUT_SIZE = 100 * 1024 * 1024;
 
-  private final String host;
+  private final Supplier<ResourceReport> resourceReport;
+  private final ChannelGroup channelGroup;
+
+  private String host;
   private ServerBootstrap bootstrap;
   private InetSocketAddress bindAddress;
   private URL url;
-  private final ChannelGroup channelGroup;
-  private final Supplier<ResourceReport> resourceReport;
 
   /**
    * Initialize the service.
    *
    * @param resourceReport live report that the service will return to clients.
-   * @param appMasterHost the application master host.
    */
-  public TrackerService(Supplier<ResourceReport> resourceReport, String appMasterHost) {
+  TrackerService(Supplier<ResourceReport> resourceReport) {
     this.channelGroup = new DefaultChannelGroup("appMasterTracker");
     this.resourceReport = resourceReport;
-    this.host = appMasterHost;
+  }
+
+  /**
+   * Sets the hostname which the tracker service will bind to. This method must be called before starting this
+   * tracker service.
+   */
+  void setHost(String host) {
+    this.host = host;
   }
 
   /**
    * Returns the address this tracker service is bounded to.
    */
-  public InetSocketAddress getBindAddress() {
+  InetSocketAddress getBindAddress() {
     return bindAddress;
   }
 
   /**
    * @return tracker url.
    */
-  public URL getUrl() {
+  URL getUrl() {
     return url;
   }
 
@@ -138,7 +146,7 @@ public final class TrackerService extends AbstractIdleService {
         pipeline.addLast("aggregator", new HttpChunkAggregator(MAX_INPUT_SIZE));
         pipeline.addLast("encoder", new HttpResponseEncoder());
         pipeline.addLast("compressor", new HttpContentCompressor());
-        pipeline.addLast("handler", new ReportHandler(resourceReport));
+        pipeline.addLast("handler", new ReportHandler());
 
         return pipeline;
       }
@@ -148,6 +156,8 @@ public final class TrackerService extends AbstractIdleService {
     bindAddress = (InetSocketAddress) channel.getLocalAddress();
     url = URI.create(String.format("http://%s:%d", host, bindAddress.getPort())).toURL();
     channelGroup.add(channel);
+
+    LOG.info("Tracker service started at {}", url);
   }
 
   @Override
@@ -159,18 +169,17 @@ public final class TrackerService extends AbstractIdleService {
     } finally {
       bootstrap.releaseExternalResources();
     }
+    LOG.info("Tracker service stopped at {}", url);
   }
 
   /**
    * Handler to return resources used by this application master, which will be available through
    * the host and port set when this application master registered itself to the resource manager.
    */
-  public class ReportHandler extends SimpleChannelUpstreamHandler {
-    private final Supplier<ResourceReport> report;
+  final class ReportHandler extends SimpleChannelUpstreamHandler {
     private final ResourceReportAdapter reportAdapter;
 
-    public ReportHandler(Supplier<ResourceReport> report) {
-      this.report = report;
+    public ReportHandler() {
       this.reportAdapter = ResourceReportAdapter.create();
     }
 
@@ -202,7 +211,7 @@ public final class TrackerService extends AbstractIdleService {
 
       ChannelBuffer content = ChannelBuffers.dynamicBuffer();
       Writer writer = new OutputStreamWriter(new ChannelBufferOutputStream(content), CharsetUtil.UTF_8);
-      reportAdapter.toJson(report.get(), writer);
+      reportAdapter.toJson(resourceReport.get(), writer);
       try {
         writer.close();
       } catch (IOException e1) {

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
index 5c9aa45..bb3d200 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
@@ -20,6 +20,7 @@ package org.apache.twill.internal.container;
 import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
 import com.google.common.io.Files;
+import com.google.common.util.concurrent.AbstractService;
 import com.google.common.util.concurrent.Service;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -43,10 +44,9 @@ import org.apache.twill.internal.RunIds;
 import org.apache.twill.internal.ServiceMain;
 import org.apache.twill.internal.json.ArgumentsCodec;
 import org.apache.twill.internal.json.TwillSpecificationAdapter;
-import org.apache.twill.zookeeper.RetryStrategies;
+import org.apache.twill.internal.logging.Loggings;
 import org.apache.twill.zookeeper.ZKClient;
 import org.apache.twill.zookeeper.ZKClientService;
-import org.apache.twill.zookeeper.ZKClientServices;
 import org.apache.twill.zookeeper.ZKClients;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,7 +56,6 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.Reader;
-import java.util.concurrent.TimeUnit;
 
 /**
  *
@@ -81,11 +80,7 @@ public final class TwillContainerMain extends ServiceMain {
     int instanceId = Integer.parseInt(System.getenv(EnvKeys.TWILL_INSTANCE_ID));
     int instanceCount = Integer.parseInt(System.getenv(EnvKeys.TWILL_INSTANCE_COUNT));
 
-    ZKClientService zkClientService = ZKClientServices.delegate(
-      ZKClients.reWatchOnExpire(
-        ZKClients.retryOnFailure(ZKClientService.Builder.of(zkConnectStr).build(),
-                                 RetryStrategies.fixDelay(1, TimeUnit.SECONDS))));
-
+    ZKClientService zkClientService = createZKClient(zkConnectStr);
     ZKDiscoveryService discoveryService = new ZKDiscoveryService(zkClientService);
 
     ZKClient electionZKClient = getAppRunZKClient(zkClientService, appRunId);
@@ -111,7 +106,11 @@ public final class TwillContainerMain extends ServiceMain {
     Service service = new TwillContainerService(context, containerInfo, containerZKClient,
                                                 runId, runnableSpec, getClassLoader(),
                                                 createAppLocation(conf));
-    new TwillContainerMain().doMain(zkClientService, service);
+    new TwillContainerMain().doMain(
+      service,
+      new LogFlushService(),
+      zkClientService,
+      new TwillZKPathService(containerZKClient, runId));
   }
 
   private static void loadSecureStore() throws IOException {
@@ -192,4 +191,23 @@ public final class TwillContainerMain extends ServiceMain {
   protected String getRunnableName() {
     return System.getenv(EnvKeys.TWILL_RUNNABLE_NAME);
   }
+
+
+  /**
+   * Simple service that force flushes logs on stop.
+   */
+  private static final class LogFlushService extends AbstractService {
+
+    @Override
+    protected void doStart() {
+      // No-op
+      notifyStarted();
+    }
+
+    @Override
+    protected void doStop() {
+      Loggings.forceFlush();
+      notifyStopped();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
index dc3761f..96031ce 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
@@ -30,7 +30,6 @@ import org.apache.twill.filesystem.Location;
 import org.apache.twill.internal.BasicTwillContext;
 import org.apache.twill.internal.ContainerInfo;
 import org.apache.twill.internal.ContainerLiveNodeData;
-import org.apache.twill.internal.logging.Loggings;
 import org.apache.twill.internal.state.Message;
 import org.apache.twill.internal.utils.Instances;
 import org.apache.twill.internal.yarn.AbstractYarnTwillService;
@@ -134,9 +133,15 @@ public final class TwillContainerService extends AbstractYarnTwillService {
   @Override
   protected void doStop() throws Exception {
     commandExecutor.shutdownNow();
-    runnable.destroy();
-    context.stop();
-    Loggings.forceFlush();
+    try {
+      runnable.destroy();
+    } catch (Throwable t) {
+      // Just catch the exception and do not propagate it, since we are already in the shutdown sequence
+      // and want all twill services to be shut down properly.
+      LOG.warn("Exception when calling runnable.destroy.", t);
+    } finally {
+      context.stop();
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnTwillService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnTwillService.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnTwillService.java
index 4704345..961bc30 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnTwillService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnTwillService.java
@@ -93,7 +93,7 @@ public abstract class AbstractYarnTwillService extends AbstractTwillService {
       UserGroupInformation.getCurrentUser().addCredentials(credentials);
       this.credentials = credentials;
 
-      LOG.info("Secure store updated from {}.", location.toURI());
+      LOG.info("Secure store updated from {}.", location);
 
     } catch (Throwable t) {
       LOG.error("Failed to update secure store.", t);

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
index 0e0fc75..e78889a 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
@@ -329,7 +329,7 @@ final class YarnTwillPreparer implements TwillPreparer {
 
       List<Token<?>> tokens = YarnUtils.addDelegationTokens(yarnConfig, locationFactory, credentials);
       for (Token<?> token : tokens) {
-        LOG.debug("Delegation token acquired for {}, {}", locationFactory.getHomeLocation().toURI(), token);
+        LOG.debug("Delegation token acquired for {}, {}", locationFactory.getHomeLocation(), token);
       }
     } catch (IOException e) {
       LOG.warn("Failed to check for secure login type. Not gathering any delegation token.", e);

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
index 6a33131..b2981e4 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
@@ -601,7 +601,7 @@ public final class YarnTwillRunnerService extends AbstractIdleService implements
     // Rename the tmp file into the credentials location
     tmpLocation.renameTo(credentialsLocation);
 
-    LOG.debug("Secure store for {} {} saved to {}.", application, runId, credentialsLocation.toURI());
+    LOG.debug("Secure store for {} {} saved to {}.", application, runId, credentialsLocation);
   }
 
   private static LocationFactory createDefaultLocationFactory(Configuration configuration) {

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-yarn/src/test/java/org/apache/twill/yarn/InitializeFailTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/InitializeFailTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/InitializeFailTestRun.java
index 39813cc..01b599b 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/InitializeFailTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/InitializeFailTestRun.java
@@ -21,11 +21,16 @@ import org.apache.twill.api.AbstractTwillRunnable;
 import org.apache.twill.api.TwillContext;
 import org.apache.twill.api.TwillController;
 import org.apache.twill.api.TwillRunner;
+import org.apache.twill.api.logging.LogEntry;
+import org.apache.twill.api.logging.LogHandler;
+import org.apache.twill.api.logging.LogThrowable;
 import org.apache.twill.api.logging.PrinterLogHandler;
 import org.apache.twill.common.Services;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.PrintWriter;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -38,11 +43,33 @@ public class InitializeFailTestRun extends BaseYarnTest {
   @Test
   public void testInitFail() throws InterruptedException, ExecutionException, TimeoutException {
     TwillRunner runner = YarnTestUtils.getTwillRunner();
-    TwillController controller = runner.prepare(new InitFailRunnable())
-                                       .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out)))
-                                       .start();
+    final CountDownLatch logLatch = new CountDownLatch(1);
+
+    // Verify that the handler receives the log entry for the exception thrown when the runnable initializes
+    LogHandler logVerifyHandler = new LogHandler() {
+      @Override
+      public void onLog(LogEntry logEntry) {
+        LogThrowable logThrowable = logEntry.getThrowable();
+        if (logThrowable != null) {
+          while (logThrowable.getCause() != null) {
+            logThrowable = logThrowable.getCause();
+          }
+          if (IllegalStateException.class.getName().equals(logThrowable.getClassName())
+            && logThrowable.getMessage().contains("Fail to init")) {
+            logLatch.countDown();
+          }
+        }
+      }
+    };
+
+    TwillController controller = runner
+      .prepare(new InitFailRunnable())
+      .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out)))
+      .addLogHandler(logVerifyHandler)
+      .start();
 
     Services.getCompletionFuture(controller).get(2, TimeUnit.MINUTES);
+    Assert.assertTrue(logLatch.await(10, TimeUnit.SECONDS));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-yarn/src/test/java/org/apache/twill/yarn/SessionExpireTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/SessionExpireTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/SessionExpireTestRun.java
index f378606..6e2046c 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/SessionExpireTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/SessionExpireTestRun.java
@@ -21,6 +21,7 @@ import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Service;
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.twill.api.AbstractTwillRunnable;
 import org.apache.twill.api.TwillController;
 import org.apache.twill.api.TwillRunner;
@@ -94,7 +95,7 @@ public class SessionExpireTestRun extends BaseYarnTest {
     QueryExp query = Query.isInstanceOf(new StringValueExp(ConnectionMXBean.class.getName()));
 
     Stopwatch stopwatch = new Stopwatch();
-
+    stopwatch.start();
     do {
       // Find the AM session and expire it
       Set<ObjectName> connectionBeans = mbeanServer.queryNames(ObjectName.WILDCARD, query);
@@ -111,6 +112,7 @@ public class SessionExpireTestRun extends BaseYarnTest {
           }
         }
       }
+      Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
     } while (stopwatch.elapsedTime(timeoutUnit) < timeout);
 
     return false;

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
index 73ca274..0dde3be 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
@@ -424,18 +424,19 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
     public void process(WatchedEvent event) {
       try {
         if (event.getState() == Event.KeeperState.SyncConnected && state() == State.STARTING) {
-          LOG.debug("Connected to ZooKeeper: " + zkStr);
+          LOG.debug("Connected to ZooKeeper: {}", zkStr);
           notifyStarted();
           return;
         }
         if (event.getState() == Event.KeeperState.Expired) {
-          LOG.info("ZooKeeper session expired: " + zkStr);
+          LOG.info("ZooKeeper session expired: {}", zkStr);
 
           // When connection expired, simply reconnect again
           Thread t = new Thread(new Runnable() {
             @Override
             public void run() {
               try {
+                LOG.info("Reconnect to ZooKeeper due to expiration: {}", zkStr);
                 zooKeeper.set(createZooKeeper());
               } catch (IOException e) {
                 zooKeeper.set(null);