You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2015/10/14 22:39:41 UTC
incubator-twill git commit: WIP
Repository: incubator-twill
Updated Branches:
refs/heads/feature/TWILL-131 [created] 1f831c299
WIP
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/1f831c29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/1f831c29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/1f831c29
Branch: refs/heads/feature/TWILL-131
Commit: 1f831c2993bb16dd695baca5bee3436bfee4ed10
Parents: f6d2b6c
Author: Terence Yim <ch...@apache.org>
Authored: Wed Oct 14 13:39:32 2015 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Wed Oct 14 13:39:32 2015 -0700
----------------------------------------------------------------------
.../org/apache/twill/internal/Constants.java | 83 +++++++++++++++++++
.../twill/internal/AbstractTwillService.java | 2 +-
.../internal/AbstractZKServiceController.java | 3 +-
.../org/apache/twill/internal/Constants.java | 76 ------------------
.../twill/discovery/ZKDiscoveryService.java | 4 +-
.../org/apache/twill/internal/ServiceMain.java | 19 +++--
.../appmaster/ApplicationMasterMain.java | 84 +++++++++++++++++++-
.../appmaster/ApplicationMasterService.java | 6 +-
.../internal/container/TwillContainerMain.java | 5 +-
.../apache/twill/yarn/YarnTwillPreparer.java | 8 +-
.../twill/yarn/YarnTwillRunnerService.java | 4 +-
11 files changed, 190 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1f831c29/twill-common/src/main/java/org/apache/twill/internal/Constants.java
----------------------------------------------------------------------
diff --git a/twill-common/src/main/java/org/apache/twill/internal/Constants.java b/twill-common/src/main/java/org/apache/twill/internal/Constants.java
new file mode 100644
index 0000000..dd04eb1
--- /dev/null
+++ b/twill-common/src/main/java/org/apache/twill/internal/Constants.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+/**
+ * This class contains collection of common constants used in Twill.
+ */
+public final class Constants {
+
+ public static final String LOG_TOPIC = "log";
+
+ /** Maximum number of seconds for AM to start. */
+ public static final int APPLICATION_MAX_START_SECONDS = 60;
+ /** Maximum number of seconds for AM to stop. */
+ public static final int APPLICATION_MAX_STOP_SECONDS = 60;
+
+ public static final long PROVISION_TIMEOUT = 30000;
+
+ /**
+ * Milliseconds AM should wait for RM to allocate a constrained provision request.
+ * On timeout, AM relaxes the request constraints.
+ */
+ public static final int CONSTRAINED_PROVISION_REQUEST_TIMEOUT = 5000;
+
+ public static final double HEAP_MIN_RATIO = 0.7d;
+
+ /** Memory size of AM. */
+ public static final int APP_MASTER_MEMORY_MB = 512;
+
+ public static final int APP_MASTER_RESERVED_MEMORY_MB = 150;
+
+ public static final String CLASSPATH = "classpath";
+ public static final String APPLICATION_CLASSPATH = "application-classpath";
+
+ /** Command names for the restart runnable instances. */
+ public static final String RESTART_ALL_RUNNABLE_INSTANCES = "restartAllRunnableInstances";
+ public static final String RESTART_RUNNABLES_INSTANCES = "restartRunnablesInstances";
+
+ /**
+ * Common ZK paths constants
+ */
+ public static final String DISCOVERY_PATH_PREFIX = "/discoverable";
+ public static final String INSTANCES_PATH_PREFIX = "/instances";
+
+
+ /**
+ * Constants for names of internal files that are shared between client, AM and containers.
+ */
+ public static final class Files {
+
+ public static final String LAUNCHER_JAR = "launcher.jar";
+ public static final String APP_MASTER_JAR = "appMaster.jar";
+ public static final String CONTAINER_JAR = "container.jar";
+ public static final String LOCALIZE_FILES = "localizeFiles.json";
+ public static final String TWILL_SPEC = "twillSpec.json";
+ public static final String ARGUMENTS = "arguments.json";
+ public static final String ENVIRONMENTS = "environments.json";
+ public static final String LOGBACK_TEMPLATE = "logback-template.xml";
+ public static final String JVM_OPTIONS = "jvm.opts";
+ public static final String CREDENTIALS = "credentials.store";
+
+ private Files() {
+ }
+ }
+
+ private Constants() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1f831c29/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
index 2f95e0e..8688d0b 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
@@ -359,7 +359,7 @@ public abstract class AbstractTwillService extends AbstractExecutionThreadServic
}
private String getLiveNodePath() {
- return "/instances/" + runId.getId();
+ return String.format("%s/%s", Constants.INSTANCES_PATH_PREFIX, runId.getId());
}
private <T> byte[] toJson(T obj) {
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1f831c29/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
index 0cf92ea..9b30823 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
@@ -21,7 +21,6 @@ import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-
import org.apache.twill.api.Command;
import org.apache.twill.api.RunId;
import org.apache.twill.api.ServiceController;
@@ -239,7 +238,7 @@ public abstract class AbstractZKServiceController extends AbstractExecutionServi
* Returns the zookeeper node path for the ephemeral instance node for this runId.
*/
protected final String getInstancePath() {
- return String.format("/instances/%s", getRunId().getId());
+ return String.format("%s/%s", Constants.INSTANCES_PATH_PREFIX, getRunId().getId());
}
private String getZKPath(String path) {
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1f831c29/twill-core/src/main/java/org/apache/twill/internal/Constants.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/Constants.java b/twill-core/src/main/java/org/apache/twill/internal/Constants.java
deleted file mode 100644
index 39de851..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/Constants.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-/**
- * This class contains collection of common constants used in Twill.
- */
-public final class Constants {
-
- public static final String LOG_TOPIC = "log";
-
- /** Maximum number of seconds for AM to start. */
- public static final int APPLICATION_MAX_START_SECONDS = 60;
- /** Maximum number of seconds for AM to stop. */
- public static final int APPLICATION_MAX_STOP_SECONDS = 60;
-
- public static final long PROVISION_TIMEOUT = 30000;
-
- /**
- * Milliseconds AM should wait for RM to allocate a constrained provision request.
- * On timeout, AM relaxes the request constraints.
- */
- public static final int CONSTRAINED_PROVISION_REQUEST_TIMEOUT = 5000;
-
- public static final double HEAP_MIN_RATIO = 0.7d;
-
- /** Memory size of AM. */
- public static final int APP_MASTER_MEMORY_MB = 512;
-
- public static final int APP_MASTER_RESERVED_MEMORY_MB = 150;
-
- public static final String CLASSPATH = "classpath";
- public static final String APPLICATION_CLASSPATH = "application-classpath";
-
- /** Command names for the restart runnable instances. */
- public static final String RESTART_ALL_RUNNABLE_INSTANCES = "restartAllRunnableInstances";
- public static final String RESTART_RUNNABLES_INSTANCES = "restartRunnablesInstances";
-
- /**
- * Constants for names of internal files that are shared between client, AM and containers.
- */
- public static final class Files {
-
- public static final String LAUNCHER_JAR = "launcher.jar";
- public static final String APP_MASTER_JAR = "appMaster.jar";
- public static final String CONTAINER_JAR = "container.jar";
- public static final String LOCALIZE_FILES = "localizeFiles.json";
- public static final String TWILL_SPEC = "twillSpec.json";
- public static final String ARGUMENTS = "arguments.json";
- public static final String ENVIRONMENTS = "environments.json";
- public static final String LOGBACK_TEMPLATE = "logback-template.xml";
- public static final String JVM_OPTIONS = "jvm.opts";
- public static final String CREDENTIALS = "credentials.store";
-
- private Files() {
- }
- }
-
- private Constants() {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1f831c29/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
index 3f0db34..c563bab 100644
--- a/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
@@ -31,6 +31,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
+import org.apache.twill.internal.Constants;
import org.apache.twill.zookeeper.NodeChildren;
import org.apache.twill.zookeeper.NodeData;
import org.apache.twill.zookeeper.OperationFuture;
@@ -94,7 +95,6 @@ import java.util.concurrent.locks.ReentrantLock;
*/
public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceClient {
private static final Logger LOG = LoggerFactory.getLogger(ZKDiscoveryService.class);
- private static final String NAMESPACE = "/discoverable";
private static final long RETRY_MILLIS = 1000;
@@ -112,7 +112,7 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli
* @param zkClient The {@link ZKClient} for interacting with zookeeper.
*/
public ZKDiscoveryService(ZKClient zkClient) {
- this(zkClient, NAMESPACE);
+ this(zkClient, Constants.DISCOVERY_PATH_PREFIX);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1f831c29/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
index a6af3d3..cafd375 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
@@ -157,12 +157,16 @@ public abstract class ServiceMain {
/**
* Creates a {@link ZKClientService}.
*/
- protected static ZKClientService createZKClient(String zkConnectStr) {
+ protected static ZKClientService createZKClient(String zkConnectStr, String appName) {
return ZKClientServices.delegate(
- ZKClients.reWatchOnExpire(
- ZKClients.retryOnFailure(
- ZKClientService.Builder.of(zkConnectStr).build(),
- RetryStrategies.fixDelay(1, TimeUnit.SECONDS))));
+ ZKClients.namespace(
+ ZKClients.reWatchOnExpire(
+ ZKClients.retryOnFailure(
+ ZKClientService.Builder.of(zkConnectStr).build(),
+ RetryStrategies.fixDelay(1, TimeUnit.SECONDS)
+ )
+ ), "/" + appName
+ ));
}
private void configureLogger() {
@@ -256,10 +260,11 @@ public abstract class ServiceMain {
/**
* A simple service for creating/remove ZK paths needed for {@link AbstractTwillService}.
*/
- protected static final class TwillZKPathService extends AbstractIdleService {
+ protected static class TwillZKPathService extends AbstractIdleService {
+
+ protected static final long TIMEOUT_SECONDS = 5L;
private static final Logger LOG = LoggerFactory.getLogger(TwillZKPathService.class);
- private static final long TIMEOUT_SECONDS = 5L;
private final ZKClient zkClient;
private final String path;
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1f831c29/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
index 3e8cb93..f373947 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
@@ -19,6 +19,7 @@ package org.apache.twill.internal.appmaster;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.Futures;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -32,6 +33,7 @@ import org.apache.twill.internal.logging.Loggings;
import org.apache.twill.internal.utils.Networks;
import org.apache.twill.internal.yarn.VersionDetectYarnAMClientFactory;
import org.apache.twill.internal.yarn.YarnAMClient;
+import org.apache.twill.zookeeper.OperationFuture;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.twill.zookeeper.ZKClientService;
import org.apache.twill.zookeeper.ZKOperations;
@@ -43,7 +45,10 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Properties;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
@@ -65,7 +70,7 @@ public final class ApplicationMasterMain extends ServiceMain {
File twillSpec = new File(Constants.Files.TWILL_SPEC);
RunId runId = RunIds.fromString(System.getenv(EnvKeys.TWILL_RUN_ID));
- ZKClientService zkClientService = createZKClient(zkConnect);
+ ZKClientService zkClientService = createZKClient(zkConnect, System.getenv(EnvKeys.TWILL_APP_NAME));
Configuration conf = new YarnConfiguration(new HdfsConfiguration(new Configuration()));
setRMSchedulerAddress(conf);
@@ -74,12 +79,12 @@ public final class ApplicationMasterMain extends ServiceMain {
twillSpec, amClient, createAppLocation(conf));
TrackerService trackerService = new TrackerService(service);
- new ApplicationMasterMain(String.format("%s/%s/kafka", zkConnect, runId.getId()))
+ new ApplicationMasterMain(service.getKafkaZKConnect())
.doMain(
service,
new YarnAMClientService(amClient, trackerService),
zkClientService,
- new TwillZKPathService(zkClientService, runId),
+ new AppMasterTwillZKPathService(zkClientService, runId),
new ApplicationKafkaService(zkClientService, runId)
);
}
@@ -229,4 +234,77 @@ public final class ApplicationMasterMain extends ServiceMain {
}
}
}
+
+ private static final class AppMasterTwillZKPathService extends TwillZKPathService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AppMasterTwillZKPathService.class);
+ private final ZKClient zkClient;
+
+ public AppMasterTwillZKPathService(ZKClient zkClient, RunId runId) {
+ super(zkClient, runId);
+ this.zkClient = zkClient;
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ super.shutDown();
+
+ // Try to delete the /instances path. It may throws NotEmptyException if there are other instances of the
+ // same app running, which can safely ignore and return.
+ if (!delete(Constants.INSTANCES_PATH_PREFIX)) {
+ return;
+ }
+
+ // Try to delete children under /discovery. It may fail with NotEmptyException if there are other instances
+ // of the same app running that has discovery services running.
+ List<String> children = zkClient.getChildren(Constants.DISCOVERY_PATH_PREFIX)
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS).getChildren();
+ List<OperationFuture<?>> deleteFutures = new ArrayList<>();
+ for (String child : children) {
+ String path = Constants.DISCOVERY_PATH_PREFIX + "/" + child;
+ LOG.info("Removing ZK path: {}{}", zkClient.getConnectString(), path);
+ deleteFutures.add(zkClient.delete(path));
+ }
+ Futures.successfulAsList(deleteFutures).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ for (OperationFuture<?> future : deleteFutures) {
+ try {
+ future.get();
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof KeeperException.NotEmptyException) {
+ return;
+ }
+ throw e;
+ }
+ }
+
+ // Delete the /discovery. It may fail with NotEmptyException (due to race between apps),
+ // which can safely ignore and return.
+ if (!delete(Constants.DISCOVERY_PATH_PREFIX)) {
+ return;
+ }
+
+ // Delete the ZK path for the app namespace.
+ delete("/");
+ }
+
+ /**
+ * Deletes the given ZK path.
+ *
+ * @param path path to delete
+ * @return true if the path is delete, false if failed to delete due to {@link KeeperException.NotEmptyException}.
+ * @throws Exception if failed to delete
+ */
+ private boolean delete(String path) throws Exception {
+ try {
+ LOG.info("Removing ZK path: {}{}", zkClient.getConnectString(), path);
+ zkClient.delete(path).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ return true;
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof KeeperException.NotEmptyException) {
+ return false;
+ }
+ throw e;
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1f831c29/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index e1523d6..c376de4 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -236,7 +236,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
// Creates ZK path for runnable
zkClient.create("/" + runId.getId() + "/runnables", null, CreateMode.PERSISTENT).get();
- runningContainers.addWatcher("/discoverable");
+ runningContainers.addWatcher(Constants.DISCOVERY_PATH_PREFIX);
runnableContainerRequests = initContainerRequests();
}
@@ -648,7 +648,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
env.put(EnvKeys.TWILL_APP_RUN_ID, runId.getId());
env.put(EnvKeys.TWILL_APP_NAME, twillSpec.getName());
env.put(EnvKeys.TWILL_APP_LOG_LEVEL, System.getenv(EnvKeys.TWILL_APP_LOG_LEVEL));
- env.put(EnvKeys.TWILL_ZK_CONNECT, zkClient.getConnectString());
+ env.put(EnvKeys.TWILL_ZK_CONNECT, System.getenv(EnvKeys.TWILL_ZK_CONNECT));
env.put(EnvKeys.TWILL_LOG_KAFKA_ZK, getKafkaZKConnect());
ProcessLauncher.PrepareLaunchContext launchContext = processLauncher.prepareLaunch(env, getLocalizeFiles(),
@@ -703,7 +703,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
return String.format("/%s/runnables/%s", runId.getId(), runnableName);
}
- private String getKafkaZKConnect() {
+ String getKafkaZKConnect() {
return String.format("%s/%s/kafka", zkClient.getConnectString(), runId.getId());
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1f831c29/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
index 51837a7..3ea786a 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
@@ -18,7 +18,6 @@
package org.apache.twill.internal.container;
import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.io.Files;
import com.google.common.util.concurrent.AbstractService;
@@ -28,9 +27,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.twill.api.LocalFile;
import org.apache.twill.api.RunId;
-import org.apache.twill.api.RuntimeSpecification;
import org.apache.twill.api.TwillRunnableSpecification;
import org.apache.twill.api.TwillSpecification;
import org.apache.twill.discovery.ZKDiscoveryService;
@@ -80,7 +77,7 @@ public final class TwillContainerMain extends ServiceMain {
int instanceId = Integer.parseInt(System.getenv(EnvKeys.TWILL_INSTANCE_ID));
int instanceCount = Integer.parseInt(System.getenv(EnvKeys.TWILL_INSTANCE_COUNT));
- ZKClientService zkClientService = createZKClient(zkConnectStr);
+ ZKClientService zkClientService = createZKClient(zkConnectStr, System.getenv(EnvKeys.TWILL_APP_NAME));
ZKDiscoveryService discoveryService = new ZKDiscoveryService(zkClientService);
ZKClient appRunZkClient = getAppRunZKClient(zkClientService, appRunId);
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1f831c29/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
index d4edfeb..d04cdab 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
@@ -116,7 +116,7 @@ final class YarnTwillPreparer implements TwillPreparer {
private final YarnConfiguration yarnConfig;
private final TwillSpecification twillSpec;
private final YarnAppClient yarnAppClient;
- private final ZKClient zkClient;
+ private final String zkConnectString;
private final LocationFactory locationFactory;
private final YarnTwillControllerFactory controllerFactory;
private final RunId runId;
@@ -139,13 +139,13 @@ final class YarnTwillPreparer implements TwillPreparer {
private LogEntry.Level logLevel;
YarnTwillPreparer(YarnConfiguration yarnConfig, TwillSpecification twillSpec,
- YarnAppClient yarnAppClient, ZKClient zkClient,
+ YarnAppClient yarnAppClient, String zkConnectString,
LocationFactory locationFactory, String extraOptions, LogEntry.Level logLevel,
YarnTwillControllerFactory controllerFactory) {
this.yarnConfig = yarnConfig;
this.twillSpec = twillSpec;
this.yarnAppClient = yarnAppClient;
- this.zkClient = ZKClients.namespace(zkClient, "/" + twillSpec.getName());
+ this.zkConnectString = zkConnectString;
this.locationFactory = locationFactory;
this.controllerFactory = controllerFactory;
this.runId = RunIds.generate();
@@ -345,7 +345,7 @@ final class YarnTwillPreparer implements TwillPreparer {
ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder()
.put(EnvKeys.TWILL_FS_USER, fsUser)
.put(EnvKeys.TWILL_APP_DIR, getAppLocation().toURI().toASCIIString())
- .put(EnvKeys.TWILL_ZK_CONNECT, zkClient.getConnectString())
+ .put(EnvKeys.TWILL_ZK_CONNECT, zkConnectString)
.put(EnvKeys.TWILL_RUN_ID, runId.getId())
.put(EnvKeys.TWILL_RESERVED_MEMORY_MB, Integer.toString(reservedMemory))
.put(EnvKeys.TWILL_APP_NAME, twillSpec.getName())
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1f831c29/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
index 8a13017..c5853d6 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
@@ -277,8 +277,8 @@ public final class YarnTwillRunnerService implements TwillRunnerService {
final TwillSpecification twillSpec = application.configure();
final String appName = twillSpec.getName();
- return new YarnTwillPreparer(yarnConfig, twillSpec, yarnAppClient, zkClientService, locationFactory, jvmOptions,
- LogEntry.Level.INFO, new YarnTwillControllerFactory() {
+ return new YarnTwillPreparer(yarnConfig, twillSpec, yarnAppClient, zkClientService.getConnectString(),
+ locationFactory, jvmOptions, LogEntry.Level.INFO, new YarnTwillControllerFactory() {
@Override
public YarnTwillController create(RunId runId, Iterable<LogHandler> logHandlers,
Callable<ProcessController<YarnApplicationReport>> startUp) {