You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2015/10/14 22:39:41 UTC

incubator-twill git commit: WIP

Repository: incubator-twill
Updated Branches:
  refs/heads/feature/TWILL-131 [created] 1f831c299


WIP

Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/1f831c29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/1f831c29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/1f831c29

Branch: refs/heads/feature/TWILL-131
Commit: 1f831c2993bb16dd695baca5bee3436bfee4ed10
Parents: f6d2b6c
Author: Terence Yim <ch...@apache.org>
Authored: Wed Oct 14 13:39:32 2015 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Wed Oct 14 13:39:32 2015 -0700

----------------------------------------------------------------------
 .../org/apache/twill/internal/Constants.java    | 83 +++++++++++++++++++
 .../twill/internal/AbstractTwillService.java    |  2 +-
 .../internal/AbstractZKServiceController.java   |  3 +-
 .../org/apache/twill/internal/Constants.java    | 76 ------------------
 .../twill/discovery/ZKDiscoveryService.java     |  4 +-
 .../org/apache/twill/internal/ServiceMain.java  | 19 +++--
 .../appmaster/ApplicationMasterMain.java        | 84 +++++++++++++++++++-
 .../appmaster/ApplicationMasterService.java     |  6 +-
 .../internal/container/TwillContainerMain.java  |  5 +-
 .../apache/twill/yarn/YarnTwillPreparer.java    |  8 +-
 .../twill/yarn/YarnTwillRunnerService.java      |  4 +-
 11 files changed, 190 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1f831c29/twill-common/src/main/java/org/apache/twill/internal/Constants.java
