You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/11/21 22:54:27 UTC
[04/15] Initial import commit.
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
new file mode 100644
index 0000000..17425d4
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
@@ -0,0 +1,600 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
+import com.google.common.base.Supplier;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.CharStreams;
+import com.google.common.io.OutputSupplier;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.GsonBuilder;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.twill.api.EventHandlerSpecification;
+import org.apache.twill.api.LocalFile;
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.RuntimeSpecification;
+import org.apache.twill.api.SecureStore;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillPreparer;
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.api.logging.LogHandler;
+import org.apache.twill.filesystem.Location;
+import org.apache.twill.filesystem.LocationFactory;
+import org.apache.twill.internal.ApplicationBundler;
+import org.apache.twill.internal.Arguments;
+import org.apache.twill.internal.Configs;
+import org.apache.twill.internal.Constants;
+import org.apache.twill.internal.DefaultLocalFile;
+import org.apache.twill.internal.DefaultRuntimeSpecification;
+import org.apache.twill.internal.DefaultTwillSpecification;
+import org.apache.twill.internal.EnvKeys;
+import org.apache.twill.internal.LogOnlyEventHandler;
+import org.apache.twill.internal.ProcessController;
+import org.apache.twill.internal.ProcessLauncher;
+import org.apache.twill.internal.RunIds;
+import org.apache.twill.internal.appmaster.ApplicationMasterMain;
+import org.apache.twill.internal.container.TwillContainerMain;
+import org.apache.twill.internal.json.ArgumentsCodec;
+import org.apache.twill.internal.json.LocalFileCodec;
+import org.apache.twill.internal.json.TwillSpecificationAdapter;
+import org.apache.twill.internal.utils.Dependencies;
+import org.apache.twill.internal.utils.Paths;
+import org.apache.twill.internal.yarn.YarnAppClient;
+import org.apache.twill.internal.yarn.YarnApplicationReport;
+import org.apache.twill.internal.yarn.YarnUtils;
+import org.apache.twill.launcher.TwillLauncher;
+import org.apache.twill.zookeeper.ZKClient;
+import org.apache.twill.zookeeper.ZKClients;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.jar.JarEntry;
+import java.util.jar.JarOutputStream;
+
+/**
+ * Implementation for {@link TwillPreparer} to prepare and launch distributed application on Hadoop YARN.
+ */
+final class YarnTwillPreparer implements TwillPreparer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(YarnTwillPreparer.class);
+ private static final String KAFKA_ARCHIVE = "kafka-0.7.2.tgz";
+
+ private final YarnConfiguration yarnConfig;
+ private final TwillSpecification twillSpec;
+ private final YarnAppClient yarnAppClient;
+ private final ZKClient zkClient;
+ private final LocationFactory locationFactory;
+ private final Supplier<String> jvmOpts;
+ private final YarnTwillControllerFactory controllerFactory;
+ private final RunId runId;
+
+ private final List<LogHandler> logHandlers = Lists.newArrayList();
+ private final List<String> arguments = Lists.newArrayList();
+ private final Set<Class<?>> dependencies = Sets.newIdentityHashSet();
+ private final List<URI> resources = Lists.newArrayList();
+ private final List<String> classPaths = Lists.newArrayList();
+ private final ListMultimap<String, String> runnableArgs = ArrayListMultimap.create();
+ private final Credentials credentials;
+ private final int reservedMemory;
+ private String user;
+
+ YarnTwillPreparer(YarnConfiguration yarnConfig, TwillSpecification twillSpec, YarnAppClient yarnAppClient,
+ ZKClient zkClient, LocationFactory locationFactory, Supplier<String> jvmOpts,
+ YarnTwillControllerFactory controllerFactory) {
+ this.yarnConfig = yarnConfig;
+ this.twillSpec = twillSpec;
+ this.yarnAppClient = yarnAppClient;
+ this.zkClient = ZKClients.namespace(zkClient, "/" + twillSpec.getName());
+ this.locationFactory = locationFactory;
+ this.jvmOpts = jvmOpts;
+ this.controllerFactory = controllerFactory;
+ this.runId = RunIds.generate();
+ this.credentials = createCredentials();
+ this.reservedMemory = yarnConfig.getInt(Configs.Keys.JAVA_RESERVED_MEMORY_MB,
+ Configs.Defaults.JAVA_RESERVED_MEMORY_MB);
+ this.user = System.getProperty("user.name");
+ }
+
+ @Override
+ public TwillPreparer addLogHandler(LogHandler handler) {
+ logHandlers.add(handler);
+ return this;
+ }
+
+ @Override
+ public TwillPreparer setUser(String user) {
+ this.user = user;
+ return this;
+ }
+
+ @Override
+ public TwillPreparer withApplicationArguments(String... args) {
+ return withApplicationArguments(ImmutableList.copyOf(args));
+ }
+
+ @Override
+ public TwillPreparer withApplicationArguments(Iterable<String> args) {
+ Iterables.addAll(arguments, args);
+ return this;
+ }
+
+ @Override
+ public TwillPreparer withArguments(String runnableName, String... args) {
+ return withArguments(runnableName, ImmutableList.copyOf(args));
+ }
+
+ @Override
+ public TwillPreparer withArguments(String runnableName, Iterable<String> args) {
+ runnableArgs.putAll(runnableName, args);
+ return this;
+ }
+
+ @Override
+ public TwillPreparer withDependencies(Class<?>... classes) {
+ return withDependencies(ImmutableList.copyOf(classes));
+ }
+
+ @Override
+ public TwillPreparer withDependencies(Iterable<Class<?>> classes) {
+ Iterables.addAll(dependencies, classes);
+ return this;
+ }
+
+ @Override
+ public TwillPreparer withResources(URI... resources) {
+ return withResources(ImmutableList.copyOf(resources));
+ }
+
+ @Override
+ public TwillPreparer withResources(Iterable<URI> resources) {
+ Iterables.addAll(this.resources, resources);
+ return this;
+ }
+
+ @Override
+ public TwillPreparer withClassPaths(String... classPaths) {
+ return withClassPaths(ImmutableList.copyOf(classPaths));
+ }
+
+ @Override
+ public TwillPreparer withClassPaths(Iterable<String> classPaths) {
+ Iterables.addAll(this.classPaths, classPaths);
+ return this;
+ }
+
+ @Override
+ public TwillPreparer addSecureStore(SecureStore secureStore) {
+ Object store = secureStore.getStore();
+ Preconditions.checkArgument(store instanceof Credentials, "Only Hadoop Credentials is supported.");
+ this.credentials.mergeAll((Credentials) store);
+ return this;
+ }
+
+ @Override
+ public TwillController start() {
+ try {
+ final ProcessLauncher<ApplicationId> launcher = yarnAppClient.createLauncher(user, twillSpec);
+ final ApplicationId appId = launcher.getContainerInfo();
+
+ Callable<ProcessController<YarnApplicationReport>> submitTask =
+ new Callable<ProcessController<YarnApplicationReport>>() {
+ @Override
+ public ProcessController<YarnApplicationReport> call() throws Exception {
+ String fsUser = locationFactory.getHomeLocation().getName();
+
+ // Local files needed by AM
+ Map<String, LocalFile> localFiles = Maps.newHashMap();
+ // Local files declared by runnables
+ Multimap<String, LocalFile> runnableLocalFiles = HashMultimap.create();
+
+ String vmOpts = jvmOpts.get();
+
+ createAppMasterJar(createBundler(), localFiles);
+ createContainerJar(createBundler(), localFiles);
+ populateRunnableLocalFiles(twillSpec, runnableLocalFiles);
+ saveSpecification(twillSpec, runnableLocalFiles, localFiles);
+ saveLogback(localFiles);
+ saveLauncher(localFiles);
+ saveKafka(localFiles);
+ saveVmOptions(vmOpts, localFiles);
+ saveArguments(new Arguments(arguments, runnableArgs), localFiles);
+ saveLocalFiles(localFiles, ImmutableSet.of(Constants.Files.TWILL_SPEC,
+ Constants.Files.LOGBACK_TEMPLATE,
+ Constants.Files.CONTAINER_JAR,
+ Constants.Files.LAUNCHER_JAR,
+ Constants.Files.ARGUMENTS));
+
+ LOG.debug("Submit AM container spec: {}", appId);
+ // java -Djava.io.tmpdir=tmp -cp launcher.jar:$HADOOP_CONF_DIR -XmxMemory
+ // org.apache.twill.internal.TwillLauncher
+ // appMaster.jar
+ // org.apache.twill.internal.appmaster.ApplicationMasterMain
+ // false
+ return launcher.prepareLaunch(
+ ImmutableMap.<String, String>builder()
+ .put(EnvKeys.TWILL_FS_USER, fsUser)
+ .put(EnvKeys.TWILL_APP_DIR, getAppLocation().toURI().toASCIIString())
+ .put(EnvKeys.TWILL_ZK_CONNECT, zkClient.getConnectString())
+ .put(EnvKeys.TWILL_RUN_ID, runId.getId())
+ .put(EnvKeys.TWILL_RESERVED_MEMORY_MB, Integer.toString(reservedMemory))
+ .put(EnvKeys.TWILL_APP_NAME, twillSpec.getName()).build(),
+ localFiles.values(), credentials)
+ .noResources()
+ .noEnvironment()
+ .withCommands().add(
+ "java",
+ "-Djava.io.tmpdir=tmp",
+ "-Dyarn.appId=$" + EnvKeys.YARN_APP_ID_STR,
+ "-Dtwill.app=$" + EnvKeys.TWILL_APP_NAME,
+ "-cp", Constants.Files.LAUNCHER_JAR + ":$HADOOP_CONF_DIR",
+ "-Xmx" + (Constants.APP_MASTER_MEMORY_MB - Constants.APP_MASTER_RESERVED_MEMORY_MB) + "m",
+ vmOpts,
+ TwillLauncher.class.getName(),
+ Constants.Files.APP_MASTER_JAR,
+ ApplicationMasterMain.class.getName(),
+ Boolean.FALSE.toString())
+ .redirectOutput(Constants.STDOUT)
+ .redirectError(Constants.STDERR)
+ .launch();
+ }
+ };
+
+ YarnTwillController controller = controllerFactory.create(runId, logHandlers, submitTask);
+ controller.start();
+ return controller;
+ } catch (Exception e) {
+ LOG.error("Failed to submit application {}", twillSpec.getName(), e);
+ throw Throwables.propagate(e);
+ }
+ }
+
+ private Credentials createCredentials() {
+ Credentials credentials = new Credentials();
+
+ try {
+ credentials.addAll(UserGroupInformation.getCurrentUser().getCredentials());
+
+ List<Token<?>> tokens = YarnUtils.addDelegationTokens(yarnConfig, locationFactory, credentials);
+ for (Token<?> token : tokens) {
+ LOG.debug("Delegation token acquired for {}, {}", locationFactory.getHomeLocation().toURI(), token);
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed to check for secure login type. Not gathering any delegation token.", e);
+ }
+ return credentials;
+ }
+
+ private ApplicationBundler createBundler() {
+ return new ApplicationBundler(ImmutableList.<String>of());
+ }
+
+ private LocalFile createLocalFile(String name, Location location) throws IOException {
+ return createLocalFile(name, location, false);
+ }
+
+ private LocalFile createLocalFile(String name, Location location, boolean archive) throws IOException {
+ return new DefaultLocalFile(name, location.toURI(), location.lastModified(), location.length(), archive, null);
+ }
+
+ private void createAppMasterJar(ApplicationBundler bundler, Map<String, LocalFile> localFiles) throws IOException {
+ try {
+ LOG.debug("Create and copy {}", Constants.Files.APP_MASTER_JAR);
+ Location location = createTempLocation(Constants.Files.APP_MASTER_JAR);
+
+ List<Class<?>> classes = Lists.newArrayList();
+ classes.add(ApplicationMasterMain.class);
+
+ // Stuck in the yarnAppClient class to make bundler being able to pickup the right yarn-client version
+ classes.add(yarnAppClient.getClass());
+
+ // Add the TwillRunnableEventHandler class
+ if (twillSpec.getEventHandler() != null) {
+ classes.add(getClassLoader().loadClass(twillSpec.getEventHandler().getClassName()));
+ }
+
+ bundler.createBundle(location, classes);
+ LOG.debug("Done {}", Constants.Files.APP_MASTER_JAR);
+
+ localFiles.put(Constants.Files.APP_MASTER_JAR, createLocalFile(Constants.Files.APP_MASTER_JAR, location));
+ } catch (ClassNotFoundException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ private void createContainerJar(ApplicationBundler bundler, Map<String, LocalFile> localFiles) throws IOException {
+ try {
+ Set<Class<?>> classes = Sets.newIdentityHashSet();
+ classes.add(TwillContainerMain.class);
+ classes.addAll(dependencies);
+
+ ClassLoader classLoader = getClassLoader();
+ for (RuntimeSpecification spec : twillSpec.getRunnables().values()) {
+ classes.add(classLoader.loadClass(spec.getRunnableSpecification().getClassName()));
+ }
+
+ LOG.debug("Create and copy {}", Constants.Files.CONTAINER_JAR);
+ Location location = createTempLocation(Constants.Files.CONTAINER_JAR);
+ bundler.createBundle(location, classes, resources);
+ LOG.debug("Done {}", Constants.Files.CONTAINER_JAR);
+
+ localFiles.put(Constants.Files.CONTAINER_JAR, createLocalFile(Constants.Files.CONTAINER_JAR, location));
+
+ } catch (ClassNotFoundException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ /**
+ * Based on the given {@link TwillSpecification}, upload LocalFiles to Yarn Cluster.
+ * @param twillSpec The {@link TwillSpecification} for populating resource.
+ * @param localFiles A Multimap to store runnable name to transformed LocalFiles.
+ * @throws IOException
+ */
+ private void populateRunnableLocalFiles(TwillSpecification twillSpec,
+ Multimap<String, LocalFile> localFiles) throws IOException {
+
+ LOG.debug("Populating Runnable LocalFiles");
+ for (Map.Entry<String, RuntimeSpecification> entry: twillSpec.getRunnables().entrySet()) {
+ String runnableName = entry.getKey();
+ for (LocalFile localFile : entry.getValue().getLocalFiles()) {
+ Location location;
+
+ URI uri = localFile.getURI();
+ if ("hdfs".equals(uri.getScheme())) {
+ // Assuming the location factory is HDFS one. If it is not, it will failed, which is the correct behavior.
+ location = locationFactory.create(uri);
+ } else {
+ URL url = uri.toURL();
+ LOG.debug("Create and copy {} : {}", runnableName, url);
+ // Preserves original suffix for expansion.
+ location = copyFromURL(url, createTempLocation(Paths.appendSuffix(url.getFile(), localFile.getName())));
+ LOG.debug("Done {} : {}", runnableName, url);
+ }
+
+ localFiles.put(runnableName,
+ new DefaultLocalFile(localFile.getName(), location.toURI(), location.lastModified(),
+ location.length(), localFile.isArchive(), localFile.getPattern()));
+ }
+ }
+ LOG.debug("Done Runnable LocalFiles");
+ }
+
+ private void saveSpecification(TwillSpecification spec, final Multimap<String, LocalFile> runnableLocalFiles,
+ Map<String, LocalFile> localFiles) throws IOException {
+ // Rewrite LocalFiles inside twillSpec
+ Map<String, RuntimeSpecification> runtimeSpec = Maps.transformEntries(
+ spec.getRunnables(), new Maps.EntryTransformer<String, RuntimeSpecification, RuntimeSpecification>() {
+ @Override
+ public RuntimeSpecification transformEntry(String key, RuntimeSpecification value) {
+ return new DefaultRuntimeSpecification(value.getName(), value.getRunnableSpecification(),
+ value.getResourceSpecification(), runnableLocalFiles.get(key));
+ }
+ });
+
+ // Serialize into a local temp file.
+ LOG.debug("Create and copy {}", Constants.Files.TWILL_SPEC);
+ Location location = createTempLocation(Constants.Files.TWILL_SPEC);
+ Writer writer = new OutputStreamWriter(location.getOutputStream(), Charsets.UTF_8);
+ try {
+ EventHandlerSpecification eventHandler = spec.getEventHandler();
+ if (eventHandler == null) {
+ eventHandler = new LogOnlyEventHandler().configure();
+ }
+
+ TwillSpecificationAdapter.create().toJson(
+ new DefaultTwillSpecification(spec.getName(), runtimeSpec, spec.getOrders(), eventHandler),
+ writer);
+ } finally {
+ writer.close();
+ }
+ LOG.debug("Done {}", Constants.Files.TWILL_SPEC);
+
+ localFiles.put(Constants.Files.TWILL_SPEC, createLocalFile(Constants.Files.TWILL_SPEC, location));
+ }
+
+ private void saveLogback(Map<String, LocalFile> localFiles) throws IOException {
+ LOG.debug("Create and copy {}", Constants.Files.LOGBACK_TEMPLATE);
+ Location location = copyFromURL(getClass().getClassLoader().getResource(Constants.Files.LOGBACK_TEMPLATE),
+ createTempLocation(Constants.Files.LOGBACK_TEMPLATE));
+ LOG.debug("Done {}", Constants.Files.LOGBACK_TEMPLATE);
+
+ localFiles.put(Constants.Files.LOGBACK_TEMPLATE, createLocalFile(Constants.Files.LOGBACK_TEMPLATE, location));
+ }
+
+ /**
+ * Creates the launcher.jar for launch the main application.
+ */
+ private void saveLauncher(Map<String, LocalFile> localFiles) throws URISyntaxException, IOException {
+
+ LOG.debug("Create and copy {}", Constants.Files.LAUNCHER_JAR);
+ Location location = createTempLocation(Constants.Files.LAUNCHER_JAR);
+
+ final String launcherName = TwillLauncher.class.getName();
+
+ // Create a jar file with the TwillLauncher optionally a json serialized classpath.json in it.
+ final JarOutputStream jarOut = new JarOutputStream(location.getOutputStream());
+ ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ if (classLoader == null) {
+ classLoader = getClass().getClassLoader();
+ }
+ Dependencies.findClassDependencies(classLoader, new Dependencies.ClassAcceptor() {
+ @Override
+ public boolean accept(String className, URL classUrl, URL classPathUrl) {
+ Preconditions.checkArgument(className.startsWith(launcherName),
+ "Launcher jar should not have dependencies: %s", className);
+ try {
+ jarOut.putNextEntry(new JarEntry(className.replace('.', '/') + ".class"));
+ InputStream is = classUrl.openStream();
+ try {
+ ByteStreams.copy(is, jarOut);
+ } finally {
+ is.close();
+ }
+ } catch (IOException e) {
+ throw Throwables.propagate(e);
+ }
+ return true;
+ }
+ }, TwillLauncher.class.getName());
+
+ try {
+ if (!classPaths.isEmpty()) {
+ jarOut.putNextEntry(new JarEntry("classpath"));
+ jarOut.write(Joiner.on(':').join(classPaths).getBytes(Charsets.UTF_8));
+ }
+ } finally {
+ jarOut.close();
+ }
+ LOG.debug("Done {}", Constants.Files.LAUNCHER_JAR);
+
+ localFiles.put(Constants.Files.LAUNCHER_JAR, createLocalFile(Constants.Files.LAUNCHER_JAR, location));
+ }
+
+ private void saveKafka(Map<String, LocalFile> localFiles) throws IOException {
+ LOG.debug("Copy {}", Constants.Files.KAFKA);
+ Location location = copyFromURL(getClass().getClassLoader().getResource(KAFKA_ARCHIVE),
+ createTempLocation(Constants.Files.KAFKA));
+ LOG.debug("Done {}", Constants.Files.KAFKA);
+
+ localFiles.put(Constants.Files.KAFKA, createLocalFile(Constants.Files.KAFKA, location, true));
+ }
+
+ private void saveVmOptions(String opts, Map<String, LocalFile> localFiles) throws IOException {
+ if (opts.isEmpty()) {
+ // If no vm options, no need to localize the file.
+ return;
+ }
+ LOG.debug("Copy {}", Constants.Files.JVM_OPTIONS);
+ final Location location = createTempLocation(Constants.Files.JVM_OPTIONS);
+ CharStreams.write(opts, new OutputSupplier<Writer>() {
+ @Override
+ public Writer getOutput() throws IOException {
+ return new OutputStreamWriter(location.getOutputStream(), Charsets.UTF_8);
+ }
+ });
+ LOG.debug("Done {}", Constants.Files.JVM_OPTIONS);
+
+ localFiles.put(Constants.Files.JVM_OPTIONS, createLocalFile(Constants.Files.JVM_OPTIONS, location));
+ }
+
+ private void saveArguments(Arguments arguments, Map<String, LocalFile> localFiles) throws IOException {
+ LOG.debug("Create and copy {}", Constants.Files.ARGUMENTS);
+ final Location location = createTempLocation(Constants.Files.ARGUMENTS);
+ ArgumentsCodec.encode(arguments, new OutputSupplier<Writer>() {
+ @Override
+ public Writer getOutput() throws IOException {
+ return new OutputStreamWriter(location.getOutputStream(), Charsets.UTF_8);
+ }
+ });
+ LOG.debug("Done {}", Constants.Files.ARGUMENTS);
+
+ localFiles.put(Constants.Files.ARGUMENTS, createLocalFile(Constants.Files.ARGUMENTS, location));
+ }
+
+ /**
+ * Serializes the list of files that needs to localize from AM to Container.
+ */
+ private void saveLocalFiles(Map<String, LocalFile> localFiles, Set<String> includes) throws IOException {
+ Map<String, LocalFile> localize = ImmutableMap.copyOf(Maps.filterKeys(localFiles, Predicates.in(includes)));
+ LOG.debug("Create and copy {}", Constants.Files.LOCALIZE_FILES);
+ Location location = createTempLocation(Constants.Files.LOCALIZE_FILES);
+ Writer writer = new OutputStreamWriter(location.getOutputStream(), Charsets.UTF_8);
+ try {
+ new GsonBuilder().registerTypeAdapter(LocalFile.class, new LocalFileCodec())
+ .create().toJson(localize.values(), new TypeToken<List<LocalFile>>() {
+ }.getType(), writer);
+ } finally {
+ writer.close();
+ }
+ LOG.debug("Done {}", Constants.Files.LOCALIZE_FILES);
+ localFiles.put(Constants.Files.LOCALIZE_FILES, createLocalFile(Constants.Files.LOCALIZE_FILES, location));
+ }
+
+ private Location copyFromURL(URL url, Location target) throws IOException {
+ InputStream is = url.openStream();
+ try {
+ OutputStream os = new BufferedOutputStream(target.getOutputStream());
+ try {
+ ByteStreams.copy(is, os);
+ } finally {
+ os.close();
+ }
+ } finally {
+ is.close();
+ }
+ return target;
+ }
+
+ private Location createTempLocation(String fileName) {
+ String name;
+ String suffix = Paths.getExtension(fileName);
+
+ name = fileName.substring(0, fileName.length() - suffix.length() - 1);
+
+ try {
+ return getAppLocation().append(name).getTempFile('.' + suffix);
+ } catch (IOException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ private Location getAppLocation() {
+ return locationFactory.create(String.format("/%s/%s", twillSpec.getName(), runId.getId()));
+ }
+
+ /**
+ * Returns the context ClassLoader if there is any, otherwise, returns ClassLoader of this class.
+ */
+ private ClassLoader getClassLoader() {
+ ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ return classLoader == null ? getClass().getClassLoader() : classLoader;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java b/yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
new file mode 100644
index 0000000..9335465
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
@@ -0,0 +1,583 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.ResourceSpecification;
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.SecureStore;
+import org.apache.twill.api.SecureStoreUpdater;
+import org.apache.twill.api.TwillApplication;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillPreparer;
+import org.apache.twill.api.TwillRunnable;
+import org.apache.twill.api.TwillRunnerService;
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.api.logging.LogHandler;
+import org.apache.twill.common.Cancellable;
+import org.apache.twill.common.ServiceListenerAdapter;
+import org.apache.twill.common.Threads;
+import org.apache.twill.filesystem.HDFSLocationFactory;
+import org.apache.twill.filesystem.Location;
+import org.apache.twill.filesystem.LocationFactory;
+import org.apache.twill.internal.Constants;
+import org.apache.twill.internal.ProcessController;
+import org.apache.twill.internal.RunIds;
+import org.apache.twill.internal.SingleRunnableApplication;
+import org.apache.twill.internal.appmaster.ApplicationMasterLiveNodeData;
+import org.apache.twill.internal.yarn.VersionDetectYarnAppClientFactory;
+import org.apache.twill.internal.yarn.YarnAppClient;
+import org.apache.twill.internal.yarn.YarnApplicationReport;
+import org.apache.twill.internal.yarn.YarnUtils;
+import org.apache.twill.zookeeper.NodeChildren;
+import org.apache.twill.zookeeper.NodeData;
+import org.apache.twill.zookeeper.RetryStrategies;
+import org.apache.twill.zookeeper.ZKClient;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.twill.zookeeper.ZKClientServices;
+import org.apache.twill.zookeeper.ZKClients;
+import org.apache.twill.zookeeper.ZKOperations;
+import com.google.common.base.Charsets;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Suppliers;
+import com.google.common.base.Throwables;
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableTable;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import com.google.common.collect.Table;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.Callables;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * An implementation of {@link org.apache.twill.api.TwillRunnerService} that runs application on a YARN cluster.
+ */
+public final class YarnTwillRunnerService extends AbstractIdleService implements TwillRunnerService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(YarnTwillRunnerService.class);
+
+ private static final int ZK_TIMEOUT = 10000;
+ private static final Function<String, RunId> STRING_TO_RUN_ID = new Function<String, RunId>() {
+ @Override
+ public RunId apply(String input) {
+ return RunIds.fromString(input);
+ }
+ };
+ private static final Function<YarnTwillController, TwillController> CAST_CONTROLLER =
+ new Function<YarnTwillController, TwillController>() {
+ @Override
+ public TwillController apply(YarnTwillController controller) {
+ return controller;
+ }
+ };
+
+ private final YarnConfiguration yarnConfig;
+ private final YarnAppClient yarnAppClient;
+ private final ZKClientService zkClientService;
+ private final LocationFactory locationFactory;
+ private final Table<String, RunId, YarnTwillController> controllers;
+ private ScheduledExecutorService secureStoreScheduler;
+
+ private Iterable<LiveInfo> liveInfos;
+ private Cancellable watchCancellable;
+ private volatile String jvmOptions = "";
+
+ public YarnTwillRunnerService(YarnConfiguration config, String zkConnect) {
+ this(config, zkConnect, new HDFSLocationFactory(getFileSystem(config), "/twill"));
+ }
+
+ public YarnTwillRunnerService(YarnConfiguration config, String zkConnect, LocationFactory locationFactory) {
+ this.yarnConfig = config;
+ this.yarnAppClient = new VersionDetectYarnAppClientFactory().create(config);
+ this.locationFactory = locationFactory;
+ this.zkClientService = getZKClientService(zkConnect);
+ this.controllers = HashBasedTable.create();
+ }
+
+ /**
+ * This methods sets the extra JVM options that will be passed to the java command line for every application
+ * started through this {@link YarnTwillRunnerService} instance. It only affects applications that are started
+ * after options is set.
+ *
+ * This is intended for advance usage. All options will be passed unchanged to the java command line. Invalid
+ * options could cause application not able to start.
+ *
+ * @param options extra JVM options.
+ */
+ public void setJVMOptions(String options) {
+ Preconditions.checkArgument(options != null, "JVM options cannot be null.");
+ this.jvmOptions = options;
+ }
+
+ @Override
+ public Cancellable scheduleSecureStoreUpdate(final SecureStoreUpdater updater,
+ long initialDelay, long delay, TimeUnit unit) {
+ if (!UserGroupInformation.isSecurityEnabled()) {
+ return new Cancellable() {
+ @Override
+ public void cancel() {
+ // No-op
+ }
+ };
+ }
+
+ synchronized (this) {
+ if (secureStoreScheduler == null) {
+ secureStoreScheduler = Executors.newSingleThreadScheduledExecutor(
+ Threads.createDaemonThreadFactory("secure-store-updater"));
+ }
+ }
+
+ final ScheduledFuture<?> future = secureStoreScheduler.scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ // Collects all <application, runId> pairs first
+ Multimap<String, RunId> liveApps = HashMultimap.create();
+ synchronized (YarnTwillRunnerService.this) {
+ for (Table.Cell<String, RunId, YarnTwillController> cell : controllers.cellSet()) {
+ liveApps.put(cell.getRowKey(), cell.getColumnKey());
+ }
+ }
+
+ // Collect all secure stores that needs to be updated.
+ Table<String, RunId, SecureStore> secureStores = HashBasedTable.create();
+ for (Map.Entry<String, RunId> entry : liveApps.entries()) {
+ try {
+ secureStores.put(entry.getKey(), entry.getValue(), updater.update(entry.getKey(), entry.getValue()));
+ } catch (Throwable t) {
+ LOG.warn("Exception thrown by SecureStoreUpdater {}", updater, t);
+ }
+ }
+
+ // Update secure stores.
+ updateSecureStores(secureStores);
+ }
+ }, initialDelay, delay, unit);
+
+ return new Cancellable() {
+ @Override
+ public void cancel() {
+ future.cancel(false);
+ }
+ };
+ }
+
+ @Override
+ public TwillPreparer prepare(TwillRunnable runnable) {
+ return prepare(runnable, ResourceSpecification.BASIC);
+ }
+
+ @Override
+ public TwillPreparer prepare(TwillRunnable runnable, ResourceSpecification resourceSpecification) {
+ return prepare(new SingleRunnableApplication(runnable, resourceSpecification));
+ }
+
+ @Override
+ public TwillPreparer prepare(TwillApplication application) {
+ Preconditions.checkState(isRunning(), "Service not start. Please call start() first.");
+ final TwillSpecification twillSpec = application.configure();
+ final String appName = twillSpec.getName();
+
+ return new YarnTwillPreparer(yarnConfig, twillSpec, yarnAppClient, zkClientService, locationFactory,
+ Suppliers.ofInstance(jvmOptions),
+ new YarnTwillControllerFactory() {
+ @Override
+ public YarnTwillController create(RunId runId, Iterable<LogHandler> logHandlers,
+ Callable<ProcessController<YarnApplicationReport>> startUp) {
+ ZKClient zkClient = ZKClients.namespace(zkClientService, "/" + appName);
+ YarnTwillController controller = listenController(new YarnTwillController(runId, zkClient,
+ logHandlers, startUp));
+ synchronized (YarnTwillRunnerService.this) {
+ Preconditions.checkArgument(!controllers.contains(appName, runId),
+ "Application %s with runId %s is already running.", appName, runId);
+ controllers.put(appName, runId, controller);
+ }
+ return controller;
+ }
+ });
+ }
+
+ @Override
+ public synchronized TwillController lookup(String applicationName, final RunId runId) {
+ return controllers.get(applicationName, runId);
+ }
+
+ @Override
+ public Iterable<TwillController> lookup(final String applicationName) {
+ return new Iterable<TwillController>() {
+ @Override
+ public Iterator<TwillController> iterator() {
+ synchronized (YarnTwillRunnerService.this) {
+ return Iterators.transform(ImmutableList.copyOf(controllers.row(applicationName).values()).iterator(),
+ CAST_CONTROLLER);
+ }
+ }
+ };
+ }
+
+ @Override
+ public Iterable<LiveInfo> lookupLive() {
+ return liveInfos;
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ yarnAppClient.startAndWait();
+ zkClientService.startAndWait();
+
+ // Create the root node, so that the namespace root would get created if it is missing
+ // If the exception is caused by node exists, then it's ok. Otherwise propagate the exception.
+ ZKOperations.ignoreError(zkClientService.create("/", null, CreateMode.PERSISTENT),
+ KeeperException.NodeExistsException.class, null).get();
+
+ watchCancellable = watchLiveApps();
+ liveInfos = createLiveInfos();
+
+ // Schedule an updater for updating HDFS delegation tokens
+ if (UserGroupInformation.isSecurityEnabled()) {
+ long delay = yarnConfig.getLong(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
+ DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
+ scheduleSecureStoreUpdate(new LocationSecureStoreUpdater(yarnConfig, locationFactory),
+ delay, delay, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ // Shutdown shouldn't stop any controllers, as stopping this client service should let the remote containers
+ // running. However, this assumes that this TwillRunnerService is a long running service and you only stop it
+ // when the JVM process is about to exit. Hence it is important that threads created in the controllers are
+ // daemon threads.
+ synchronized (this) {
+ if (secureStoreScheduler != null) {
+ secureStoreScheduler.shutdownNow();
+ }
+ }
+ watchCancellable.cancel();
+ zkClientService.stopAndWait();
+ yarnAppClient.stopAndWait();
+ }
+
+ private Cancellable watchLiveApps() {
+ final Map<String, Cancellable> watched = Maps.newConcurrentMap();
+
+ final AtomicBoolean cancelled = new AtomicBoolean(false);
+ // Watch child changes in the root, which gives all application names.
+ final Cancellable cancellable = ZKOperations.watchChildren(zkClientService, "/",
+ new ZKOperations.ChildrenCallback() {
+ @Override
+ public void updated(NodeChildren nodeChildren) {
+ if (cancelled.get()) {
+ return;
+ }
+
+ Set<String> apps = ImmutableSet.copyOf(nodeChildren.getChildren());
+
+ // For each for the application name, watch for ephemeral nodes under /instances.
+ for (final String appName : apps) {
+ if (watched.containsKey(appName)) {
+ continue;
+ }
+
+ final String instancePath = String.format("/%s/instances", appName);
+ watched.put(appName,
+ ZKOperations.watchChildren(zkClientService, instancePath, new ZKOperations.ChildrenCallback() {
+ @Override
+ public void updated(NodeChildren nodeChildren) {
+ if (cancelled.get()) {
+ return;
+ }
+ if (nodeChildren.getChildren().isEmpty()) { // No more child, means no live instances
+ Cancellable removed = watched.remove(appName);
+ if (removed != null) {
+ removed.cancel();
+ }
+ return;
+ }
+ synchronized (YarnTwillRunnerService.this) {
+ // For each of the children, which the node name is the runId,
+ // fetch the application Id and construct TwillController.
+ for (final RunId runId : Iterables.transform(nodeChildren.getChildren(), STRING_TO_RUN_ID)) {
+ if (controllers.contains(appName, runId)) {
+ continue;
+ }
+ updateController(appName, runId, cancelled);
+ }
+ }
+ }
+ }));
+ }
+
+ // Remove app watches for apps that are gone. Removal of controller from controllers table is done
+ // in the state listener attached to the twill controller.
+ for (String removeApp : Sets.difference(watched.keySet(), apps)) {
+ watched.remove(removeApp).cancel();
+ }
+ }
+ });
+ return new Cancellable() {
+ @Override
+ public void cancel() {
+ cancelled.set(true);
+ cancellable.cancel();
+ for (Cancellable c : watched.values()) {
+ c.cancel();
+ }
+ }
+ };
+ }
+
+ private YarnTwillController listenController(final YarnTwillController controller) {
+ controller.addListener(new ServiceListenerAdapter() {
+ @Override
+ public void terminated(State from) {
+ removeController();
+ }
+
+ @Override
+ public void failed(State from, Throwable failure) {
+ removeController();
+ }
+
+ private void removeController() {
+ synchronized (YarnTwillRunnerService.this) {
+ Iterables.removeIf(controllers.values(),
+ new Predicate<TwillController>() {
+ @Override
+ public boolean apply(TwillController input) {
+ return input == controller;
+ }
+ });
+ }
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
+ return controller;
+ }
+
+ private ZKClientService getZKClientService(String zkConnect) {
+ return ZKClientServices.delegate(
+ ZKClients.reWatchOnExpire(
+ ZKClients.retryOnFailure(ZKClientService.Builder.of(zkConnect)
+ .setSessionTimeout(ZK_TIMEOUT)
+ .build(), RetryStrategies.exponentialDelay(100, 2000, TimeUnit.MILLISECONDS))));
+ }
+
+ private Iterable<LiveInfo> createLiveInfos() {
+ return new Iterable<LiveInfo>() {
+
+ @Override
+ public Iterator<LiveInfo> iterator() {
+ Map<String, Map<RunId, YarnTwillController>> controllerMap = ImmutableTable.copyOf(controllers).rowMap();
+ return Iterators.transform(controllerMap.entrySet().iterator(),
+ new Function<Map.Entry<String, Map<RunId, YarnTwillController>>, LiveInfo>() {
+ @Override
+ public LiveInfo apply(final Map.Entry<String, Map<RunId, YarnTwillController>> entry) {
+ return new LiveInfo() {
+ @Override
+ public String getApplicationName() {
+ return entry.getKey();
+ }
+
+ @Override
+ public Iterable<TwillController> getControllers() {
+ return Iterables.transform(entry.getValue().values(), CAST_CONTROLLER);
+ }
+ };
+ }
+ });
+ }
+ };
+ }
+
+ private void updateController(final String appName, final RunId runId, final AtomicBoolean cancelled) {
+ String instancePath = String.format("/%s/instances/%s", appName, runId.getId());
+
+ // Fetch the content node.
+ Futures.addCallback(zkClientService.getData(instancePath), new FutureCallback<NodeData>() {
+ @Override
+ public void onSuccess(NodeData result) {
+ if (cancelled.get()) {
+ return;
+ }
+ ApplicationId appId = getApplicationId(result);
+ if (appId == null) {
+ return;
+ }
+
+ synchronized (YarnTwillRunnerService.this) {
+ if (!controllers.contains(appName, runId)) {
+ ZKClient zkClient = ZKClients.namespace(zkClientService, "/" + appName);
+ YarnTwillController controller = listenController(
+ new YarnTwillController(runId, zkClient,
+ Callables.returning(yarnAppClient.createProcessController(appId))));
+ controllers.put(appName, runId, controller);
+ controller.start();
+ }
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.warn("Failed in fetching application instance node.", t);
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
+ }
+
+
+ /**
+ * Decodes application ID stored inside the node data.
+ * @param nodeData The node data to decode from. If it is {@code null}, this method would return {@code null}.
+ * @return The ApplicationId or {@code null} if failed to decode.
+ */
+ private ApplicationId getApplicationId(NodeData nodeData) {
+ byte[] data = nodeData == null ? null : nodeData.getData();
+ if (data == null) {
+ return null;
+ }
+
+ Gson gson = new Gson();
+ JsonElement json = gson.fromJson(new String(data, Charsets.UTF_8), JsonElement.class);
+ if (!json.isJsonObject()) {
+ LOG.warn("Unable to decode live data node.");
+ return null;
+ }
+
+ JsonObject jsonObj = json.getAsJsonObject();
+ json = jsonObj.get("data");
+ if (!json.isJsonObject()) {
+ LOG.warn("Property data not found in live data node.");
+ return null;
+ }
+
+ try {
+ ApplicationMasterLiveNodeData amLiveNode = gson.fromJson(json, ApplicationMasterLiveNodeData.class);
+ return YarnUtils.createApplicationId(amLiveNode.getAppIdClusterTime(), amLiveNode.getAppId());
+ } catch (Exception e) {
+ LOG.warn("Failed to decode application live node data.", e);
+ return null;
+ }
+ }
+
+ private void updateSecureStores(Table<String, RunId, SecureStore> secureStores) {
+ for (Table.Cell<String, RunId, SecureStore> cell : secureStores.cellSet()) {
+ Object store = cell.getValue().getStore();
+ if (!(store instanceof Credentials)) {
+ LOG.warn("Only Hadoop Credentials is supported. Ignore update for {}.", cell);
+ continue;
+ }
+
+ Credentials credentials = (Credentials) store;
+ if (credentials.getAllTokens().isEmpty()) {
+ // Nothing to update.
+ continue;
+ }
+
+ try {
+ updateCredentials(cell.getRowKey(), cell.getColumnKey(), credentials);
+ synchronized (YarnTwillRunnerService.this) {
+ // Notify the application for secure store updates if it is still running.
+ YarnTwillController controller = controllers.get(cell.getRowKey(), cell.getColumnKey());
+ if (controller != null) {
+ controller.secureStoreUpdated();
+ }
+ }
+ } catch (Throwable t) {
+ LOG.warn("Failed to update secure store for {}.", cell, t);
+ }
+ }
+ }
+
+ private void updateCredentials(String application, RunId runId, Credentials updates) throws IOException {
+ Location credentialsLocation = locationFactory.create(String.format("/%s/%s/%s", application, runId.getId(),
+ Constants.Files.CREDENTIALS));
+ // Try to read the old credentials.
+ Credentials credentials = new Credentials();
+ if (credentialsLocation.exists()) {
+ DataInputStream is = new DataInputStream(new BufferedInputStream(credentialsLocation.getInputStream()));
+ try {
+ credentials.readTokenStorageStream(is);
+ } finally {
+ is.close();
+ }
+ }
+
+ // Overwrite with the updates.
+ credentials.addAll(updates);
+
+ // Overwrite the credentials.
+ Location tmpLocation = credentialsLocation.getTempFile(Constants.Files.CREDENTIALS);
+
+ // Save the credentials store with user-only permission.
+ DataOutputStream os = new DataOutputStream(new BufferedOutputStream(tmpLocation.getOutputStream("600")));
+ try {
+ credentials.writeTokenStorageToStream(os);
+ } finally {
+ os.close();
+ }
+
+ // Rename the tmp file into the credentials location
+ tmpLocation.renameTo(credentialsLocation);
+
+ LOG.debug("Secure store for {} {} saved to {}.", application, runId, credentialsLocation.toURI());
+ }
+
+ private static FileSystem getFileSystem(YarnConfiguration configuration) {
+ try {
+ return FileSystem.get(configuration);
+ } catch (IOException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/yarn/package-info.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/yarn/package-info.java b/yarn/src/main/java/org/apache/twill/yarn/package-info.java
new file mode 100644
index 0000000..b3cbc5e
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/yarn/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Classes in this package implement the Twill API for Apache Hadoop YARN.
+ */
+package org.apache.twill.yarn;
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/resources/logback-template.xml
----------------------------------------------------------------------
diff --git a/yarn/src/main/resources/logback-template.xml b/yarn/src/main/resources/logback-template.xml
new file mode 100644
index 0000000..38cf6c8
--- /dev/null
+++ b/yarn/src/main/resources/logback-template.xml
@@ -0,0 +1,11 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!-- Default logback configuration for twill library -->
+<configuration>
+
+ <logger name="org.apache.hadoop" level="WARN" />
+ <logger name="org.apache.zookeeper" level="WARN" />
+
+ <root level="INFO" />
+
+</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/test/java/org/apache/twill/yarn/BuggyServer.java
----------------------------------------------------------------------
diff --git a/yarn/src/test/java/org/apache/twill/yarn/BuggyServer.java b/yarn/src/test/java/org/apache/twill/yarn/BuggyServer.java
new file mode 100644
index 0000000..bb1a583
--- /dev/null
+++ b/yarn/src/test/java/org/apache/twill/yarn/BuggyServer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+/**
+ * Server for testing that will die if you give it a 0.
+ */
+public final class BuggyServer extends SocketServer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BuggyServer.class);
+
+ @Override
+ public void handleRequest(BufferedReader reader, PrintWriter writer) throws IOException {
+ String line = reader.readLine();
+ LOG.info("Received: " + line + " going to divide by it");
+ Integer toDivide = Integer.valueOf(line);
+ writer.println(Integer.toString(100 / toDivide));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/test/java/org/apache/twill/yarn/DistributeShellTestRun.java
----------------------------------------------------------------------
diff --git a/yarn/src/test/java/org/apache/twill/yarn/DistributeShellTestRun.java b/yarn/src/test/java/org/apache/twill/yarn/DistributeShellTestRun.java
new file mode 100644
index 0000000..1054ec9
--- /dev/null
+++ b/yarn/src/test/java/org/apache/twill/yarn/DistributeShellTestRun.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunner;
+import org.apache.twill.api.logging.PrinterLogHandler;
+import org.apache.twill.common.ServiceListenerAdapter;
+import org.apache.twill.common.Threads;
+import com.google.common.util.concurrent.Service;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.PrintWriter;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This test is executed by {@link YarnTestSuite}.
+ */
+public class DistributeShellTestRun {
+
+ @Ignore
+ @Test
+ public void testDistributedShell() throws InterruptedException {
+ TwillRunner twillRunner = YarnTestSuite.getTwillRunner();
+
+ TwillController controller = twillRunner.prepare(new DistributedShell("pwd", "ls -al"))
+ .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out)))
+ .start();
+
+ final CountDownLatch stopLatch = new CountDownLatch(1);
+ controller.addListener(new ServiceListenerAdapter() {
+
+ @Override
+ public void terminated(Service.State from) {
+ stopLatch.countDown();
+ }
+
+ @Override
+ public void failed(Service.State from, Throwable failure) {
+ stopLatch.countDown();
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
+
+ Assert.assertTrue(stopLatch.await(10, TimeUnit.SECONDS));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/test/java/org/apache/twill/yarn/DistributedShell.java
----------------------------------------------------------------------
diff --git a/yarn/src/test/java/org/apache/twill/yarn/DistributedShell.java b/yarn/src/test/java/org/apache/twill/yarn/DistributedShell.java
new file mode 100644
index 0000000..c89371c
--- /dev/null
+++ b/yarn/src/test/java/org/apache/twill/yarn/DistributedShell.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.AbstractTwillRunnable;
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+/**
+ *
+ */
+public final class DistributedShell extends AbstractTwillRunnable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DistributedShell.class);
+
+ public DistributedShell(String...commands) {
+ super(ImmutableMap.of("cmds", Joiner.on(';').join(commands)));
+ }
+
+ @Override
+ public void run() {
+ for (String cmd : Splitter.on(';').split(getArgument("cmds"))) {
+ try {
+ Process process = new ProcessBuilder(ImmutableList.copyOf(Splitter.on(' ').split(cmd)))
+ .redirectErrorStream(true).start();
+ BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream(), Charsets.US_ASCII));
+ try {
+ String line = reader.readLine();
+ while (line != null) {
+ LOG.info(line);
+ line = reader.readLine();
+ }
+ } finally {
+ reader.close();
+ }
+ } catch (IOException e) {
+ LOG.error("Fail to execute command " + cmd, e);
+ }
+ }
+ }
+
+ @Override
+ public void stop() {
+ // No-op
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/test/java/org/apache/twill/yarn/EchoServer.java
----------------------------------------------------------------------
diff --git a/yarn/src/test/java/org/apache/twill/yarn/EchoServer.java b/yarn/src/test/java/org/apache/twill/yarn/EchoServer.java
new file mode 100644
index 0000000..6b77e66
--- /dev/null
+++ b/yarn/src/test/java/org/apache/twill/yarn/EchoServer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.Command;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+/**
+ * Test server that echoes back what it receives.
+ */
+public final class EchoServer extends SocketServer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(EchoServer.class);
+
+ @Override
+ public void handleRequest(BufferedReader reader, PrintWriter writer) throws IOException {
+ String line = reader.readLine();
+ LOG.info("Received: " + line);
+ if (line != null) {
+ writer.println(line);
+ }
+ }
+
+ @Override
+ public void handleCommand(Command command) throws Exception {
+ LOG.info("Command received: " + command + " " + getContext().getInstanceCount());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
----------------------------------------------------------------------
diff --git a/yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java b/yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
new file mode 100644
index 0000000..d868eef
--- /dev/null
+++ b/yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.ResourceSpecification;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunner;
+import org.apache.twill.api.TwillRunnerService;
+import org.apache.twill.api.logging.PrinterLogHandler;
+import org.apache.twill.common.ServiceListenerAdapter;
+import org.apache.twill.common.Threads;
+import org.apache.twill.discovery.Discoverable;
+import com.google.common.base.Charsets;
+import com.google.common.io.LineReader;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.net.Socket;
+import java.net.URISyntaxException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Using echo server to test various behavior of YarnTwillService.
+ * This test is executed by {@link YarnTestSuite}.
+ */
+public class EchoServerTestRun {
+
+ private static final Logger LOG = LoggerFactory.getLogger(EchoServerTestRun.class);
+
+ @Test
+ public void testEchoServer() throws InterruptedException, ExecutionException, IOException,
+ URISyntaxException, TimeoutException {
+ TwillRunner runner = YarnTestSuite.getTwillRunner();
+
+ TwillController controller = runner.prepare(new EchoServer(),
+ ResourceSpecification.Builder.with()
+ .setVirtualCores(1)
+ .setMemory(1, ResourceSpecification.SizeUnit.GIGA)
+ .setInstances(2)
+ .build())
+ .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+ .withApplicationArguments("echo")
+ .withArguments("EchoServer", "echo2")
+ .start();
+
+ final CountDownLatch running = new CountDownLatch(1);
+ controller.addListener(new ServiceListenerAdapter() {
+ @Override
+ public void running() {
+ running.countDown();
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
+
+ Assert.assertTrue(running.await(30, TimeUnit.SECONDS));
+
+ Iterable<Discoverable> echoServices = controller.discoverService("echo");
+ Assert.assertTrue(YarnTestSuite.waitForSize(echoServices, 2, 60));
+
+ for (Discoverable discoverable : echoServices) {
+ String msg = "Hello: " + discoverable.getSocketAddress();
+
+ Socket socket = new Socket(discoverable.getSocketAddress().getAddress(),
+ discoverable.getSocketAddress().getPort());
+ try {
+ PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Charsets.UTF_8), true);
+ LineReader reader = new LineReader(new InputStreamReader(socket.getInputStream(), Charsets.UTF_8));
+
+ writer.println(msg);
+ Assert.assertEquals(msg, reader.readLine());
+ } finally {
+ socket.close();
+ }
+ }
+
+ // Increase number of instances
+ controller.changeInstances("EchoServer", 3);
+ Assert.assertTrue(YarnTestSuite.waitForSize(echoServices, 3, 60));
+
+ echoServices = controller.discoverService("echo2");
+
+ // Decrease number of instances
+ controller.changeInstances("EchoServer", 1);
+ Assert.assertTrue(YarnTestSuite.waitForSize(echoServices, 1, 60));
+
+ // Increase number of instances again
+ controller.changeInstances("EchoServer", 2);
+ Assert.assertTrue(YarnTestSuite.waitForSize(echoServices, 2, 60));
+
+ // Make sure still only one app is running
+ Iterable<TwillRunner.LiveInfo> apps = runner.lookupLive();
+ Assert.assertTrue(YarnTestSuite.waitForSize(apps, 1, 60));
+
+ // Creates a new runner service to check it can regain control over running app.
+ TwillRunnerService runnerService = YarnTestSuite.createTwillRunnerService();
+ runnerService.startAndWait();
+
+ try {
+ Iterable <TwillController> controllers = runnerService.lookup("EchoServer");
+ Assert.assertTrue(YarnTestSuite.waitForSize(controllers, 1, 60));
+
+ for (TwillController c : controllers) {
+ LOG.info("Stopping application: " + c.getRunId());
+ c.stop().get(30, TimeUnit.SECONDS);
+ }
+
+ Assert.assertTrue(YarnTestSuite.waitForSize(apps, 0, 60));
+ } finally {
+ runnerService.stopAndWait();
+ }
+
+ // Sleep a bit before exiting.
+ TimeUnit.SECONDS.sleep(2);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/test/java/org/apache/twill/yarn/EnvironmentEchoServer.java
----------------------------------------------------------------------
diff --git a/yarn/src/test/java/org/apache/twill/yarn/EnvironmentEchoServer.java b/yarn/src/test/java/org/apache/twill/yarn/EnvironmentEchoServer.java
new file mode 100644
index 0000000..4be2472
--- /dev/null
+++ b/yarn/src/test/java/org/apache/twill/yarn/EnvironmentEchoServer.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+/**
+ * Test server that returns back the value of the env key sent in. Used to check env for
+ * runnables is correctly set.
+ */
+public class EnvironmentEchoServer extends SocketServer {
+
+ @Override
+ public void handleRequest(BufferedReader reader, PrintWriter writer) throws IOException {
+ String envKey = reader.readLine();
+ writer.println(System.getenv(envKey));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/test/java/org/apache/twill/yarn/FailureRestartTestRun.java
----------------------------------------------------------------------
diff --git a/yarn/src/test/java/org/apache/twill/yarn/FailureRestartTestRun.java b/yarn/src/test/java/org/apache/twill/yarn/FailureRestartTestRun.java
new file mode 100644
index 0000000..b3d3933
--- /dev/null
+++ b/yarn/src/test/java/org/apache/twill/yarn/FailureRestartTestRun.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.Command;
+import org.apache.twill.api.ResourceSpecification;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunner;
+import org.apache.twill.api.logging.PrinterLogHandler;
+import org.apache.twill.discovery.Discoverable;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Sets;
+import com.google.common.io.LineReader;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ */
+public class FailureRestartTestRun {
+
+ @Test
+ public void testFailureRestart() throws Exception {
+ TwillRunner runner = YarnTestSuite.getTwillRunner();
+
+ ResourceSpecification resource = ResourceSpecification.Builder.with()
+ .setVirtualCores(1)
+ .setMemory(512, ResourceSpecification.SizeUnit.MEGA)
+ .setInstances(2)
+ .build();
+ TwillController controller = runner.prepare(new FailureRunnable(), resource)
+ .withApplicationArguments("failure")
+ .withArguments(FailureRunnable.class.getSimpleName(), "failure2")
+ .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+ .start();
+
+ Iterable<Discoverable> discoverables = controller.discoverService("failure");
+ Assert.assertTrue(YarnTestSuite.waitForSize(discoverables, 2, 60));
+
+ // Make sure we see the right instance IDs
+ Assert.assertEquals(Sets.newHashSet(0, 1), getInstances(discoverables));
+
+ // Kill server with instanceId = 0
+ controller.sendCommand(FailureRunnable.class.getSimpleName(), Command.Builder.of("kill0").build());
+
+ // Do a shot sleep, make sure the runnable is killed.
+ TimeUnit.SECONDS.sleep(5);
+
+ Assert.assertTrue(YarnTestSuite.waitForSize(discoverables, 2, 60));
+ // Make sure we see the right instance IDs
+ Assert.assertEquals(Sets.newHashSet(0, 1), getInstances(discoverables));
+
+ controller.stopAndWait();
+ }
+
+ private Set<Integer> getInstances(Iterable<Discoverable> discoverables) throws IOException {
+ Set<Integer> instances = Sets.newHashSet();
+ for (Discoverable discoverable : discoverables) {
+ InetSocketAddress socketAddress = discoverable.getSocketAddress();
+ Socket socket = new Socket(socketAddress.getAddress(), socketAddress.getPort());
+ try {
+ PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Charsets.UTF_8), true);
+ LineReader reader = new LineReader(new InputStreamReader(socket.getInputStream(), Charsets.UTF_8));
+
+ String msg = "Failure";
+ writer.println(msg);
+
+ String line = reader.readLine();
+ Assert.assertTrue(line.endsWith(msg));
+ instances.add(Integer.parseInt(line.substring(0, line.length() - msg.length())));
+ } finally {
+ socket.close();
+ }
+ }
+ return instances;
+ }
+
+
+ public static final class FailureRunnable extends SocketServer {
+
+ private volatile boolean killed;
+
+ @Override
+ public void run() {
+ killed = false;
+ super.run();
+ if (killed) {
+ throw new RuntimeException("Exception");
+ }
+ }
+
+ @Override
+ public void handleCommand(Command command) throws Exception {
+ if (command.getCommand().equals("kill" + getContext().getInstanceId())) {
+ killed = true;
+ running = false;
+ serverSocket.close();
+ }
+ }
+
+ @Override
+ public void handleRequest(BufferedReader reader, PrintWriter writer) throws IOException {
+ String line = reader.readLine();
+ writer.println(getContext().getInstanceId() + line);
+ writer.flush();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/test/java/org/apache/twill/yarn/LocalFileTestRun.java
----------------------------------------------------------------------
diff --git a/yarn/src/test/java/org/apache/twill/yarn/LocalFileTestRun.java b/yarn/src/test/java/org/apache/twill/yarn/LocalFileTestRun.java
new file mode 100644
index 0000000..de2c74c
--- /dev/null
+++ b/yarn/src/test/java/org/apache/twill/yarn/LocalFileTestRun.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.TwillApplication;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunner;
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.api.logging.PrinterLogHandler;
+import org.apache.twill.discovery.Discoverable;
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+import com.google.common.io.LineReader;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.concurrent.TimeUnit;
+import java.util.jar.JarEntry;
+import java.util.jar.JarOutputStream;
+
+/**
+ * Test for local file transfer.
+ */
+public class LocalFileTestRun {
+
+ @ClassRule
+ public static TemporaryFolder tmpFolder = new TemporaryFolder();
+
+ @Test
+ public void testLocalFile() throws Exception {
+ String header = Files.readFirstLine(new File(getClass().getClassLoader().getResource("header.txt").toURI()),
+ Charsets.UTF_8);
+
+ TwillRunner runner = YarnTestSuite.getTwillRunner();
+ if (runner instanceof YarnTwillRunnerService) {
+ ((YarnTwillRunnerService) runner).setJVMOptions("-verbose:gc -Xloggc:gc.log -XX:+PrintGCDetails");
+ }
+
+ TwillController controller = runner.prepare(new LocalFileApplication())
+ .withApplicationArguments("local")
+ .withArguments("LocalFileSocketServer", "local2")
+ .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+ .start();
+
+ if (runner instanceof YarnTwillRunnerService) {
+ ((YarnTwillRunnerService) runner).setJVMOptions("");
+ }
+
+ Iterable<Discoverable> discoverables = controller.discoverService("local");
+ Assert.assertTrue(YarnTestSuite.waitForSize(discoverables, 1, 60));
+
+ InetSocketAddress socketAddress = discoverables.iterator().next().getSocketAddress();
+ Socket socket = new Socket(socketAddress.getAddress(), socketAddress.getPort());
+ try {
+ PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Charsets.UTF_8), true);
+ LineReader reader = new LineReader(new InputStreamReader(socket.getInputStream(), Charsets.UTF_8));
+
+ String msg = "Local file test";
+ writer.println(msg);
+ Assert.assertEquals(header, reader.readLine());
+ Assert.assertEquals(msg, reader.readLine());
+ } finally {
+ socket.close();
+ }
+
+ controller.stopAndWait();
+
+ Assert.assertTrue(YarnTestSuite.waitForSize(discoverables, 0, 60));
+
+ TimeUnit.SECONDS.sleep(2);
+ }
+
+ public static final class LocalFileApplication implements TwillApplication {
+
+ private final File headerFile;
+
+ public LocalFileApplication() throws Exception {
+ // Create a jar file that contains the header.txt file inside.
+ headerFile = tmpFolder.newFile("header.jar");
+ JarOutputStream os = new JarOutputStream(new FileOutputStream(headerFile));
+ try {
+ os.putNextEntry(new JarEntry("header.txt"));
+ ByteStreams.copy(getClass().getClassLoader().getResourceAsStream("header.txt"), os);
+ } finally {
+ os.close();
+ }
+ }
+
+ @Override
+ public TwillSpecification configure() {
+ return TwillSpecification.Builder.with()
+ .setName("LocalFileApp")
+ .withRunnable()
+ .add(new LocalFileSocketServer())
+ .withLocalFiles()
+ .add("header", headerFile, true).apply()
+ .anyOrder()
+ .build();
+ }
+ }
+
+ public static final class LocalFileSocketServer extends SocketServer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LocalFileSocketServer.class);
+
+ @Override
+ public void handleRequest(BufferedReader reader, PrintWriter writer) throws IOException {
+ // Verify there is a gc.log file locally
+ Preconditions.checkState(new File("gc.log").exists());
+
+ LOG.info("handleRequest");
+ String header = Files.toString(new File("header/header.txt"), Charsets.UTF_8);
+ writer.write(header);
+ writer.println(reader.readLine());
+ LOG.info("Flushed response");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/test/java/org/apache/twill/yarn/ProvisionTimeoutTestRun.java
----------------------------------------------------------------------
diff --git a/yarn/src/test/java/org/apache/twill/yarn/ProvisionTimeoutTestRun.java b/yarn/src/test/java/org/apache/twill/yarn/ProvisionTimeoutTestRun.java
new file mode 100644
index 0000000..0598ef1
--- /dev/null
+++ b/yarn/src/test/java/org/apache/twill/yarn/ProvisionTimeoutTestRun.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.AbstractTwillRunnable;
+import org.apache.twill.api.EventHandler;
+import org.apache.twill.api.EventHandlerContext;
+import org.apache.twill.api.ResourceSpecification;
+import org.apache.twill.api.TwillApplication;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunner;
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.api.logging.PrinterLogHandler;
+import org.apache.twill.common.Services;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+
+import java.io.PrintWriter;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ *
+ */
+public class ProvisionTimeoutTestRun {
+
+ @Test
+ public void testProvisionTimeout() throws InterruptedException, ExecutionException, TimeoutException {
+ TwillRunner runner = YarnTestSuite.getTwillRunner();
+
+ TwillController controller = runner.prepare(new TimeoutApplication())
+ .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+ .start();
+
+ // The provision should failed in 30 seconds after AM started, which AM could took a while to start.
+ // Hence we give 90 seconds max time here.
+ try {
+ Services.getCompletionFuture(controller).get(90, TimeUnit.SECONDS);
+ } finally {
+ // If it timeout, kill the app as cleanup.
+ controller.kill();
+ }
+ }
+
+ public static final class Handler extends EventHandler {
+
+ private boolean abort;
+
+ @Override
+ protected Map<String, String> getConfigs() {
+ return ImmutableMap.of("abort", "true");
+ }
+
+ @Override
+ public void initialize(EventHandlerContext context) {
+ this.abort = Boolean.parseBoolean(context.getSpecification().getConfigs().get("abort"));
+ }
+
+ @Override
+ public TimeoutAction launchTimeout(Iterable<TimeoutEvent> timeoutEvents) {
+ if (abort) {
+ return TimeoutAction.abort();
+ } else {
+ return TimeoutAction.recheck(10, TimeUnit.SECONDS);
+ }
+ }
+ }
+
+ public static final class TimeoutApplication implements TwillApplication {
+
+ @Override
+ public TwillSpecification configure() {
+ return TwillSpecification.Builder.with()
+ .setName("TimeoutApplication")
+ .withRunnable()
+ .add(new TimeoutRunnable(),
+ ResourceSpecification.Builder.with()
+ .setVirtualCores(1)
+ .setMemory(8, ResourceSpecification.SizeUnit.GIGA).build())
+ .noLocalFiles()
+ .anyOrder()
+ .withEventHandler(new Handler())
+ .build();
+ }
+ }
+
+ /**
+ * A runnable that do nothing, as it's not expected to get provisioned.
+ */
+ public static final class TimeoutRunnable extends AbstractTwillRunnable {
+
+ private final CountDownLatch latch = new CountDownLatch(1);
+
+ @Override
+ public void stop() {
+ latch.countDown();
+ }
+
+ @Override
+ public void run() {
+ // Simply block here
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+ }
+}