You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2016/01/26 20:16:26 UTC
[08/22] incubator-twill git commit: (TWILL-131) Remove ZK node when
application finished.
(TWILL-131) Remove ZK node when application finished.
- Remove the application ZK node when the application terminates
This closes #70 on Github
Signed-off-by: Terence Yim <ch...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/e4a36762
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/e4a36762
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/e4a36762
Branch: refs/heads/site
Commit: e4a36762e8df34aa4971e29863714f199cb8ddcd
Parents: f6d2b6c
Author: Terence Yim <ch...@apache.org>
Authored: Wed Oct 14 13:39:32 2015 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Tue Oct 20 08:47:20 2015 -0700
----------------------------------------------------------------------
.../org/apache/twill/internal/Constants.java | 83 ++++++++++++++++++
.../twill/internal/AbstractTwillService.java | 2 +-
.../internal/AbstractZKServiceController.java | 3 +-
.../org/apache/twill/internal/Constants.java | 76 -----------------
.../twill/discovery/ZKDiscoveryService.java | 4 +-
.../twill/discovery/ZKDiscoveryServiceTest.java | 2 +-
.../org/apache/twill/internal/ServiceMain.java | 19 +++--
.../appmaster/ApplicationMasterMain.java | 89 +++++++++++++++++++-
.../appmaster/ApplicationMasterService.java | 6 +-
.../internal/container/TwillContainerMain.java | 5 +-
.../apache/twill/yarn/YarnTwillPreparer.java | 8 +-
.../twill/yarn/YarnTwillRunnerService.java | 4 +-
.../org/apache/twill/yarn/BaseYarnTest.java | 4 +
.../apache/twill/yarn/EchoServerTestRun.java | 68 +++++++++++++--
.../java/org/apache/twill/yarn/TwillTester.java | 4 +
15 files changed, 266 insertions(+), 111 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-common/src/main/java/org/apache/twill/internal/Constants.java
----------------------------------------------------------------------
diff --git a/twill-common/src/main/java/org/apache/twill/internal/Constants.java b/twill-common/src/main/java/org/apache/twill/internal/Constants.java
new file mode 100644
index 0000000..dd04eb1
--- /dev/null
+++ b/twill-common/src/main/java/org/apache/twill/internal/Constants.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+/**
+ * This class contains collection of common constants used in Twill.
+ */
+public final class Constants {
+
+ public static final String LOG_TOPIC = "log";
+
+ /** Maximum number of seconds for AM to start. */
+ public static final int APPLICATION_MAX_START_SECONDS = 60;
+ /** Maximum number of seconds for AM to stop. */
+ public static final int APPLICATION_MAX_STOP_SECONDS = 60;
+
+ public static final long PROVISION_TIMEOUT = 30000;
+
+ /**
+ * Milliseconds AM should wait for RM to allocate a constrained provision request.
+ * On timeout, AM relaxes the request constraints.
+ */
+ public static final int CONSTRAINED_PROVISION_REQUEST_TIMEOUT = 5000;
+
+ public static final double HEAP_MIN_RATIO = 0.7d;
+
+ /** Memory size of AM. */
+ public static final int APP_MASTER_MEMORY_MB = 512;
+
+ public static final int APP_MASTER_RESERVED_MEMORY_MB = 150;
+
+ public static final String CLASSPATH = "classpath";
+ public static final String APPLICATION_CLASSPATH = "application-classpath";
+
+ /** Command names for the restart runnable instances. */
+ public static final String RESTART_ALL_RUNNABLE_INSTANCES = "restartAllRunnableInstances";
+ public static final String RESTART_RUNNABLES_INSTANCES = "restartRunnablesInstances";
+
+ /**
+ * Common ZK path constants.
+ */
+ public static final String DISCOVERY_PATH_PREFIX = "/discoverable";
+ public static final String INSTANCES_PATH_PREFIX = "/instances";
+
+
+ /**
+ * Constants for names of internal files that are shared between client, AM and containers.
+ */
+ public static final class Files {
+
+ public static final String LAUNCHER_JAR = "launcher.jar";
+ public static final String APP_MASTER_JAR = "appMaster.jar";
+ public static final String CONTAINER_JAR = "container.jar";
+ public static final String LOCALIZE_FILES = "localizeFiles.json";
+ public static final String TWILL_SPEC = "twillSpec.json";
+ public static final String ARGUMENTS = "arguments.json";
+ public static final String ENVIRONMENTS = "environments.json";
+ public static final String LOGBACK_TEMPLATE = "logback-template.xml";
+ public static final String JVM_OPTIONS = "jvm.opts";
+ public static final String CREDENTIALS = "credentials.store";
+
+ private Files() {
+ }
+ }
+
+ private Constants() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
index 2f95e0e..8688d0b 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
@@ -359,7 +359,7 @@ public abstract class AbstractTwillService extends AbstractExecutionThreadServic
}
private String getLiveNodePath() {
- return "/instances/" + runId.getId();
+ return String.format("%s/%s", Constants.INSTANCES_PATH_PREFIX, runId.getId());
}
private <T> byte[] toJson(T obj) {
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
index 0cf92ea..9b30823 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
@@ -21,7 +21,6 @@ import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-
import org.apache.twill.api.Command;
import org.apache.twill.api.RunId;
import org.apache.twill.api.ServiceController;
@@ -239,7 +238,7 @@ public abstract class AbstractZKServiceController extends AbstractExecutionServi
* Returns the zookeeper node path for the ephemeral instance node for this runId.
*/
protected final String getInstancePath() {
- return String.format("/instances/%s", getRunId().getId());
+ return String.format("%s/%s", Constants.INSTANCES_PATH_PREFIX, getRunId().getId());
}
private String getZKPath(String path) {
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-core/src/main/java/org/apache/twill/internal/Constants.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/Constants.java b/twill-core/src/main/java/org/apache/twill/internal/Constants.java
deleted file mode 100644
index 39de851..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/Constants.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-/**
- * This class contains collection of common constants used in Twill.
- */
-public final class Constants {
-
- public static final String LOG_TOPIC = "log";
-
- /** Maximum number of seconds for AM to start. */
- public static final int APPLICATION_MAX_START_SECONDS = 60;
- /** Maximum number of seconds for AM to stop. */
- public static final int APPLICATION_MAX_STOP_SECONDS = 60;
-
- public static final long PROVISION_TIMEOUT = 30000;
-
- /**
- * Milliseconds AM should wait for RM to allocate a constrained provision request.
- * On timeout, AM relaxes the request constraints.
- */
- public static final int CONSTRAINED_PROVISION_REQUEST_TIMEOUT = 5000;
-
- public static final double HEAP_MIN_RATIO = 0.7d;
-
- /** Memory size of AM. */
- public static final int APP_MASTER_MEMORY_MB = 512;
-
- public static final int APP_MASTER_RESERVED_MEMORY_MB = 150;
-
- public static final String CLASSPATH = "classpath";
- public static final String APPLICATION_CLASSPATH = "application-classpath";
-
- /** Command names for the restart runnable instances. */
- public static final String RESTART_ALL_RUNNABLE_INSTANCES = "restartAllRunnableInstances";
- public static final String RESTART_RUNNABLES_INSTANCES = "restartRunnablesInstances";
-
- /**
- * Constants for names of internal files that are shared between client, AM and containers.
- */
- public static final class Files {
-
- public static final String LAUNCHER_JAR = "launcher.jar";
- public static final String APP_MASTER_JAR = "appMaster.jar";
- public static final String CONTAINER_JAR = "container.jar";
- public static final String LOCALIZE_FILES = "localizeFiles.json";
- public static final String TWILL_SPEC = "twillSpec.json";
- public static final String ARGUMENTS = "arguments.json";
- public static final String ENVIRONMENTS = "environments.json";
- public static final String LOGBACK_TEMPLATE = "logback-template.xml";
- public static final String JVM_OPTIONS = "jvm.opts";
- public static final String CREDENTIALS = "credentials.store";
-
- private Files() {
- }
- }
-
- private Constants() {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
index 3f0db34..c563bab 100644
--- a/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
@@ -31,6 +31,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
+import org.apache.twill.internal.Constants;
import org.apache.twill.zookeeper.NodeChildren;
import org.apache.twill.zookeeper.NodeData;
import org.apache.twill.zookeeper.OperationFuture;
@@ -94,7 +95,6 @@ import java.util.concurrent.locks.ReentrantLock;
*/
public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceClient {
private static final Logger LOG = LoggerFactory.getLogger(ZKDiscoveryService.class);
- private static final String NAMESPACE = "/discoverable";
private static final long RETRY_MILLIS = 1000;
@@ -112,7 +112,7 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli
* @param zkClient The {@link ZKClient} for interacting with zookeeper.
*/
public ZKDiscoveryService(ZKClient zkClient) {
- this(zkClient, NAMESPACE);
+ this(zkClient, Constants.DISCOVERY_PATH_PREFIX);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java b/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
index 7707c5b..7d6e369 100644
--- a/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
+++ b/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
@@ -63,7 +63,7 @@ public class ZKDiscoveryServiceTest extends DiscoveryServiceTestBase {
zkServer.stopAndWait();
}
- @Test (timeout = 10000)
+ @Test (timeout = 30000)
public void testDoubleRegister() throws Exception {
Map.Entry<DiscoveryService, DiscoveryServiceClient> entry = create();
DiscoveryService discoveryService = entry.getKey();
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
index a6af3d3..cafd375 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
@@ -157,12 +157,16 @@ public abstract class ServiceMain {
/**
* Creates a {@link ZKClientService}.
*/
- protected static ZKClientService createZKClient(String zkConnectStr) {
+ protected static ZKClientService createZKClient(String zkConnectStr, String appName) {
return ZKClientServices.delegate(
- ZKClients.reWatchOnExpire(
- ZKClients.retryOnFailure(
- ZKClientService.Builder.of(zkConnectStr).build(),
- RetryStrategies.fixDelay(1, TimeUnit.SECONDS))));
+ ZKClients.namespace(
+ ZKClients.reWatchOnExpire(
+ ZKClients.retryOnFailure(
+ ZKClientService.Builder.of(zkConnectStr).build(),
+ RetryStrategies.fixDelay(1, TimeUnit.SECONDS)
+ )
+ ), "/" + appName
+ ));
}
private void configureLogger() {
@@ -256,10 +260,11 @@ public abstract class ServiceMain {
/**
* A simple service for creating/remove ZK paths needed for {@link AbstractTwillService}.
*/
- protected static final class TwillZKPathService extends AbstractIdleService {
+ protected static class TwillZKPathService extends AbstractIdleService {
+
+ protected static final long TIMEOUT_SECONDS = 5L;
private static final Logger LOG = LoggerFactory.getLogger(TwillZKPathService.class);
- private static final long TIMEOUT_SECONDS = 5L;
private final ZKClient zkClient;
private final String path;
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
index 3e8cb93..38a2463 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
@@ -19,6 +19,7 @@ package org.apache.twill.internal.appmaster;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.Futures;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -32,6 +33,7 @@ import org.apache.twill.internal.logging.Loggings;
import org.apache.twill.internal.utils.Networks;
import org.apache.twill.internal.yarn.VersionDetectYarnAMClientFactory;
import org.apache.twill.internal.yarn.YarnAMClient;
+import org.apache.twill.zookeeper.OperationFuture;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.twill.zookeeper.ZKClientService;
import org.apache.twill.zookeeper.ZKOperations;
@@ -43,7 +45,10 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Properties;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
@@ -65,7 +70,7 @@ public final class ApplicationMasterMain extends ServiceMain {
File twillSpec = new File(Constants.Files.TWILL_SPEC);
RunId runId = RunIds.fromString(System.getenv(EnvKeys.TWILL_RUN_ID));
- ZKClientService zkClientService = createZKClient(zkConnect);
+ ZKClientService zkClientService = createZKClient(zkConnect, System.getenv(EnvKeys.TWILL_APP_NAME));
Configuration conf = new YarnConfiguration(new HdfsConfiguration(new Configuration()));
setRMSchedulerAddress(conf);
@@ -74,12 +79,12 @@ public final class ApplicationMasterMain extends ServiceMain {
twillSpec, amClient, createAppLocation(conf));
TrackerService trackerService = new TrackerService(service);
- new ApplicationMasterMain(String.format("%s/%s/kafka", zkConnect, runId.getId()))
+ new ApplicationMasterMain(service.getKafkaZKConnect())
.doMain(
service,
new YarnAMClientService(amClient, trackerService),
zkClientService,
- new TwillZKPathService(zkClientService, runId),
+ new AppMasterTwillZKPathService(zkClientService, runId),
new ApplicationKafkaService(zkClientService, runId)
);
}
@@ -229,4 +234,82 @@ public final class ApplicationMasterMain extends ServiceMain {
}
}
}
+
+ private static final class AppMasterTwillZKPathService extends TwillZKPathService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AppMasterTwillZKPathService.class);
+ private final ZKClient zkClient;
+
+ public AppMasterTwillZKPathService(ZKClient zkClient, RunId runId) {
+ super(zkClient, runId);
+ this.zkClient = zkClient;
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ super.shutDown();
+
+ // Deletes ZK nodes created for the application execution.
+ // We don't have to worry about a race condition if another instance of the same app starts at the same time
+ // as when removal is performed. This is because we always create nodes with "createParent == true",
+ // which takes care of the parent node recreation if it is removed from here.
+
+ // Try to delete the /instances path. It may throw NotEmptyException if there are other instances of the
+ // same app running, which we can safely ignore and return.
+ if (!delete(Constants.INSTANCES_PATH_PREFIX)) {
+ return;
+ }
+
+ // Try to delete children under /discoverable. It may fail with NotEmptyException if there are other instances
+ // of the same app running that have discovery services running.
+ List<String> children = zkClient.getChildren(Constants.DISCOVERY_PATH_PREFIX)
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS).getChildren();
+ List<OperationFuture<?>> deleteFutures = new ArrayList<>();
+ for (String child : children) {
+ String path = Constants.DISCOVERY_PATH_PREFIX + "/" + child;
+ LOG.info("Removing ZK path: {}{}", zkClient.getConnectString(), path);
+ deleteFutures.add(zkClient.delete(path));
+ }
+ Futures.successfulAsList(deleteFutures).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ for (OperationFuture<?> future : deleteFutures) {
+ try {
+ future.get();
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof KeeperException.NotEmptyException) {
+ return;
+ }
+ throw e;
+ }
+ }
+
+ // Delete the /discoverable path. It may fail with NotEmptyException (due to a race between apps),
+ // which we can safely ignore and return.
+ if (!delete(Constants.DISCOVERY_PATH_PREFIX)) {
+ return;
+ }
+
+ // Delete the ZK path for the app namespace.
+ delete("/");
+ }
+
+ /**
+ * Deletes the given ZK path.
+ *
+ * @param path path to delete
+ * @return true if the path was deleted, false if failed to delete due to {@link KeeperException.NotEmptyException}.
+ * @throws Exception if failed to delete the path
+ */
+ private boolean delete(String path) throws Exception {
+ try {
+ LOG.info("Removing ZK path: {}{}", zkClient.getConnectString(), path);
+ zkClient.delete(path).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ return true;
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof KeeperException.NotEmptyException) {
+ return false;
+ }
+ throw e;
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index e1523d6..c376de4 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -236,7 +236,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
// Creates ZK path for runnable
zkClient.create("/" + runId.getId() + "/runnables", null, CreateMode.PERSISTENT).get();
- runningContainers.addWatcher("/discoverable");
+ runningContainers.addWatcher(Constants.DISCOVERY_PATH_PREFIX);
runnableContainerRequests = initContainerRequests();
}
@@ -648,7 +648,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
env.put(EnvKeys.TWILL_APP_RUN_ID, runId.getId());
env.put(EnvKeys.TWILL_APP_NAME, twillSpec.getName());
env.put(EnvKeys.TWILL_APP_LOG_LEVEL, System.getenv(EnvKeys.TWILL_APP_LOG_LEVEL));
- env.put(EnvKeys.TWILL_ZK_CONNECT, zkClient.getConnectString());
+ env.put(EnvKeys.TWILL_ZK_CONNECT, System.getenv(EnvKeys.TWILL_ZK_CONNECT));
env.put(EnvKeys.TWILL_LOG_KAFKA_ZK, getKafkaZKConnect());
ProcessLauncher.PrepareLaunchContext launchContext = processLauncher.prepareLaunch(env, getLocalizeFiles(),
@@ -703,7 +703,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
return String.format("/%s/runnables/%s", runId.getId(), runnableName);
}
- private String getKafkaZKConnect() {
+ String getKafkaZKConnect() {
return String.format("%s/%s/kafka", zkClient.getConnectString(), runId.getId());
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
index 51837a7..3ea786a 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
@@ -18,7 +18,6 @@
package org.apache.twill.internal.container;
import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.io.Files;
import com.google.common.util.concurrent.AbstractService;
@@ -28,9 +27,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.twill.api.LocalFile;
import org.apache.twill.api.RunId;
-import org.apache.twill.api.RuntimeSpecification;
import org.apache.twill.api.TwillRunnableSpecification;
import org.apache.twill.api.TwillSpecification;
import org.apache.twill.discovery.ZKDiscoveryService;
@@ -80,7 +77,7 @@ public final class TwillContainerMain extends ServiceMain {
int instanceId = Integer.parseInt(System.getenv(EnvKeys.TWILL_INSTANCE_ID));
int instanceCount = Integer.parseInt(System.getenv(EnvKeys.TWILL_INSTANCE_COUNT));
- ZKClientService zkClientService = createZKClient(zkConnectStr);
+ ZKClientService zkClientService = createZKClient(zkConnectStr, System.getenv(EnvKeys.TWILL_APP_NAME));
ZKDiscoveryService discoveryService = new ZKDiscoveryService(zkClientService);
ZKClient appRunZkClient = getAppRunZKClient(zkClientService, appRunId);
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
index d4edfeb..d04cdab 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
@@ -116,7 +116,7 @@ final class YarnTwillPreparer implements TwillPreparer {
private final YarnConfiguration yarnConfig;
private final TwillSpecification twillSpec;
private final YarnAppClient yarnAppClient;
- private final ZKClient zkClient;
+ private final String zkConnectString;
private final LocationFactory locationFactory;
private final YarnTwillControllerFactory controllerFactory;
private final RunId runId;
@@ -139,13 +139,13 @@ final class YarnTwillPreparer implements TwillPreparer {
private LogEntry.Level logLevel;
YarnTwillPreparer(YarnConfiguration yarnConfig, TwillSpecification twillSpec,
- YarnAppClient yarnAppClient, ZKClient zkClient,
+ YarnAppClient yarnAppClient, String zkConnectString,
LocationFactory locationFactory, String extraOptions, LogEntry.Level logLevel,
YarnTwillControllerFactory controllerFactory) {
this.yarnConfig = yarnConfig;
this.twillSpec = twillSpec;
this.yarnAppClient = yarnAppClient;
- this.zkClient = ZKClients.namespace(zkClient, "/" + twillSpec.getName());
+ this.zkConnectString = zkConnectString;
this.locationFactory = locationFactory;
this.controllerFactory = controllerFactory;
this.runId = RunIds.generate();
@@ -345,7 +345,7 @@ final class YarnTwillPreparer implements TwillPreparer {
ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder()
.put(EnvKeys.TWILL_FS_USER, fsUser)
.put(EnvKeys.TWILL_APP_DIR, getAppLocation().toURI().toASCIIString())
- .put(EnvKeys.TWILL_ZK_CONNECT, zkClient.getConnectString())
+ .put(EnvKeys.TWILL_ZK_CONNECT, zkConnectString)
.put(EnvKeys.TWILL_RUN_ID, runId.getId())
.put(EnvKeys.TWILL_RESERVED_MEMORY_MB, Integer.toString(reservedMemory))
.put(EnvKeys.TWILL_APP_NAME, twillSpec.getName())
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
index 8a13017..c5853d6 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
@@ -277,8 +277,8 @@ public final class YarnTwillRunnerService implements TwillRunnerService {
final TwillSpecification twillSpec = application.configure();
final String appName = twillSpec.getName();
- return new YarnTwillPreparer(yarnConfig, twillSpec, yarnAppClient, zkClientService, locationFactory, jvmOptions,
- LogEntry.Level.INFO, new YarnTwillControllerFactory() {
+ return new YarnTwillPreparer(yarnConfig, twillSpec, yarnAppClient, zkClientService.getConnectString(),
+ locationFactory, jvmOptions, LogEntry.Level.INFO, new YarnTwillControllerFactory() {
@Override
public YarnTwillController create(RunId runId, Iterable<LogHandler> logHandlers,
Callable<ProcessController<YarnApplicationReport>> startUp) {
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java b/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java
index b5c7f58..a9cf2ed 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java
@@ -109,4 +109,8 @@ public abstract class BaseYarnTest {
public List<NodeReport> getNodeReports() throws Exception {
return TWILL_TESTER.getNodeReports();
}
+
+ public String getZKConnectionString() {
+ return TWILL_TESTER.getZKConnectionString();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
index 3f0f20c..13c07b1 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
@@ -31,23 +31,22 @@ import org.apache.twill.api.TwillRunnerService;
import org.apache.twill.api.logging.PrinterLogHandler;
import org.apache.twill.common.Threads;
import org.apache.twill.discovery.Discoverable;
+import org.apache.twill.zookeeper.ZKClientService;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.Socket;
-import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
/**
@@ -58,8 +57,7 @@ public final class EchoServerTestRun extends BaseYarnTest {
private static final Logger LOG = LoggerFactory.getLogger(EchoServerTestRun.class);
@Test
- public void testEchoServer() throws InterruptedException, ExecutionException, IOException,
- URISyntaxException, TimeoutException {
+ public void testEchoServer() throws Exception {
TwillRunner runner = getTwillRunner();
TwillController controller = runner.prepare(new EchoServer(),
@@ -158,6 +156,64 @@ public final class EchoServerTestRun extends BaseYarnTest {
TimeUnit.SECONDS.sleep(2);
}
+ @Test
+ public void testZKCleanup() throws Exception {
+ ZKClientService zkClient = ZKClientService.Builder.of(getZKConnectionString() + "/twill").build();
+ zkClient.startAndWait();
+
+ try {
+ TwillRunner runner = getTwillRunner();
+
+ // Start an application and stop it.
+ TwillController controller = runner.prepare(new EchoServer())
+ .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+ .withApplicationArguments("echo")
+ .withArguments("EchoServer", "echo2")
+ .start();
+
+ Iterable<Discoverable> echoServices = controller.discoverService("echo");
+ Assert.assertTrue(waitForSize(echoServices, 1, 120));
+
+ controller.terminate().get();
+
+ // Verify the ZK node gets cleaned up
+ Assert.assertNull(zkClient.exists("/EchoServer").get());
+
+ // Start two instances of the application and stop one of them
+ List<TwillController> controllers = new ArrayList<>();
+ for (int i = 0; i < 2; i++) {
+ controllers.add(runner.prepare(new EchoServer())
+ .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+ .withApplicationArguments("echo")
+ .withArguments("EchoServer", "echo2")
+ .start());
+ }
+
+ // There should be two instances up and running.
+ echoServices = controller.discoverService("echo");
+ Assert.assertTrue(waitForSize(echoServices, 2, 120));
+
+ // Stop one instance of the app
+ controllers.get(0).terminate().get();
+
+ // Verify the ZK node should still be there
+ Assert.assertNotNull(zkClient.exists("/EchoServer").get());
+
+ // We should still be able to do discovery, which depends on the ZK node.
+ echoServices = controller.discoverService("echo");
+ Assert.assertTrue(waitForSize(echoServices, 1, 120));
+
+ // Stop second instance of the app
+ controllers.get(1).terminate().get();
+
+ // Verify the ZK node gets cleaned up
+ Assert.assertNull(zkClient.exists("/EchoServer").get());
+
+ } finally {
+ zkClient.stopAndWait();
+ }
+ }
+
/**
* Need helper method here to wait for getting resource report because {@link TwillController#getResourceReport()}
* could return null if the application has not fully started.
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java b/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java
index f669b83..e604cec 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java
@@ -160,6 +160,10 @@ public class TwillTester extends ExternalResource {
return yarnAppClient.getNodeReports();
}
+ public String getZKConnectionString() {
+ return zkServer.getConnectionStr();
+ }
+
private void stopQuietly(Service service) {
try {
service.stopAndWait();