You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/11/21 22:54:28 UTC
[05/15] Initial import commit.
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java b/yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
new file mode 100644
index 0000000..bbd6c10
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.container;
+
+import org.apache.twill.api.LocalFile;
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.RuntimeSpecification;
+import org.apache.twill.api.TwillRunnableSpecification;
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.discovery.DiscoveryService;
+import org.apache.twill.discovery.ZKDiscoveryService;
+import org.apache.twill.internal.Arguments;
+import org.apache.twill.internal.BasicTwillContext;
+import org.apache.twill.internal.Constants;
+import org.apache.twill.internal.ContainerInfo;
+import org.apache.twill.internal.EnvContainerInfo;
+import org.apache.twill.internal.EnvKeys;
+import org.apache.twill.internal.RunIds;
+import org.apache.twill.internal.ServiceMain;
+import org.apache.twill.internal.json.ArgumentsCodec;
+import org.apache.twill.internal.json.TwillSpecificationAdapter;
+import org.apache.twill.zookeeper.RetryStrategies;
+import org.apache.twill.zookeeper.ZKClient;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.twill.zookeeper.ZKClientServices;
+import org.apache.twill.zookeeper.ZKClients;
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.Service;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.Reader;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ */
+public final class TwillContainerMain extends ServiceMain {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TwillContainerMain.class);
+
+ /**
+ * Main method for launching a {@link TwillContainerService} which runs
+ * a {@link org.apache.twill.api.TwillRunnable}.
+ */
+ public static void main(final String[] args) throws Exception {
+ // Try to load the secure store from localized file, which AM requested RM to localize it for this container.
+ loadSecureStore();
+
+ String zkConnectStr = System.getenv(EnvKeys.TWILL_ZK_CONNECT);
+ File twillSpecFile = new File(Constants.Files.TWILL_SPEC);
+ RunId appRunId = RunIds.fromString(System.getenv(EnvKeys.TWILL_APP_RUN_ID));
+ RunId runId = RunIds.fromString(System.getenv(EnvKeys.TWILL_RUN_ID));
+ String runnableName = System.getenv(EnvKeys.TWILL_RUNNABLE_NAME);
+ int instanceId = Integer.parseInt(System.getenv(EnvKeys.TWILL_INSTANCE_ID));
+ int instanceCount = Integer.parseInt(System.getenv(EnvKeys.TWILL_INSTANCE_COUNT));
+
+ ZKClientService zkClientService = ZKClientServices.delegate(
+ ZKClients.reWatchOnExpire(
+ ZKClients.retryOnFailure(ZKClientService.Builder.of(zkConnectStr).build(),
+ RetryStrategies.fixDelay(1, TimeUnit.SECONDS))));
+
+ DiscoveryService discoveryService = new ZKDiscoveryService(zkClientService);
+
+ TwillSpecification twillSpec = loadTwillSpec(twillSpecFile);
+ renameLocalFiles(twillSpec.getRunnables().get(runnableName));
+
+ TwillRunnableSpecification runnableSpec = twillSpec.getRunnables().get(runnableName).getRunnableSpecification();
+ ContainerInfo containerInfo = new EnvContainerInfo();
+ Arguments arguments = decodeArgs();
+ BasicTwillContext context = new BasicTwillContext(
+ runId, appRunId, containerInfo.getHost(),
+ arguments.getRunnableArguments().get(runnableName).toArray(new String[0]),
+ arguments.getArguments().toArray(new String[0]),
+ runnableSpec, instanceId, discoveryService, instanceCount,
+ containerInfo.getMemoryMB(), containerInfo.getVirtualCores()
+ );
+
+ Configuration conf = new YarnConfiguration(new HdfsConfiguration(new Configuration()));
+ Service service = new TwillContainerService(context, containerInfo,
+ getContainerZKClient(zkClientService, appRunId, runnableName),
+ runId, runnableSpec, getClassLoader(),
+ createAppLocation(conf));
+ new TwillContainerMain().doMain(zkClientService, service);
+ }
+
+ private static void loadSecureStore() throws IOException {
+ if (!UserGroupInformation.isSecurityEnabled()) {
+ return;
+ }
+
+ File file = new File(Constants.Files.CREDENTIALS);
+ if (file.exists()) {
+ Credentials credentials = new Credentials();
+ DataInputStream input = new DataInputStream(new FileInputStream(file));
+ try {
+ credentials.readTokenStorageStream(input);
+ } finally {
+ input.close();
+ }
+
+ UserGroupInformation.getCurrentUser().addCredentials(credentials);
+ LOG.info("Secure store updated from {}", file);
+ }
+ }
+
+ private static void renameLocalFiles(RuntimeSpecification runtimeSpec) {
+ for (LocalFile file : runtimeSpec.getLocalFiles()) {
+ if (file.isArchive()) {
+ String path = file.getURI().toString();
+ String name = file.getName() + (path.endsWith(".tar.gz") ? ".tar.gz" : path.substring(path.lastIndexOf('.')));
+ Preconditions.checkState(new File(name).renameTo(new File(file.getName())),
+ "Fail to rename file from %s to %s.",
+ name, file.getName());
+ }
+ }
+ }
+
+ private static ZKClient getContainerZKClient(ZKClient zkClient, RunId appRunId, String runnableName) {
+ return ZKClients.namespace(zkClient, String.format("/%s/runnables/%s", appRunId, runnableName));
+ }
+
+ /**
+ * Returns the ClassLoader for the runnable.
+ */
+ private static ClassLoader getClassLoader() {
+ ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ if (classLoader == null) {
+ return ClassLoader.getSystemClassLoader();
+ }
+ return classLoader;
+ }
+
+ private static TwillSpecification loadTwillSpec(File specFile) throws IOException {
+ Reader reader = Files.newReader(specFile, Charsets.UTF_8);
+ try {
+ return TwillSpecificationAdapter.create().fromJson(reader);
+ } finally {
+ reader.close();
+ }
+ }
+
+ private static Arguments decodeArgs() throws IOException {
+ return ArgumentsCodec.decode(Files.newReaderSupplier(new File(Constants.Files.ARGUMENTS), Charsets.UTF_8));
+ }
+
+ @Override
+ protected String getHostname() {
+ return System.getenv(EnvKeys.YARN_CONTAINER_HOST);
+ }
+
+ @Override
+ protected String getKafkaZKConnect() {
+ return System.getenv(EnvKeys.TWILL_LOG_KAFKA_ZK);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java b/yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
new file mode 100644
index 0000000..f5bc1f2
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.container;
+
+import org.apache.twill.api.Command;
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.TwillRunnable;
+import org.apache.twill.api.TwillRunnableSpecification;
+import org.apache.twill.common.Threads;
+import org.apache.twill.filesystem.Location;
+import org.apache.twill.internal.AbstractTwillService;
+import org.apache.twill.internal.AbstractTwillService;
+import org.apache.twill.internal.BasicTwillContext;
+import org.apache.twill.internal.ContainerInfo;
+import org.apache.twill.internal.ContainerLiveNodeData;
+import org.apache.twill.internal.ZKServiceDecorator;
+import org.apache.twill.internal.logging.Loggings;
+import org.apache.twill.internal.state.Message;
+import org.apache.twill.internal.state.MessageCallback;
+import org.apache.twill.internal.utils.Instances;
+import org.apache.twill.zookeeper.ZKClient;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.util.concurrent.AbstractExecutionThreadService;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Service;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * This class act as a yarn container and run a {@link org.apache.twill.api.TwillRunnable}.
+ */
+public final class TwillContainerService extends AbstractTwillService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TwillContainerService.class);
+
+ private final TwillRunnableSpecification specification;
+ private final ClassLoader classLoader;
+ private final ContainerLiveNodeData containerLiveNode;
+ private final BasicTwillContext context;
+ private final ZKServiceDecorator serviceDelegate;
+ private ExecutorService commandExecutor;
+ private TwillRunnable runnable;
+
+ public TwillContainerService(BasicTwillContext context, ContainerInfo containerInfo, ZKClient zkClient,
+ RunId runId, TwillRunnableSpecification specification, ClassLoader classLoader,
+ Location applicationLocation) {
+ super(applicationLocation);
+
+ this.specification = specification;
+ this.classLoader = classLoader;
+ this.serviceDelegate = new ZKServiceDecorator(zkClient, runId, createLiveNodeSupplier(), new ServiceDelegate());
+ this.context = context;
+ this.containerLiveNode = new ContainerLiveNodeData(containerInfo.getId(),
+ containerInfo.getHost().getCanonicalHostName());
+ }
+
+ private ListenableFuture<String> processMessage(final String messageId, final Message message) {
+ LOG.debug("Message received: {} {}.", messageId, message);
+
+ if (handleSecureStoreUpdate(message)) {
+ return Futures.immediateFuture(messageId);
+ }
+
+ final SettableFuture<String> result = SettableFuture.create();
+ Command command = message.getCommand();
+ if (message.getType() == Message.Type.SYSTEM
+ && "instances".equals(command.getCommand()) && command.getOptions().containsKey("count")) {
+ context.setInstanceCount(Integer.parseInt(command.getOptions().get("count")));
+ }
+
+ commandExecutor.execute(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ runnable.handleCommand(message.getCommand());
+ result.set(messageId);
+ } catch (Exception e) {
+ result.setException(e);
+ }
+ }
+ });
+ return result;
+ }
+
+ private Supplier<? extends JsonElement> createLiveNodeSupplier() {
+ return new Supplier<JsonElement>() {
+ @Override
+ public JsonElement get() {
+ return new Gson().toJsonTree(containerLiveNode);
+ }
+ };
+ }
+
+ @Override
+ protected Service getServiceDelegate() {
+ return serviceDelegate;
+ }
+
+ private final class ServiceDelegate extends AbstractExecutionThreadService implements MessageCallback {
+
+ @Override
+ protected void startUp() throws Exception {
+ commandExecutor = Executors.newSingleThreadExecutor(
+ Threads.createDaemonThreadFactory("runnable-command-executor"));
+
+ Class<?> runnableClass = classLoader.loadClass(specification.getClassName());
+ Preconditions.checkArgument(TwillRunnable.class.isAssignableFrom(runnableClass),
+ "Class %s is not instance of TwillRunnable.", specification.getClassName());
+
+ runnable = Instances.newInstance((Class<TwillRunnable>) runnableClass);
+ runnable.initialize(context);
+ }
+
+ @Override
+ protected void triggerShutdown() {
+ try {
+ runnable.stop();
+ } catch (Throwable t) {
+ LOG.error("Exception when stopping runnable.", t);
+ }
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ commandExecutor.shutdownNow();
+ runnable.destroy();
+ Loggings.forceFlush();
+ }
+
+ @Override
+ protected void run() throws Exception {
+ runnable.run();
+ }
+
+ @Override
+ public ListenableFuture<String> onReceived(String messageId, Message message) {
+ if (state() == State.RUNNING) {
+ // Only process message if the service is still alive
+ return processMessage(messageId, message);
+ }
+ return Futures.immediateFuture(messageId);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java b/yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java
new file mode 100644
index 0000000..b810854
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.api.LocalFile;
+import org.apache.twill.internal.ProcessController;
+import org.apache.twill.internal.ProcessLauncher;
+import org.apache.twill.internal.utils.Paths;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Abstract class to help creating different types of process launcher that process on yarn.
+ *
+ * @param <T> Type of the object that contains information about the container that the process is going to launch.
+ */
+public abstract class AbstractYarnProcessLauncher<T> implements ProcessLauncher<T> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractYarnProcessLauncher.class);
+
+ private final T containerInfo;
+
+ protected AbstractYarnProcessLauncher(T containerInfo) {
+ this.containerInfo = containerInfo;
+ }
+
+ @Override
+ public T getContainerInfo() {
+ return containerInfo;
+ }
+
+ @Override
+ public <C> PrepareLaunchContext prepareLaunch(Map<String, String> environments,
+ Iterable<LocalFile> resources, C credentials) {
+ if (credentials != null) {
+ Preconditions.checkArgument(credentials instanceof Credentials, "Credentials should be of type %s",
+ Credentials.class.getName());
+ }
+ return new PrepareLaunchContextImpl(environments, resources, (Credentials) credentials);
+ }
+
+ /**
+ * Tells whether to append suffix to localize resource name for archive file type. Default is true.
+ */
+ protected boolean useArchiveSuffix() {
+ return true;
+ }
+
+ /**
+ * For children class to override to perform actual process launching.
+ */
+ protected abstract <R> ProcessController<R> doLaunch(YarnLaunchContext launchContext);
+
+ /**
+ * Implementation for the {@link PrepareLaunchContext}.
+ */
+ private final class PrepareLaunchContextImpl implements PrepareLaunchContext {
+
+ private final Credentials credentials;
+ private final YarnLaunchContext launchContext;
+ private final Map<String, YarnLocalResource> localResources;
+ private final Map<String, String> environment;
+ private final List<String> commands;
+
+ private PrepareLaunchContextImpl(Map<String, String> env, Iterable<LocalFile> localFiles, Credentials credentials) {
+ this.credentials = credentials;
+ this.launchContext = YarnUtils.createLaunchContext();
+ this.localResources = Maps.newHashMap();
+ this.environment = Maps.newHashMap(env);
+ this.commands = Lists.newLinkedList();
+
+ for (LocalFile localFile : localFiles) {
+ addLocalFile(localFile);
+ }
+ }
+
+ private void addLocalFile(LocalFile localFile) {
+ String name = localFile.getName();
+ // Always append the file extension as the resource name so that archive expansion by Yarn could work.
+ // Renaming would happen by the Container Launcher.
+ if (localFile.isArchive() && useArchiveSuffix()) {
+ String path = localFile.getURI().toString();
+ String suffix = Paths.getExtension(path);
+ if (!suffix.isEmpty()) {
+ name += '.' + suffix;
+ }
+ }
+ localResources.put(name, YarnUtils.createLocalResource(localFile));
+ }
+
+ @Override
+ public ResourcesAdder withResources() {
+ return new MoreResourcesImpl();
+ }
+
+ @Override
+ public AfterResources noResources() {
+ return new MoreResourcesImpl();
+ }
+
+ private final class MoreResourcesImpl implements MoreResources {
+
+ @Override
+ public MoreResources add(LocalFile localFile) {
+ addLocalFile(localFile);
+ return this;
+ }
+
+ @Override
+ public EnvironmentAdder withEnvironment() {
+ return finish();
+ }
+
+ @Override
+ public AfterEnvironment noEnvironment() {
+ return finish();
+ }
+
+ private MoreEnvironmentImpl finish() {
+ launchContext.setLocalResources(localResources);
+ return new MoreEnvironmentImpl();
+ }
+ }
+
+ private final class MoreEnvironmentImpl implements MoreEnvironment {
+
+ @Override
+ public CommandAdder withCommands() {
+ launchContext.setEnvironment(environment);
+ return new MoreCommandImpl();
+ }
+
+ @Override
+ public <V> MoreEnvironment add(String key, V value) {
+ environment.put(key, value.toString());
+ return this;
+ }
+ }
+
+ private final class MoreCommandImpl implements MoreCommand, StdOutSetter, StdErrSetter {
+
+ private final StringBuilder commandBuilder = new StringBuilder();
+
+ @Override
+ public StdOutSetter add(String cmd, String... args) {
+ commandBuilder.append(cmd);
+ for (String arg : args) {
+ commandBuilder.append(' ').append(arg);
+ }
+ return this;
+ }
+
+ @Override
+ public <R> ProcessController<R> launch() {
+ if (credentials != null && !credentials.getAllTokens().isEmpty()) {
+ for (Token<?> token : credentials.getAllTokens()) {
+ LOG.info("Launch with delegation token {}", token);
+ }
+ launchContext.setCredentials(credentials);
+ }
+ launchContext.setCommands(commands);
+ return doLaunch(launchContext);
+ }
+
+ @Override
+ public MoreCommand redirectError(String stderr) {
+ redirect(2, stderr);
+ return noError();
+ }
+
+ @Override
+ public MoreCommand noError() {
+ commands.add(commandBuilder.toString());
+ commandBuilder.setLength(0);
+ return this;
+ }
+
+ @Override
+ public StdErrSetter redirectOutput(String stdout) {
+ redirect(1, stdout);
+ return this;
+ }
+
+ @Override
+ public StdErrSetter noOutput() {
+ return this;
+ }
+
+ private void redirect(int type, String out) {
+ commandBuilder.append(' ')
+ .append(type).append('>')
+ .append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append('/').append(out);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java b/yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java
new file mode 100644
index 0000000..6f47b6c
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import com.google.common.base.Throwables;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ *
+ */
+public final class VersionDetectYarnAMClientFactory implements YarnAMClientFactory {
+
+ private final Configuration conf;
+
+ public VersionDetectYarnAMClientFactory(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public YarnAMClient create() {
+ try {
+ Class<YarnAMClient> clz;
+ if (YarnUtils.isHadoop20()) {
+ // Uses hadoop-2.0 class
+ String clzName = getClass().getPackage().getName() + ".Hadoop20YarnAMClient";
+ clz = (Class<YarnAMClient>) Class.forName(clzName);
+ } else {
+ // Uses hadoop-2.1 class
+ String clzName = getClass().getPackage().getName() + ".Hadoop21YarnAMClient";
+ clz = (Class<YarnAMClient>) Class.forName(clzName);
+ }
+
+ return clz.getConstructor(Configuration.class).newInstance(conf);
+
+ } catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java b/yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
new file mode 100644
index 0000000..f9db959
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import com.google.common.base.Throwables;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ *
+ */
+public final class VersionDetectYarnAppClientFactory implements YarnAppClientFactory {
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public YarnAppClient create(Configuration configuration) {
+ try {
+ Class<YarnAppClient> clz;
+
+ if (YarnUtils.isHadoop20()) {
+ // Uses hadoop-2.0 class.
+ String clzName = getClass().getPackage().getName() + ".Hadoop20YarnAppClient";
+ clz = (Class<YarnAppClient>) Class.forName(clzName);
+ } else {
+ // Uses hadoop-2.1 class
+ String clzName = getClass().getPackage().getName() + ".Hadoop21YarnAppClient";
+ clz = (Class<YarnAppClient>) Class.forName(clzName);
+ }
+
+ return clz.getConstructor(Configuration.class).newInstance(configuration);
+
+ } catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
new file mode 100644
index 0000000..83ba6a8
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.internal.ProcessLauncher;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Service;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This interface provides abstraction for AM to interacts with YARN to abstract out YARN version specific
+ * code, making multi-version compatibility easier.
+ */
+public interface YarnAMClient extends Service {
+
+ /**
+ * Builder for creating a container request.
+ */
+ abstract class ContainerRequestBuilder {
+
+ protected final Resource capability;
+ protected final int count;
+ protected final Set<String> hosts = Sets.newHashSet();
+ protected final Set<String> racks = Sets.newHashSet();
+ protected final Priority priority = Records.newRecord(Priority.class);
+
+ protected ContainerRequestBuilder(Resource capability, int count) {
+ this.capability = capability;
+ this.count = count;
+ }
+
+ public ContainerRequestBuilder addHosts(String firstHost, String...moreHosts) {
+ return add(hosts, firstHost, moreHosts);
+ }
+
+ public ContainerRequestBuilder addRacks(String firstRack, String...moreRacks) {
+ return add(racks, firstRack, moreRacks);
+ }
+
+ public ContainerRequestBuilder setPriority(int prio) {
+ priority.setPriority(prio);
+ return this;
+ }
+
+ /**
+ * Adds a container request. Returns an unique ID for the request.
+ */
+ public abstract String apply();
+
+ private <T> ContainerRequestBuilder add(Collection<T> collection, T first, T... more) {
+ collection.add(first);
+ Collections.addAll(collection, more);
+ return this;
+ }
+ }
+
+ ContainerId getContainerId();
+
+ String getHost();
+
+ /**
+ * Sets the tracker address and tracker url. This method should be called before calling {@link #start()}.
+ */
+ void setTracker(InetSocketAddress trackerAddr, URL trackerUrl);
+
+ /**
+ * Callback for allocate call.
+ */
+ // TODO: Move AM heartbeat logic into this interface so AM only needs to handle callback.
+ interface AllocateHandler {
+ void acquired(List<ProcessLauncher<YarnContainerInfo>> launchers);
+
+ void completed(List<YarnContainerStatus> completed);
+ }
+
+ void allocate(float progress, AllocateHandler handler) throws Exception;
+
+ ContainerRequestBuilder addContainerRequest(Resource capability);
+
+ ContainerRequestBuilder addContainerRequest(Resource capability, int count);
+
+ /**
+ * Notify a container request is fulfilled.
+ *
+ * Note: This method is needed to workaround a seemingly bug from AMRMClient implementation in YARN that if
+ * a container is requested after a previous container was acquired (with the same capability), multiple containers
+ * will get allocated instead of one.
+ *
+ * @param id The ID returned by {@link YarnAMClient.ContainerRequestBuilder#apply()}.
+ */
+ void completeContainerRequest(String id);
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClientFactory.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClientFactory.java b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClientFactory.java
new file mode 100644
index 0000000..b2a1194
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClientFactory.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+/**
+ *
+ */
+public interface YarnAMClientFactory {
+
+ YarnAMClient create();
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java
new file mode 100644
index 0000000..71a9e68
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.internal.ProcessController;
+import org.apache.twill.internal.ProcessLauncher;
+import com.google.common.util.concurrent.Service;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+/**
+ * Interface for launching Yarn application from client.
+ */
+public interface YarnAppClient extends Service {
+
+ /**
+ * Creates a {@link ProcessLauncher} for launching the application represented by the given spec.
+ */
+ ProcessLauncher<ApplicationId> createLauncher(TwillSpecification twillSpec) throws Exception;
+
+ /**
+ * Creates a {@link ProcessLauncher} for launching application with the given user and spec.
+ *
+ * @deprecated This method will get removed.
+ */
+ @Deprecated
+ ProcessLauncher<ApplicationId> createLauncher(String user, TwillSpecification twillSpec) throws Exception;
+
+ ProcessController<YarnApplicationReport> createProcessController(ApplicationId appId);
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClientFactory.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClientFactory.java b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClientFactory.java
new file mode 100644
index 0000000..70cecad
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClientFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ *
+ */
+public interface YarnAppClientFactory {
+
+ YarnAppClient create(Configuration configuration);
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/yarn/YarnApplicationReport.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnApplicationReport.java b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnApplicationReport.java
new file mode 100644
index 0000000..4dbb1d1
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnApplicationReport.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+
+/**
+ * This interface is for adapting differences in ApplicationReport in different Hadoop version.
+ */
+public interface YarnApplicationReport {
+
+ /**
+ * Get the <code>ApplicationId</code> of the application.
+ * @return <code>ApplicationId</code> of the application
+ */
+ ApplicationId getApplicationId();
+
+ /**
+ * Get the <code>ApplicationAttemptId</code> of the current
+ * attempt of the application
+ * @return <code>ApplicationAttemptId</code> of the attempt
+ */
+ ApplicationAttemptId getCurrentApplicationAttemptId();
+
+ /**
+ * Get the <em>queue</em> to which the application was submitted.
+ * @return <em>queue</em> to which the application was submitted
+ */
+ String getQueue();
+
+ /**
+ * Get the user-defined <em>name</em> of the application.
+ * @return <em>name</em> of the application
+ */
+ String getName();
+
+ /**
+ * Get the <em>host</em> on which the <code>ApplicationMaster</code>
+ * is running.
+ * @return <em>host</em> on which the <code>ApplicationMaster</code>
+ * is running
+ */
+ String getHost();
+
+ /**
+ * Get the <em>RPC port</em> of the <code>ApplicationMaster</code>.
+ * @return <em>RPC port</em> of the <code>ApplicationMaster</code>
+ */
+ int getRpcPort();
+
+
+ /**
+ * Get the <code>YarnApplicationState</code> of the application.
+ * @return <code>YarnApplicationState</code> of the application
+ */
+ YarnApplicationState getYarnApplicationState();
+
+
+ /**
+ * Get the <em>diagnositic information</em> of the application in case of
+ * errors.
+ * @return <em>diagnositic information</em> of the application in case
+ * of errors
+ */
+ String getDiagnostics();
+
+
+ /**
+ * Get the <em>tracking url</em> for the application.
+ * @return <em>tracking url</em> for the application
+ */
+ String getTrackingUrl();
+
+
+ /**
+ * Get the original not-proxied <em>tracking url</em> for the application.
+ * This is intended to only be used by the proxy itself.
+ * @return the original not-proxied <em>tracking url</em> for the application
+ */
+ String getOriginalTrackingUrl();
+
+ /**
+ * Get the <em>start time</em> of the application.
+ * @return <em>start time</em> of the application
+ */
+ long getStartTime();
+
+
+ /**
+ * Get the <em>finish time</em> of the application.
+ * @return <em>finish time</em> of the application
+ */
+ long getFinishTime();
+
+
+ /**
+ * Get the <em>final finish status</em> of the application.
+ * @return <em>final finish status</em> of the application
+ */
+ FinalApplicationStatus getFinalApplicationStatus();
+
+ /**
+ * Retrieve the structure containing the job resources for this application
+ * @return the job resources structure for this application
+ */
+ ApplicationResourceUsageReport getApplicationResourceUsageReport();
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/yarn/YarnContainerInfo.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnContainerInfo.java b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnContainerInfo.java
new file mode 100644
index 0000000..e806da7
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnContainerInfo.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.internal.ContainerInfo;
+
+/**
+ *
+ */
+public interface YarnContainerInfo extends ContainerInfo {
+
+ <T> T getContainer();
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/yarn/YarnContainerStatus.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnContainerStatus.java b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnContainerStatus.java
new file mode 100644
index 0000000..57e712c
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnContainerStatus.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.yarn.api.records.ContainerState;
+
+/**
+ * This interface is for adapting differences in ContainerStatus between Hadoop 2.0 and 2.1
+ */
+public interface YarnContainerStatus {
+
+ String getContainerId();
+
+ ContainerState getState();
+
+ int getExitStatus();
+
+ String getDiagnostics();
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/yarn/YarnLaunchContext.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnLaunchContext.java b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnLaunchContext.java
new file mode 100644
index 0000000..984a1be
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnLaunchContext.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This interface is for adapting ContainerLaunchContext in different Hadoop version
+ */
+public interface YarnLaunchContext {
+
+ <T> T getLaunchContext();
+
+ void setCredentials(Credentials credentials);
+
+ void setLocalResources(Map<String, YarnLocalResource> localResources);
+
+ void setServiceData(Map<String, ByteBuffer> serviceData);
+
+ Map<String, String> getEnvironment();
+
+ void setEnvironment(Map<String, String> environment);
+
+ List<String> getCommands();
+
+ void setCommands(List<String> commands);
+
+ void setApplicationACLs(Map<ApplicationAccessType, String> acls);
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/yarn/YarnLocalResource.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnLocalResource.java b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnLocalResource.java
new file mode 100644
index 0000000..9bfc224
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnLocalResource.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
+
+/**
+ * A adapter interface for the LocalResource class/interface in different Hadoop version.
+ */
+public interface YarnLocalResource {
+
+ /**
+ * Returns the actual LocalResource object in Yarn.
+ */
+ <T> T getLocalResource();
+
+ /**
+ * Get the <em>location</em> of the resource to be localized.
+ * @return <em>location</em> of the resource to be localized
+ */
+ URL getResource();
+
+ /**
+ * Set <em>location</em> of the resource to be localized.
+ * @param resource <em>location</em> of the resource to be localized
+ */
+ void setResource(URL resource);
+
+ /**
+ * Get the <em>size</em> of the resource to be localized.
+ * @return <em>size</em> of the resource to be localized
+ */
+ long getSize();
+
+ /**
+ * Set the <em>size</em> of the resource to be localized.
+ * @param size <em>size</em> of the resource to be localized
+ */
+ void setSize(long size);
+
+ /**
+ * Get the original <em>timestamp</em> of the resource to be localized, used
+ * for verification.
+ * @return <em>timestamp</em> of the resource to be localized
+ */
+ long getTimestamp();
+
+ /**
+ * Set the <em>timestamp</em> of the resource to be localized, used
+ * for verification.
+ * @param timestamp <em>timestamp</em> of the resource to be localized
+ */
+ void setTimestamp(long timestamp);
+
+ /**
+ * Get the <code>LocalResourceType</code> of the resource to be localized.
+ * @return <code>LocalResourceType</code> of the resource to be localized
+ */
+ LocalResourceType getType();
+
+ /**
+ * Set the <code>LocalResourceType</code> of the resource to be localized.
+ * @param type <code>LocalResourceType</code> of the resource to be localized
+ */
+ void setType(LocalResourceType type);
+
+ /**
+ * Get the <code>LocalResourceVisibility</code> of the resource to be
+ * localized.
+ * @return <code>LocalResourceVisibility</code> of the resource to be
+ * localized
+ */
+ LocalResourceVisibility getVisibility();
+
+ /**
+ * Set the <code>LocalResourceVisibility</code> of the resource to be
+ * localized.
+ * @param visibility <code>LocalResourceVisibility</code> of the resource to be
+ * localized
+ */
+ void setVisibility(LocalResourceVisibility visibility);
+
+ /**
+ * Get the <em>pattern</em> that should be used to extract entries from the
+ * archive (only used when type is <code>PATTERN</code>).
+ * @return <em>pattern</em> that should be used to extract entries from the
+ * archive.
+ */
+ String getPattern();
+
+ /**
+ * Set the <em>pattern</em> that should be used to extract entries from the
+ * archive (only used when type is <code>PATTERN</code>).
+ * @param pattern <em>pattern</em> that should be used to extract entries
+ * from the archive.
+ */
+ void setPattern(String pattern);
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/yarn/YarnNMClient.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnNMClient.java b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnNMClient.java
new file mode 100644
index 0000000..d863c91
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnNMClient.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.common.Cancellable;
+
+/**
+ * Abstraction for dealing with API differences in different hadoop yarn version
+ */
+public interface YarnNMClient {
+
+ /**
+ * Starts a process based on the given launch context.
+ *
+ * @param containerInfo The containerInfo that the new process will launch in.
+ * @param launchContext Contains information about the process going to start.
+ * @return A {@link Cancellable} that when {@link Cancellable#cancel()}} is invoked,
+ * it will try to shutdown the process.
+ *
+ */
+ Cancellable start(YarnContainerInfo containerInfo, YarnLaunchContext launchContext);
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
new file mode 100644
index 0000000..4f7597b
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.api.LocalFile;
+import org.apache.twill.filesystem.ForwardingLocationFactory;
+import org.apache.twill.filesystem.HDFSLocationFactory;
+import org.apache.twill.filesystem.LocationFactory;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Collection of helper methods to simplify YARN calls.
+ */
+public class YarnUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(YarnUtils.class);
+ private static final AtomicReference<Boolean> HADOOP_20 = new AtomicReference<Boolean>();
+
+ public static YarnLocalResource createLocalResource(LocalFile localFile) {
+ Preconditions.checkArgument(localFile.getLastModified() >= 0, "Last modified time should be >= 0.");
+ Preconditions.checkArgument(localFile.getSize() >= 0, "File size should be >= 0.");
+
+ YarnLocalResource resource = createAdapter(YarnLocalResource.class);
+ resource.setVisibility(LocalResourceVisibility.APPLICATION);
+ resource.setResource(ConverterUtils.getYarnUrlFromURI(localFile.getURI()));
+ resource.setTimestamp(localFile.getLastModified());
+ resource.setSize(localFile.getSize());
+ return setLocalResourceType(resource, localFile);
+ }
+
+ public static YarnLaunchContext createLaunchContext() {
+ return createAdapter(YarnLaunchContext.class);
+ }
+
+ // temporary workaround since older versions of hadoop don't have the getVirtualCores method.
+ public static int getVirtualCores(Resource resource) {
+ try {
+ Method getVirtualCores = Resource.class.getMethod("getVirtualCores");
+ return (Integer) getVirtualCores.invoke(resource);
+ } catch (Exception e) {
+ return 0;
+ }
+ }
+
+ /**
+ * Temporary workaround since older versions of hadoop don't have the setCores method.
+ *
+ * @param resource
+ * @param cores
+ * @return true if virtual cores was set, false if not.
+ */
+ public static boolean setVirtualCores(Resource resource, int cores) {
+ try {
+ Method setVirtualCores = Resource.class.getMethod("setVirtualCores", int.class);
+ setVirtualCores.invoke(resource, cores);
+ } catch (Exception e) {
+ // It's ok to ignore this exception, as it's using older version of API.
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Creates {@link ApplicationId} from the given cluster timestamp and id.
+ */
+ public static ApplicationId createApplicationId(long timestamp, int id) {
+ try {
+ try {
+ // For Hadoop-2.1
+ Method method = ApplicationId.class.getMethod("newInstance", long.class, int.class);
+ return (ApplicationId) method.invoke(null, timestamp, id);
+ } catch (NoSuchMethodException e) {
+ // Try with Hadoop-2.0 way
+ ApplicationId appId = Records.newRecord(ApplicationId.class);
+
+ Method setClusterTimestamp = ApplicationId.class.getMethod("setClusterTimestamp", long.class);
+ Method setId = ApplicationId.class.getMethod("setId", int.class);
+
+ setClusterTimestamp.invoke(appId, timestamp);
+ setId.invoke(appId, id);
+
+ return appId;
+ }
+ } catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ /**
+ * Helper method to get delegation tokens for the given LocationFactory.
+ * @param config The hadoop configuration.
+ * @param locationFactory The LocationFactory for generating tokens.
+ * @param credentials Credentials for storing tokens acquired.
+ * @return List of delegation Tokens acquired.
+ */
+ public static List<Token<?>> addDelegationTokens(Configuration config,
+ LocationFactory locationFactory,
+ Credentials credentials) throws IOException {
+ if (!UserGroupInformation.isSecurityEnabled()) {
+ LOG.debug("Security is not enabled");
+ return ImmutableList.of();
+ }
+
+ FileSystem fileSystem = getFileSystem(locationFactory);
+
+ if (fileSystem == null) {
+ LOG.debug("LocationFactory is not HDFS");
+ return ImmutableList.of();
+ }
+
+ String renewer = getYarnTokenRenewer(config);
+
+ Token<?>[] tokens = fileSystem.addDelegationTokens(renewer, credentials);
+ return tokens == null ? ImmutableList.<Token<?>>of() : ImmutableList.copyOf(tokens);
+ }
+
+ public static ByteBuffer encodeCredentials(Credentials credentials) {
+ try {
+ DataOutputBuffer out = new DataOutputBuffer();
+ credentials.writeTokenStorageToStream(out);
+ return ByteBuffer.wrap(out.getData(), 0, out.getLength());
+ } catch (IOException e) {
+ // Shouldn't throw
+ LOG.error("Failed to encode Credentials.", e);
+ throw Throwables.propagate(e);
+ }
+ }
+
+ /**
+ * Decodes {@link Credentials} from the given buffer.
+ * If the buffer is null or empty, it returns an empty Credentials.
+ */
+ public static Credentials decodeCredentials(ByteBuffer buffer) throws IOException {
+ Credentials credentials = new Credentials();
+ if (buffer != null && buffer.hasRemaining()) {
+ DataInputByteBuffer in = new DataInputByteBuffer();
+ in.reset(buffer);
+ credentials.readTokenStorageStream(in);
+ }
+ return credentials;
+ }
+
+ public static String getYarnTokenRenewer(Configuration config) throws IOException {
+ String rmHost = getRMAddress(config).getHostName();
+ String renewer = SecurityUtil.getServerPrincipal(config.get(YarnConfiguration.RM_PRINCIPAL), rmHost);
+
+ if (renewer == null || renewer.length() == 0) {
+ throw new IOException("No Kerberos principal for Yarn RM to use as renewer");
+ }
+
+ return renewer;
+ }
+
+ public static InetSocketAddress getRMAddress(Configuration config) {
+ return config.getSocketAddr(YarnConfiguration.RM_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_PORT);
+ }
+
+ /**
+ * Returns true if Hadoop-2.0 classes are in the classpath.
+ */
+ public static boolean isHadoop20() {
+ Boolean hadoop20 = HADOOP_20.get();
+ if (hadoop20 != null) {
+ return hadoop20;
+ }
+ try {
+ Class.forName("org.apache.hadoop.yarn.client.api.NMClient");
+ HADOOP_20.set(false);
+ return false;
+ } catch (ClassNotFoundException e) {
+ HADOOP_20.set(true);
+ return true;
+ }
+ }
+
+ /**
+ * Helper method to create adapter class for bridging between Hadoop 2.0 and 2.1
+ */
+ private static <T> T createAdapter(Class<T> clz) {
+ String className = clz.getPackage().getName();
+
+ if (isHadoop20()) {
+ className += ".Hadoop20" + clz.getSimpleName();
+ } else {
+ className += ".Hadoop21" + clz.getSimpleName();
+ }
+
+ try {
+ return (T) Class.forName(className).newInstance();
+ } catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ private static YarnLocalResource setLocalResourceType(YarnLocalResource localResource, LocalFile localFile) {
+ if (localFile.isArchive()) {
+ if (localFile.getPattern() == null) {
+ localResource.setType(LocalResourceType.ARCHIVE);
+ } else {
+ localResource.setType(LocalResourceType.PATTERN);
+ localResource.setPattern(localFile.getPattern());
+ }
+ } else {
+ localResource.setType(LocalResourceType.FILE);
+ }
+ return localResource;
+ }
+
+ private static <T> Map<String, T> transformResource(Map<String, YarnLocalResource> from) {
+ return Maps.transformValues(from, new Function<YarnLocalResource, T>() {
+ @Override
+ public T apply(YarnLocalResource resource) {
+ return resource.getLocalResource();
+ }
+ });
+ }
+
+ /**
+ * Gets the Hadoop FileSystem from LocationFactory.
+ */
+ private static FileSystem getFileSystem(LocationFactory locationFactory) {
+ if (locationFactory instanceof HDFSLocationFactory) {
+ return ((HDFSLocationFactory) locationFactory).getFileSystem();
+ }
+ if (locationFactory instanceof ForwardingLocationFactory) {
+ return getFileSystem(((ForwardingLocationFactory) locationFactory).getDelegate());
+ }
+ return null;
+ }
+
+ private YarnUtils() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/yarn/package-info.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/package-info.java b/yarn/src/main/java/org/apache/twill/internal/yarn/package-info.java
new file mode 100644
index 0000000..d6ec9f7
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/yarn/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains class for interacting with Yarn.
+ */
+package org.apache.twill.internal.yarn;
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/yarn/LocationSecureStoreUpdater.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/yarn/LocationSecureStoreUpdater.java b/yarn/src/main/java/org/apache/twill/yarn/LocationSecureStoreUpdater.java
new file mode 100644
index 0000000..4d20c9c
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/yarn/LocationSecureStoreUpdater.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.SecureStore;
+import org.apache.twill.api.SecureStoreUpdater;
+import org.apache.twill.filesystem.LocationFactory;
+import org.apache.twill.internal.yarn.YarnUtils;
+import com.google.common.base.Throwables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
+
+import java.io.IOException;
+
+/**
+ * Package private class for updating location related secure store.
+ */
+final class LocationSecureStoreUpdater implements SecureStoreUpdater {
+
+ private final Configuration configuration;
+ private final LocationFactory locationFactory;
+
+ LocationSecureStoreUpdater(Configuration configuration, LocationFactory locationFactory) {
+ this.configuration = configuration;
+ this.locationFactory = locationFactory;
+ }
+
+ @Override
+ public SecureStore update(String application, RunId runId) {
+ try {
+ Credentials credentials = new Credentials();
+ YarnUtils.addDelegationTokens(configuration, locationFactory, credentials);
+ return YarnSecureStore.create(credentials);
+ } catch (IOException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java b/yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java
new file mode 100644
index 0000000..2974c3f
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.ResourceReport;
+import org.apache.twill.internal.json.ResourceReportAdapter;
+import com.google.common.base.Charsets;
+import com.google.common.io.Closeables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.net.URL;
+
+/**
+ * Package private class to get {@link ResourceReport} from the application master.
+ */
+final class ResourceReportClient {
+ private static final Logger LOG = LoggerFactory.getLogger(ResourceReportClient.class);
+
+ private final ResourceReportAdapter reportAdapter;
+ private final URL resourceUrl;
+
+ ResourceReportClient(URL resourceUrl) {
+ this.resourceUrl = resourceUrl;
+ this.reportAdapter = ResourceReportAdapter.create();
+ }
+
+ /**
+ * Returns the resource usage of the application fetched from the resource endpoint URL.
+ * @return A {@link ResourceReport} or {@code null} if failed to fetch the report.
+ */
+ public ResourceReport get() {
+ try {
+ Reader reader = new BufferedReader(new InputStreamReader(resourceUrl.openStream(), Charsets.UTF_8));
+ try {
+ return reportAdapter.fromJson(reader);
+ } finally {
+ Closeables.closeQuietly(reader);
+ }
+ } catch (Exception e) {
+ LOG.error("Exception getting resource report from {}.", resourceUrl, e);
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/yarn/YarnSecureStore.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/yarn/YarnSecureStore.java b/yarn/src/main/java/org/apache/twill/yarn/YarnSecureStore.java
new file mode 100644
index 0000000..e6f461a
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/yarn/YarnSecureStore.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.SecureStore;
+import org.apache.hadoop.security.Credentials;
+
+/**
+ * A {@link SecureStore} for hadoop credentials.
+ */
+public final class YarnSecureStore implements SecureStore {
+
+ private final Credentials credentials;
+
+ public static SecureStore create(Credentials credentials) {
+ return new YarnSecureStore(credentials);
+ }
+
+ private YarnSecureStore(Credentials credentials) {
+ this.credentials = credentials;
+ }
+
+ @Override
+ public Credentials getStore() {
+ return credentials;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java b/yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
new file mode 100644
index 0000000..4c240fb
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.ResourceReport;
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.logging.LogHandler;
+import org.apache.twill.internal.AbstractTwillController;
+import org.apache.twill.internal.Constants;
+import org.apache.twill.internal.ProcessController;
+import org.apache.twill.internal.appmaster.TrackerService;
+import org.apache.twill.internal.state.StateNode;
+import org.apache.twill.internal.state.SystemMessages;
+import org.apache.twill.internal.yarn.YarnApplicationReport;
+import org.apache.twill.zookeeper.NodeData;
+import org.apache.twill.zookeeper.ZKClient;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.commons.lang.time.StopWatch;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URL;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A {@link org.apache.twill.api.TwillController} that controllers application running on Hadoop YARN.
+ */
+final class YarnTwillController extends AbstractTwillController implements TwillController {
+
+ private static final Logger LOG = LoggerFactory.getLogger(YarnTwillController.class);
+
+ private final Callable<ProcessController<YarnApplicationReport>> startUp;
+ private ProcessController<YarnApplicationReport> processController;
+ private ResourceReportClient resourcesClient;
+
+ /**
+ * Creates an instance without any {@link LogHandler}.
+ */
+ YarnTwillController(RunId runId, ZKClient zkClient, Callable<ProcessController<YarnApplicationReport>> startUp) {
+ this(runId, zkClient, ImmutableList.<LogHandler>of(), startUp);
+ }
+
+ YarnTwillController(RunId runId, ZKClient zkClient, Iterable<LogHandler> logHandlers,
+ Callable<ProcessController<YarnApplicationReport>> startUp) {
+ super(runId, zkClient, logHandlers);
+ this.startUp = startUp;
+ }
+
+
+ /**
+ * Sends a message to application to notify the secure store has be updated.
+ */
+ ListenableFuture<Void> secureStoreUpdated() {
+ return sendMessage(SystemMessages.SECURE_STORE_UPDATED, null);
+ }
+
+ @Override
+ protected void doStartUp() {
+ super.doStartUp();
+
+ // Submit and poll the status of the yarn application
+ try {
+ processController = startUp.call();
+
+ YarnApplicationReport report = processController.getReport();
+ LOG.debug("Application {} submit", report.getApplicationId());
+
+ YarnApplicationState state = report.getYarnApplicationState();
+ StopWatch stopWatch = new StopWatch();
+ stopWatch.start();
+ stopWatch.split();
+ long maxTime = TimeUnit.MILLISECONDS.convert(Constants.APPLICATION_MAX_START_SECONDS, TimeUnit.SECONDS);
+
+ LOG.info("Checking yarn application status");
+ while (!hasRun(state) && stopWatch.getSplitTime() < maxTime) {
+ report = processController.getReport();
+ state = report.getYarnApplicationState();
+ LOG.debug("Yarn application status: {}", state);
+ TimeUnit.SECONDS.sleep(1);
+ stopWatch.split();
+ }
+ LOG.info("Yarn application is in state {}", state);
+ if (state != YarnApplicationState.RUNNING) {
+ LOG.info("Yarn application is not in running state. Shutting down controller.",
+ Constants.APPLICATION_MAX_START_SECONDS);
+ forceShutDown();
+ } else {
+ try {
+ URL resourceUrl = URI.create(String.format("http://%s:%d", report.getHost(), report.getRpcPort()))
+ .resolve(TrackerService.PATH).toURL();
+ resourcesClient = new ResourceReportClient(resourceUrl);
+ } catch (IOException e) {
+ resourcesClient = null;
+ }
+ }
+ } catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ protected void doShutDown() {
+ if (processController == null) {
+ LOG.warn("No process controller for application that is not submitted.");
+ return;
+ }
+
+ // Wait for the stop message being processed
+ try {
+ Uninterruptibles.getUninterruptibly(getStopMessageFuture(),
+ Constants.APPLICATION_MAX_STOP_SECONDS, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ LOG.error("Failed to wait for stop message being processed.", e);
+ // Kill the application through yarn
+ kill();
+ }
+
+ // Poll application status from yarn
+ try {
+ StopWatch stopWatch = new StopWatch();
+ stopWatch.start();
+ stopWatch.split();
+ long maxTime = TimeUnit.MILLISECONDS.convert(Constants.APPLICATION_MAX_STOP_SECONDS, TimeUnit.SECONDS);
+
+ YarnApplicationReport report = processController.getReport();
+ FinalApplicationStatus finalStatus = report.getFinalApplicationStatus();
+ while (finalStatus == FinalApplicationStatus.UNDEFINED && stopWatch.getSplitTime() < maxTime) {
+ LOG.debug("Yarn application final status for {} {}", report.getApplicationId(), finalStatus);
+ TimeUnit.SECONDS.sleep(1);
+ stopWatch.split();
+ finalStatus = processController.getReport().getFinalApplicationStatus();
+ }
+ LOG.debug("Yarn application final status is {}", finalStatus);
+
+ // Application not finished after max stop time, kill the application
+ if (finalStatus == FinalApplicationStatus.UNDEFINED) {
+ kill();
+ }
+ } catch (Exception e) {
+ LOG.warn("Exception while waiting for application report: {}", e.getMessage(), e);
+ kill();
+ }
+
+ super.doShutDown();
+ }
+
+ @Override
+ public void kill() {
+ if (processController != null) {
+ YarnApplicationReport report = processController.getReport();
+ LOG.info("Killing application {}", report.getApplicationId());
+ processController.cancel();
+ } else {
+ LOG.warn("No process controller for application that is not submitted.");
+ }
+ }
+
+ @Override
+ protected void instanceNodeUpdated(NodeData nodeData) {
+
+ }
+
+ @Override
+ protected void stateNodeUpdated(StateNode stateNode) {
+
+ }
+
+ private boolean hasRun(YarnApplicationState state) {
+ switch (state) {
+ case RUNNING:
+ case FINISHED:
+ case FAILED:
+ case KILLED:
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public ResourceReport getResourceReport() {
+ // in case the user calls this before starting, return null
+ return (resourcesClient == null) ? null : resourcesClient.get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/yarn/YarnTwillControllerFactory.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/yarn/YarnTwillControllerFactory.java b/yarn/src/main/java/org/apache/twill/yarn/YarnTwillControllerFactory.java
new file mode 100644
index 0000000..11c2ae6
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/yarn/YarnTwillControllerFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.logging.LogHandler;
+import org.apache.twill.internal.ProcessController;
+import org.apache.twill.internal.yarn.YarnApplicationReport;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Factory for creating {@link YarnTwillController}.
+ */
+interface YarnTwillControllerFactory {
+
+ YarnTwillController create(RunId runId, Iterable<LogHandler> logHandlers,
+ Callable<ProcessController<YarnApplicationReport>> startUp);
+}