----------------------------------------------------------------------
diff --git a/twill-common/src/main/java/org/apache/twill/internal/Constants.java b/twill-common/src/main/java/org/apache/twill/internal/Constants.java
new file mode 100644
index 0000000..dd04eb1
--- /dev/null
+++ b/twill-common/src/main/java/org/apache/twill/internal/Constants.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+/**
+ * This class contains collection of common constants used in Twill.
+ */
+public final class Constants {
+
+  public static final String LOG_TOPIC = "log";
+
+  /** Maximum number of seconds for AM to start. */
+  public static final int APPLICATION_MAX_START_SECONDS = 60;
+  /** Maximum number of seconds for AM to stop. */
+  public static final int APPLICATION_MAX_STOP_SECONDS = 60;
+
+  public static final long PROVISION_TIMEOUT = 30000;
+
+  /**
+   * Milliseconds AM should wait for RM to allocate a constrained provision request.
+   * On timeout, AM relaxes the request constraints.
+   */
+  public static final int CONSTRAINED_PROVISION_REQUEST_TIMEOUT = 5000;
+
+  public static final double HEAP_MIN_RATIO = 0.7d;
+
+  /** Memory size of AM. */
+  public static final int APP_MASTER_MEMORY_MB = 512;
+
+  public static final int APP_MASTER_RESERVED_MEMORY_MB = 150;
+
+  public static final String CLASSPATH = "classpath";
+  public static final String APPLICATION_CLASSPATH = "application-classpath";
+
+  /** Command names for the restart runnable instances. */
+  public static final String RESTART_ALL_RUNNABLE_INSTANCES = "restartAllRunnableInstances";
+  public static final String RESTART_RUNNABLES_INSTANCES = "restartRunnablesInstances";
+
+  /**
+   * Common ZK paths constants
+   */
+  public static final String DISCOVERY_PATH_PREFIX = "/discoverable";
+  public static final String INSTANCES_PATH_PREFIX = "/instances";
+
+
+  /**
+   * Constants for names of internal files that are shared between client, AM and containers.
+   */
+  public static final class Files {
+
+    public static final String LAUNCHER_JAR = "launcher.jar";
+    public static final String APP_MASTER_JAR = "appMaster.jar";
+    public static final String CONTAINER_JAR = "container.jar";
+    public static final String LOCALIZE_FILES = "localizeFiles.json";
+    public static final String TWILL_SPEC = "twillSpec.json";
+    public static final String ARGUMENTS = "arguments.json";
+    public static final String ENVIRONMENTS = "environments.json";
+    public static final String LOGBACK_TEMPLATE = "logback-template.xml";
+    public static final String JVM_OPTIONS = "jvm.opts";
+    public static final String CREDENTIALS = "credentials.store";
+
+    private Files() {
+    }
+  }
+
+  private Constants() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1f831c29/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
index 2f95e0e..8688d0b 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
@@ -359,7 +359,7 @@ public abstract class AbstractTwillService extends AbstractExecutionThreadServic
   }
 
   private String getLiveNodePath() {
-    return "/instances/" + runId.getId();
+    return String.format("%s/%s", Constants.INSTANCES_PATH_PREFIX, runId.getId());
   }
 
   private <T> byte[] toJson(T obj) {

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1f831c29/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
index 0cf92ea..9b30823 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
@@ -21,7 +21,6 @@ import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-
 import org.apache.twill.api.Command;
 import org.apache.twill.api.RunId;
 import org.apache.twill.api.ServiceController;
@@ -239,7 +238,7 @@ public abstract class AbstractZKServiceController extends AbstractExecutionServi
    * Returns the zookeeper node path for the ephemeral instance node for this runId.
    */
   protected final String getInstancePath() {
-    return String.format("/instances/%s", getRunId().getId());
+    return String.format("%s/%s", Constants.INSTANCES_PATH_PREFIX, getRunId().getId());
   }
 
   private String getZKPath(String path) {

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1f831c29/twill-core/src/main/java/org/apache/twill/internal/Constants.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/Constants.java b/twill-core/src/main/java/org/apache/twill/internal/Constants.java
deleted file mode 100644
index 39de851..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/Constants.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-/**
- * This class contains collection of common constants used in Twill.
- */
-public final class Constants {
-
-  public static final String LOG_TOPIC = "log";
-
-  /** Maximum number of seconds for AM to start. */
-  public static final int APPLICATION_MAX_START_SECONDS = 60;
-  /** Maximum number of seconds for AM to stop. */
-  public static final int APPLICATION_MAX_STOP_SECONDS = 60;
-
-  public static final long PROVISION_TIMEOUT = 30000;
-
-  /**
-   * Milliseconds AM should wait for RM to allocate a constrained provision request.
-   * On timeout, AM relaxes the request constraints.
-   */
-  public static final int CONSTRAINED_PROVISION_REQUEST_TIMEOUT = 5000;
-
-  public static final double HEAP_MIN_RATIO = 0.7d;
-
-  /** Memory size of AM. */
-  public static final int APP_MASTER_MEMORY_MB = 512;
-
-  public static final int APP_MASTER_RESERVED_MEMORY_MB = 150;
-
-  public static final String CLASSPATH = "classpath";
-  public static final String APPLICATION_CLASSPATH = "application-classpath";
-
-  /** Command names for the restart runnable instances. */
-  public static final String RESTART_ALL_RUNNABLE_INSTANCES = "restartAllRunnableInstances";
-  public static final String RESTART_RUNNABLES_INSTANCES = "restartRunnablesInstances";
-
-  /**
-   * Constants for names of internal files that are shared between client, AM and containers.
-   */
-  public static final class Files {
-
-    public static final String LAUNCHER_JAR = "launcher.jar";
-    public static final String APP_MASTER_JAR = "appMaster.jar";
-    public static final String CONTAINER_JAR = "container.jar";
-    public static final String LOCALIZE_FILES = "localizeFiles.json";
-    public static final String TWILL_SPEC = "twillSpec.json";
-    public static final String ARGUMENTS = "arguments.json";
-    public static final String ENVIRONMENTS = "environments.json";
-    public static final String LOGBACK_TEMPLATE = "logback-template.xml";
-    public static final String JVM_OPTIONS = "jvm.opts";
-    public static final String CREDENTIALS = "credentials.store";
-
-    private Files() {
-    }
-  }
-
-  private Constants() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1f831c29/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
index 3f0db34..c563bab 100644
--- a/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
@@ -31,6 +31,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import org.apache.twill.common.Cancellable;
 import org.apache.twill.common.Threads;
+import org.apache.twill.internal.Constants;
 import org.apache.twill.zookeeper.NodeChildren;
 import org.apache.twill.zookeeper.NodeData;
 import org.apache.twill.zookeeper.OperationFuture;
@@ -94,7 +95,6 @@ import java.util.concurrent.locks.ReentrantLock;
  */
 public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceClient {
   private static final Logger LOG = LoggerFactory.getLogger(ZKDiscoveryService.class);
-  private static final String NAMESPACE = "/discoverable";
 
   private static final long RETRY_MILLIS = 1000;
 
@@ -112,7 +112,7 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli
    * @param zkClient The {@link ZKClient} for interacting with zookeeper.
    */
   public ZKDiscoveryService(ZKClient zkClient) {
-    this(zkClient, NAMESPACE);
+    this(zkClient, Constants.DISCOVERY_PATH_PREFIX);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1f831c29/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
index a6af3d3..cafd375 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
@@ -157,12 +157,16 @@ public abstract class ServiceMain {
   /**
    * Creates a {@link ZKClientService}.
    */
-  protected static ZKClientService createZKClient(String zkConnectStr) {
+  protected static ZKClientService createZKClient(String zkConnectStr, String appName) {
     return ZKClientServices.delegate(
-      ZKClients.reWatchOnExpire(
-        ZKClients.retryOnFailure(
-          ZKClientService.Builder.of(zkConnectStr).build(),
-          RetryStrategies.fixDelay(1, TimeUnit.SECONDS))));
+      ZKClients.namespace(
+        ZKClients.reWatchOnExpire(
+          ZKClients.retryOnFailure(
+            ZKClientService.Builder.of(zkConnectStr).build(),
+            RetryStrategies.fixDelay(1, TimeUnit.SECONDS)
+          )
+        ), "/" + appName
+      ));
   }
 
   private void configureLogger() {
@@ -256,10 +260,11 @@ public abstract class ServiceMain {
   /**
    * A simple service for creating/remove ZK paths needed for {@link AbstractTwillService}.
    */
-  protected static final class TwillZKPathService extends AbstractIdleService {
+  protected static class TwillZKPathService extends AbstractIdleService {
+
+    protected static final long TIMEOUT_SECONDS = 5L;
 
     private static final Logger LOG = LoggerFactory.getLogger(TwillZKPathService.class);
-    private static final long TIMEOUT_SECONDS = 5L;
 
     private final ZKClient zkClient;
     private final String path;

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1f831c29/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
index 3e8cb93..f373947 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
@@ -19,6 +19,7 @@ package org.apache.twill.internal.appmaster;
 
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.Futures;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -32,6 +33,7 @@ import org.apache.twill.internal.logging.Loggings;
 import org.apache.twill.internal.utils.Networks;
 import org.apache.twill.internal.yarn.VersionDetectYarnAMClientFactory;
 import org.apache.twill.internal.yarn.YarnAMClient;
+import org.apache.twill.zookeeper.OperationFuture;
 import org.apache.twill.zookeeper.ZKClient;
 import org.apache.twill.zookeeper.ZKClientService;
 import org.apache.twill.zookeeper.ZKOperations;
@@ -43,7 +45,10 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -65,7 +70,7 @@ public final class ApplicationMasterMain extends ServiceMain {
     File twillSpec = new File(Constants.Files.TWILL_SPEC);
     RunId runId = RunIds.fromString(System.getenv(EnvKeys.TWILL_RUN_ID));
 
-    ZKClientService zkClientService = createZKClient(zkConnect);
+    ZKClientService zkClientService = createZKClient(zkConnect, System.getenv(EnvKeys.TWILL_APP_NAME));
     Configuration conf = new YarnConfiguration(new HdfsConfiguration(new Configuration()));
     setRMSchedulerAddress(conf);
 
@@ -74,12 +79,12 @@ public final class ApplicationMasterMain extends ServiceMain {
                                                                     twillSpec, amClient, createAppLocation(conf));
     TrackerService trackerService = new TrackerService(service);
 
-    new ApplicationMasterMain(String.format("%s/%s/kafka", zkConnect, runId.getId()))
+    new ApplicationMasterMain(service.getKafkaZKConnect())
       .doMain(
         service,
         new YarnAMClientService(amClient, trackerService),
         zkClientService,
-        new TwillZKPathService(zkClientService, runId),
+        new AppMasterTwillZKPathService(zkClientService, runId),
         new ApplicationKafkaService(zkClientService, runId)
       );
   }
@@ -229,4 +234,77 @@ public final class ApplicationMasterMain extends ServiceMain {
       }
     }
   }
+
+  private static final class AppMasterTwillZKPathService extends TwillZKPathService {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AppMasterTwillZKPathService.class);
+    private final ZKClient zkClient;
+
+    public AppMasterTwillZKPathService(ZKClient zkClient, RunId runId) {
+      super(zkClient, runId);
+      this.zkClient = zkClient;
+    }
+
+    @Override
+    protected void shutDown() throws Exception {
+      super.shutDown();
+
+      // Try to delete the /instances path. It may throws NotEmptyException if there are other instances of the
+      // same app running, which can safely ignore and return.
+      if (!delete(Constants.INSTANCES_PATH_PREFIX)) {
+        return;
+      }
+
+      // Try to delete children under /discovery. It may fail with NotEmptyException if there are other instances
+      // of the same app running that has discovery services running.
+      List<String> children = zkClient.getChildren(Constants.DISCOVERY_PATH_PREFIX)
+                                      .get(TIMEOUT_SECONDS, TimeUnit.SECONDS).getChildren();
+      List<OperationFuture<?>> deleteFutures = new ArrayList<>();
+      for (String child : children) {
+        String path = Constants.DISCOVERY_PATH_PREFIX + "/" + child;
+        LOG.info("Removing ZK path: {}{}", zkClient.getConnectString(), path);
+        deleteFutures.add(zkClient.delete(path));
+      }
+      Futures.successfulAsList(deleteFutures).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+      for (OperationFuture<?> future : deleteFutures) {
+        try {
+          future.get();
+        } catch (ExecutionException e) {
+          if (e.getCause() instanceof KeeperException.NotEmptyException) {
+            return;
+          }
+          throw e;
+        }
+      }
+
+      // Delete the /discovery. It may fail with NotEmptyException (due to race between apps),
+      // which can safely ignore and return.
+      if (!delete(Constants.DISCOVERY_PATH_PREFIX)) {
+        return;
+      }
+
+      // Delete the ZK path for the app namespace.
+      delete("/");
+    }
+
+    /**
+     * Deletes the given ZK path.
+     *
+     * @param path path to delete
+     * @return true if the path is delete, false if failed to delete due to {@link KeeperException.NotEmptyException}.
+     * @throws Exception if failed to delete
+     */
+    private boolean delete(String path) throws Exception {
+      try {
+        LOG.info("Removing ZK path: {}{}", zkClient.getConnectString(), path);
+        zkClient.delete(path).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        return true;
+      } catch (ExecutionException e) {
+        if (e.getCause() instanceof KeeperException.NotEmptyException) {
+          return false;
+        }
+        throw e;
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1f831c29/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index e1523d6..c376de4 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -236,7 +236,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
 
     // Creates ZK path for runnable
     zkClient.create("/" + runId.getId() + "/runnables", null, CreateMode.PERSISTENT).get();
-    runningContainers.addWatcher("/discoverable");
+    runningContainers.addWatcher(Constants.DISCOVERY_PATH_PREFIX);
     runnableContainerRequests = initContainerRequests();
   }
 
@@ -648,7 +648,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
       env.put(EnvKeys.TWILL_APP_RUN_ID, runId.getId());
       env.put(EnvKeys.TWILL_APP_NAME, twillSpec.getName());
       env.put(EnvKeys.TWILL_APP_LOG_LEVEL, System.getenv(EnvKeys.TWILL_APP_LOG_LEVEL));
-      env.put(EnvKeys.TWILL_ZK_CONNECT, zkClient.getConnectString());
+      env.put(EnvKeys.TWILL_ZK_CONNECT, System.getenv(EnvKeys.TWILL_ZK_CONNECT));
       env.put(EnvKeys.TWILL_LOG_KAFKA_ZK, getKafkaZKConnect());
 
       ProcessLauncher.PrepareLaunchContext launchContext = processLauncher.prepareLaunch(env, getLocalizeFiles(),
@@ -703,7 +703,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
     return String.format("/%s/runnables/%s", runId.getId(), runnableName);
   }
 
-  private String getKafkaZKConnect() {
+  String getKafkaZKConnect() {
     return String.format("%s/%s/kafka", zkClient.getConnectString(), runId.getId());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1f831c29/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
index 51837a7..3ea786a 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
@@ -18,7 +18,6 @@
 package org.apache.twill.internal.container;
 
 import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.io.Files;
 import com.google.common.util.concurrent.AbstractService;
@@ -28,9 +27,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.twill.api.LocalFile;
 import org.apache.twill.api.RunId;
-import org.apache.twill.api.RuntimeSpecification;
 import org.apache.twill.api.TwillRunnableSpecification;
 import org.apache.twill.api.TwillSpecification;
 import org.apache.twill.discovery.ZKDiscoveryService;
@@ -80,7 +77,7 @@ public final class TwillContainerMain extends ServiceMain {
     int instanceId = Integer.parseInt(System.getenv(EnvKeys.TWILL_INSTANCE_ID));
     int instanceCount = Integer.parseInt(System.getenv(EnvKeys.TWILL_INSTANCE_COUNT));
 
-    ZKClientService zkClientService = createZKClient(zkConnectStr);
+    ZKClientService zkClientService = createZKClient(zkConnectStr, System.getenv(EnvKeys.TWILL_APP_NAME));
     ZKDiscoveryService discoveryService = new ZKDiscoveryService(zkClientService);
 
     ZKClient appRunZkClient = getAppRunZKClient(zkClientService, appRunId);

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1f831c29/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
index d4edfeb..d04cdab 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
@@ -116,7 +116,7 @@ final class YarnTwillPreparer implements TwillPreparer {
   private final YarnConfiguration yarnConfig;
   private final TwillSpecification twillSpec;
   private final YarnAppClient yarnAppClient;
-  private final ZKClient zkClient;
+  private final String zkConnectString;
   private final LocationFactory locationFactory;
   private final YarnTwillControllerFactory controllerFactory;
   private final RunId runId;
@@ -139,13 +139,13 @@ final class YarnTwillPreparer implements TwillPreparer {
   private LogEntry.Level logLevel;
 
   YarnTwillPreparer(YarnConfiguration yarnConfig, TwillSpecification twillSpec,
-                    YarnAppClient yarnAppClient, ZKClient zkClient,
+                    YarnAppClient yarnAppClient, String zkConnectString,
                     LocationFactory locationFactory, String extraOptions, LogEntry.Level logLevel,
                     YarnTwillControllerFactory controllerFactory) {
     this.yarnConfig = yarnConfig;
     this.twillSpec = twillSpec;
     this.yarnAppClient = yarnAppClient;
-    this.zkClient = ZKClients.namespace(zkClient, "/" + twillSpec.getName());
+    this.zkConnectString = zkConnectString;
     this.locationFactory = locationFactory;
     this.controllerFactory = controllerFactory;
     this.runId = RunIds.generate();
@@ -345,7 +345,7 @@ final class YarnTwillPreparer implements TwillPreparer {
           ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder()
             .put(EnvKeys.TWILL_FS_USER, fsUser)
             .put(EnvKeys.TWILL_APP_DIR, getAppLocation().toURI().toASCIIString())
-            .put(EnvKeys.TWILL_ZK_CONNECT, zkClient.getConnectString())
+            .put(EnvKeys.TWILL_ZK_CONNECT, zkConnectString)
             .put(EnvKeys.TWILL_RUN_ID, runId.getId())
             .put(EnvKeys.TWILL_RESERVED_MEMORY_MB, Integer.toString(reservedMemory))
             .put(EnvKeys.TWILL_APP_NAME, twillSpec.getName())

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1f831c29/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
index 8a13017..c5853d6 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
@@ -277,8 +277,8 @@ public final class YarnTwillRunnerService implements TwillRunnerService {
     final TwillSpecification twillSpec = application.configure();
     final String appName = twillSpec.getName();
 
-    return new YarnTwillPreparer(yarnConfig, twillSpec, yarnAppClient, zkClientService, locationFactory, jvmOptions,
-                                 LogEntry.Level.INFO, new YarnTwillControllerFactory() {
+    return new YarnTwillPreparer(yarnConfig, twillSpec, yarnAppClient, zkClientService.getConnectString(),
+                                 locationFactory, jvmOptions, LogEntry.Level.INFO, new YarnTwillControllerFactory() {
       @Override
       public YarnTwillController create(RunId runId, Iterable<LogHandler> logHandlers,
                                         Callable<ProcessController<YarnApplicationReport>> startUp) {