You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/13 23:27:28 UTC
[12/28] [TWILL-14] Bootstrapping for the site generation.
Reorganization of the source tree happens:
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java
new file mode 100644
index 0000000..b4b27a9
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.appmaster;
+
+import org.apache.twill.common.Cancellable;
+import org.apache.twill.internal.EnvKeys;
+import org.apache.twill.internal.ProcessController;
+import org.apache.twill.internal.yarn.AbstractYarnProcessLauncher;
+import org.apache.twill.internal.yarn.YarnContainerInfo;
+import org.apache.twill.internal.yarn.YarnLaunchContext;
+import org.apache.twill.internal.yarn.YarnNMClient;
+import com.google.common.base.Objects;
+import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ *
+ */
+public final class RunnableProcessLauncher extends AbstractYarnProcessLauncher<YarnContainerInfo> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RunnableProcessLauncher.class);
+
+ private final YarnContainerInfo containerInfo;
+ private final YarnNMClient nmClient;
+ private boolean launched;
+
+ public RunnableProcessLauncher(YarnContainerInfo containerInfo, YarnNMClient nmClient) {
+ super(containerInfo);
+ this.containerInfo = containerInfo;
+ this.nmClient = nmClient;
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("container", containerInfo)
+ .toString();
+ }
+
+ @Override
+ protected <R> ProcessController<R> doLaunch(YarnLaunchContext launchContext) {
+ Map<String, String> env = Maps.newHashMap(launchContext.getEnvironment());
+
+ // Set extra environments
+ env.put(EnvKeys.YARN_CONTAINER_ID, containerInfo.getId());
+ env.put(EnvKeys.YARN_CONTAINER_HOST, containerInfo.getHost().getHostName());
+ env.put(EnvKeys.YARN_CONTAINER_PORT, Integer.toString(containerInfo.getPort()));
+ env.put(EnvKeys.YARN_CONTAINER_MEMORY_MB, Integer.toString(containerInfo.getMemoryMB()));
+ env.put(EnvKeys.YARN_CONTAINER_VIRTUAL_CORES, Integer.toString(containerInfo.getVirtualCores()));
+
+ launchContext.setEnvironment(env);
+
+ LOG.info("Launching in container {}, {}", containerInfo.getId(), launchContext.getCommands());
+ final Cancellable cancellable = nmClient.start(containerInfo, launchContext);
+ launched = true;
+
+ return new ProcessController<R>() {
+ @Override
+ public R getReport() {
+ // No reporting support for runnable launch yet.
+ return null;
+
+ }
+
+ @Override
+ public void cancel() {
+ cancellable.cancel();
+ }
+ };
+ }
+
+ public boolean isLaunched() {
+ return launched;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
new file mode 100644
index 0000000..beef0d4
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.appmaster;
+
+import org.apache.twill.api.ResourceReport;
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.ServiceController;
+import org.apache.twill.api.TwillRunResources;
+import org.apache.twill.internal.ContainerInfo;
+import org.apache.twill.internal.DefaultResourceReport;
+import org.apache.twill.internal.DefaultTwillRunResources;
+import org.apache.twill.internal.RunIds;
+import org.apache.twill.internal.TwillContainerController;
+import org.apache.twill.internal.TwillContainerLauncher;
+import org.apache.twill.internal.container.TwillContainerMain;
+import org.apache.twill.internal.state.Message;
+import org.apache.twill.internal.yarn.YarnContainerStatus;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multiset;
+import com.google.common.collect.Table;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A helper class for ApplicationMasterService to keep track of running containers and to interact
+ * with them.
+ */
+final class RunningContainers {
+ private static final Logger LOG = LoggerFactory.getLogger(RunningContainers.class);
+
+ /**
+ * Function to return cardinality of a given BitSet.
+ */
+ private static final Function<BitSet, Integer> BITSET_CARDINALITY = new Function<BitSet, Integer>() {
+ @Override
+ public Integer apply(BitSet input) {
+ return input.cardinality();
+ }
+ };
+
+ // Table of <runnableName, containerId, controller>
+ private final Table<String, String, TwillContainerController> containers;
+
+ // Map from runnableName to a BitSet, with the <instanceId> bit turned on for having an instance running.
+ private final Map<String, BitSet> runnableInstances;
+ private final DefaultResourceReport resourceReport;
+ private final Deque<String> startSequence;
+ private final Lock containerLock;
+ private final Condition containerChange;
+
+ RunningContainers(String appId, TwillRunResources appMasterResources) {
+ containers = HashBasedTable.create();
+ runnableInstances = Maps.newHashMap();
+ startSequence = Lists.newLinkedList();
+ containerLock = new ReentrantLock();
+ containerChange = containerLock.newCondition();
+ resourceReport = new DefaultResourceReport(appId, appMasterResources);
+ }
+
+ /**
+ * Returns {@code true} if there is no live container.
+ */
+ boolean isEmpty() {
+ containerLock.lock();
+ try {
+ return runnableInstances.isEmpty();
+ } finally {
+ containerLock.unlock();
+ }
+ }
+
+ void start(String runnableName, ContainerInfo containerInfo, TwillContainerLauncher launcher) {
+ containerLock.lock();
+ try {
+ int instanceId = getStartInstanceId(runnableName);
+ RunId runId = getRunId(runnableName, instanceId);
+ TwillContainerController controller = launcher.start(runId, instanceId,
+ TwillContainerMain.class, "$HADOOP_CONF_DIR");
+ containers.put(runnableName, containerInfo.getId(), controller);
+
+ TwillRunResources resources = new DefaultTwillRunResources(instanceId,
+ containerInfo.getId(),
+ containerInfo.getVirtualCores(),
+ containerInfo.getMemoryMB(),
+ containerInfo.getHost().getHostName());
+ resourceReport.addRunResources(runnableName, resources);
+
+ if (startSequence.isEmpty() || !runnableName.equals(startSequence.peekLast())) {
+ startSequence.addLast(runnableName);
+ }
+ containerChange.signalAll();
+
+ } finally {
+ containerLock.unlock();
+ }
+ }
+
+ ResourceReport getResourceReport() {
+ return resourceReport;
+ }
+
+ /**
+ * Stops and removes the last running container of the given runnable.
+ */
+ void removeLast(String runnableName) {
+ containerLock.lock();
+ try {
+ int maxInstanceId = getMaxInstanceId(runnableName);
+ if (maxInstanceId < 0) {
+ LOG.warn("No running container found for {}", runnableName);
+ return;
+ }
+
+ String lastContainerId = null;
+ TwillContainerController lastController = null;
+
+ // Find the controller with the maxInstanceId
+ for (Map.Entry<String, TwillContainerController> entry : containers.row(runnableName).entrySet()) {
+ if (getInstanceId(entry.getValue().getRunId()) == maxInstanceId) {
+ lastContainerId = entry.getKey();
+ lastController = entry.getValue();
+ break;
+ }
+ }
+
+ Preconditions.checkState(lastContainerId != null,
+ "No container found for {} with instanceId = {}", runnableName, maxInstanceId);
+
+ LOG.info("Stopping service: {} {}", runnableName, lastController.getRunId());
+ lastController.stopAndWait();
+ containers.remove(runnableName, lastContainerId);
+ removeInstanceId(runnableName, maxInstanceId);
+ resourceReport.removeRunnableResources(runnableName, lastContainerId);
+ containerChange.signalAll();
+ } finally {
+ containerLock.unlock();
+ }
+ }
+
+ /**
+ * Blocks until there are changes in running containers.
+ */
+ void waitForCount(String runnableName, int count) throws InterruptedException {
+ containerLock.lock();
+ try {
+ while (getRunningInstances(runnableName) != count) {
+ containerChange.await();
+ }
+ } finally {
+ containerLock.unlock();
+ }
+ }
+
+ /**
+ * Returns the number of running instances of the given runnable.
+ */
+ int count(String runnableName) {
+ containerLock.lock();
+ try {
+ return getRunningInstances(runnableName);
+ } finally {
+ containerLock.unlock();
+ }
+ }
+
+ /**
+ * Returns a Map contains running instances of all runnables.
+ */
+ Map<String, Integer> countAll() {
+ containerLock.lock();
+ try {
+ return ImmutableMap.copyOf(Maps.transformValues(runnableInstances, BITSET_CARDINALITY));
+ } finally {
+ containerLock.unlock();
+ }
+ }
+
+ void sendToAll(Message message, Runnable completion) {
+ containerLock.lock();
+ try {
+ if (containers.isEmpty()) {
+ completion.run();
+ }
+
+ // Sends the command to all running containers
+ AtomicInteger count = new AtomicInteger(containers.size());
+ for (Map.Entry<String, Map<String, TwillContainerController>> entry : containers.rowMap().entrySet()) {
+ for (TwillContainerController controller : entry.getValue().values()) {
+ sendMessage(entry.getKey(), message, controller, count, completion);
+ }
+ }
+ } finally {
+ containerLock.unlock();
+ }
+ }
+
+ void sendToRunnable(String runnableName, Message message, Runnable completion) {
+ containerLock.lock();
+ try {
+ Collection<TwillContainerController> controllers = containers.row(runnableName).values();
+ if (controllers.isEmpty()) {
+ completion.run();
+ }
+
+ AtomicInteger count = new AtomicInteger(controllers.size());
+ for (TwillContainerController controller : controllers) {
+ sendMessage(runnableName, message, controller, count, completion);
+ }
+ } finally {
+ containerLock.unlock();
+ }
+ }
+
+ /**
+ * Stops all running services. Only called when the AppMaster stops.
+ */
+ void stopAll() {
+ containerLock.lock();
+ try {
+ // Stop it one by one in reverse order of start sequence
+ Iterator<String> itor = startSequence.descendingIterator();
+ List<ListenableFuture<ServiceController.State>> futures = Lists.newLinkedList();
+ while (itor.hasNext()) {
+ String runnableName = itor.next();
+ LOG.info("Stopping all instances of " + runnableName);
+
+ futures.clear();
+ // Parallel stops all running containers of the current runnable.
+ for (TwillContainerController controller : containers.row(runnableName).values()) {
+ futures.add(controller.stop());
+ }
+ // Wait for containers to stop. Assumes the future returned by Futures.successfulAsList won't throw exception.
+ Futures.getUnchecked(Futures.successfulAsList(futures));
+
+ LOG.info("Terminated all instances of " + runnableName);
+ }
+ containers.clear();
+ runnableInstances.clear();
+ } finally {
+ containerLock.unlock();
+ }
+ }
+
+ Set<String> getContainerIds() {
+ containerLock.lock();
+ try {
+ return ImmutableSet.copyOf(containers.columnKeySet());
+ } finally {
+ containerLock.unlock();
+ }
+ }
+
+ /**
+ * Handle completion of container.
+ * @param status The completion status.
+ * @param restartRunnables Set of runnable names that requires restart.
+ */
+ void handleCompleted(YarnContainerStatus status, Multiset<String> restartRunnables) {
+ containerLock.lock();
+ String containerId = status.getContainerId();
+ int exitStatus = status.getExitStatus();
+ ContainerState state = status.getState();
+
+ try {
+ Map<String, TwillContainerController> lookup = containers.column(containerId);
+ if (lookup.isEmpty()) {
+ // It's OK because if a container is stopped through removeLast, this would be empty.
+ return;
+ }
+
+ if (lookup.size() != 1) {
+ LOG.warn("More than one controller found for container {}", containerId);
+ }
+
+ if (exitStatus != 0) {
+ LOG.warn("Container {} exited abnormally with state {}, exit code {}. Re-request the container.",
+ containerId, state, exitStatus);
+ restartRunnables.add(lookup.keySet().iterator().next());
+ } else {
+ LOG.info("Container {} exited normally with state {}", containerId, state);
+ }
+
+ for (Map.Entry<String, TwillContainerController> completedEntry : lookup.entrySet()) {
+ String runnableName = completedEntry.getKey();
+ TwillContainerController controller = completedEntry.getValue();
+ controller.completed(exitStatus);
+
+ removeInstanceId(runnableName, getInstanceId(controller.getRunId()));
+ resourceReport.removeRunnableResources(runnableName, containerId);
+ }
+
+ lookup.clear();
+ containerChange.signalAll();
+ } finally {
+ containerLock.unlock();
+ }
+ }
+
+ /**
+ * Sends a command through the given {@link org.apache.twill.internal.TwillContainerController} of a runnable. Decrements the count
+ * when the sending of command completed. Triggers completion when count reaches zero.
+ */
+ private void sendMessage(final String runnableName, final Message message,
+ final TwillContainerController controller, final AtomicInteger count,
+ final Runnable completion) {
+ Futures.addCallback(controller.sendMessage(message), new FutureCallback<Message>() {
+ @Override
+ public void onSuccess(Message result) {
+ if (count.decrementAndGet() == 0) {
+ completion.run();
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ try {
+ LOG.error("Failed to send message. Runnable: {}, RunId: {}, Message: {}.",
+ runnableName, controller.getRunId(), message, t);
+ } finally {
+ if (count.decrementAndGet() == 0) {
+ completion.run();
+ }
+ }
+ }
+ });
+ }
+
+ /**
+ * Returns the instanceId to start the given runnable.
+ */
+ private int getStartInstanceId(String runnableName) {
+ BitSet instances = runnableInstances.get(runnableName);
+ if (instances == null) {
+ instances = new BitSet();
+ runnableInstances.put(runnableName, instances);
+ }
+ int instanceId = instances.nextClearBit(0);
+ instances.set(instanceId);
+ return instanceId;
+ }
+
+ private void removeInstanceId(String runnableName, int instanceId) {
+ BitSet instances = runnableInstances.get(runnableName);
+ if (instances == null) {
+ return;
+ }
+ instances.clear(instanceId);
+ if (instances.isEmpty()) {
+ runnableInstances.remove(runnableName);
+ }
+ }
+
+ /**
+ * Returns the largest instanceId for the given runnable. Returns -1 if no container is running.
+ */
+ private int getMaxInstanceId(String runnableName) {
+ BitSet instances = runnableInstances.get(runnableName);
+ if (instances == null || instances.isEmpty()) {
+ return -1;
+ }
+ return instances.length() - 1;
+ }
+
+ /**
+ * Returns nnumber of running instances for the given runnable.
+ */
+ private int getRunningInstances(String runableName) {
+ BitSet instances = runnableInstances.get(runableName);
+ return instances == null ? 0 : instances.cardinality();
+ }
+
+ private RunId getRunId(String runnableName, int instanceId) {
+ RunId baseId;
+
+ Collection<TwillContainerController> controllers = containers.row(runnableName).values();
+ if (controllers.isEmpty()) {
+ baseId = RunIds.generate();
+ } else {
+ String id = controllers.iterator().next().getRunId().getId();
+ baseId = RunIds.fromString(id.substring(0, id.lastIndexOf('-')));
+ }
+
+ return RunIds.fromString(baseId.getId() + '-' + instanceId);
+ }
+
+ private int getInstanceId(RunId runId) {
+ String id = runId.getId();
+ return Integer.parseInt(id.substring(id.lastIndexOf('-') + 1));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
new file mode 100644
index 0000000..ca299e0
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.appmaster;
+
+import org.apache.twill.api.ResourceReport;
+import org.apache.twill.internal.json.ResourceReportAdapter;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferOutputStream;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
+import org.jboss.netty.handler.codec.http.HttpContentCompressor;
+import org.jboss.netty.handler.codec.http.HttpHeaders;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.codec.http.HttpVersion;
+import org.jboss.netty.util.CharsetUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URL;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Webservice that the Application Master will register back to the resource manager
+ * for clients to track application progress. Currently used purely for getting a
+ * breakdown of resource usage as a {@link org.apache.twill.api.ResourceReport}.
+ */
+public final class TrackerService extends AbstractIdleService {
+
+ // TODO: This is temporary. When support more REST API, this would get moved.
+ public static final String PATH = "/resources";
+
+ private static final Logger LOG = LoggerFactory.getLogger(TrackerService.class);
+ private static final int NUM_BOSS_THREADS = 1;
+ private static final int CLOSE_CHANNEL_TIMEOUT = 5;
+ private static final int MAX_INPUT_SIZE = 100 * 1024 * 1024;
+
+ private final String host;
+ private ServerBootstrap bootstrap;
+ private InetSocketAddress bindAddress;
+ private URL url;
+ private final ChannelGroup channelGroup;
+ private final ResourceReport resourceReport;
+
+ /**
+ * Initialize the service.
+ *
+ * @param resourceReport live report that the service will return to clients.
+ * @param appMasterHost the application master host.
+ */
+ public TrackerService(ResourceReport resourceReport, String appMasterHost) {
+ this.channelGroup = new DefaultChannelGroup("appMasterTracker");
+ this.resourceReport = resourceReport;
+ this.host = appMasterHost;
+ }
+
+ /**
+ * Returns the address this tracker service is bounded to.
+ */
+ public InetSocketAddress getBindAddress() {
+ return bindAddress;
+ }
+
+ /**
+ * @return tracker url.
+ */
+ public URL getUrl() {
+ return url;
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ Executor bossThreads = Executors.newFixedThreadPool(NUM_BOSS_THREADS,
+ new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("boss-thread")
+ .build());
+
+ Executor workerThreads = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("worker-thread#%d")
+ .build());
+
+ ChannelFactory factory = new NioServerSocketChannelFactory(bossThreads, workerThreads);
+
+ bootstrap = new ServerBootstrap(factory);
+
+ bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+ public ChannelPipeline getPipeline() {
+ ChannelPipeline pipeline = Channels.pipeline();
+
+ pipeline.addLast("decoder", new HttpRequestDecoder());
+ pipeline.addLast("aggregator", new HttpChunkAggregator(MAX_INPUT_SIZE));
+ pipeline.addLast("encoder", new HttpResponseEncoder());
+ pipeline.addLast("compressor", new HttpContentCompressor());
+ pipeline.addLast("handler", new ReportHandler(resourceReport));
+
+ return pipeline;
+ }
+ });
+
+ Channel channel = bootstrap.bind(new InetSocketAddress(host, 0));
+ bindAddress = (InetSocketAddress) channel.getLocalAddress();
+ url = URI.create(String.format("http://%s:%d", host, bindAddress.getPort()))
+ .resolve(TrackerService.PATH).toURL();
+ channelGroup.add(channel);
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ try {
+ if (!channelGroup.close().await(CLOSE_CHANNEL_TIMEOUT, TimeUnit.SECONDS)) {
+ LOG.warn("Timeout when closing all channels.");
+ }
+ } finally {
+ bootstrap.releaseExternalResources();
+ }
+ }
+
+ /**
+ * Handler to return resources used by this application master, which will be available through
+ * the host and port set when this application master registered itself to the resource manager.
+ */
+ public class ReportHandler extends SimpleChannelUpstreamHandler {
+ private final ResourceReport report;
+ private final ResourceReportAdapter reportAdapter;
+
+ public ReportHandler(ResourceReport report) {
+ this.report = report;
+ this.reportAdapter = ResourceReportAdapter.create();
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+ HttpRequest request = (HttpRequest) e.getMessage();
+ if (!isValid(request)) {
+ write404(e);
+ return;
+ }
+
+ writeResponse(e);
+ }
+
+ // only accepts GET on /resources for now
+ private boolean isValid(HttpRequest request) {
+ return (request.getMethod() == HttpMethod.GET) && PATH.equals(request.getUri());
+ }
+
+ private void write404(MessageEvent e) {
+ HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND);
+ ChannelFuture future = e.getChannel().write(response);
+ future.addListener(ChannelFutureListener.CLOSE);
+ }
+
+ private void writeResponse(MessageEvent e) {
+ HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+ response.setHeader(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=UTF-8");
+
+ ChannelBuffer content = ChannelBuffers.dynamicBuffer();
+ Writer writer = new OutputStreamWriter(new ChannelBufferOutputStream(content), CharsetUtil.UTF_8);
+ reportAdapter.toJson(report, writer);
+ try {
+ writer.close();
+ } catch (IOException e1) {
+ LOG.error("error writing resource report", e1);
+ }
+ response.setContent(content);
+ ChannelFuture future = e.getChannel().write(response);
+ future.addListener(ChannelFutureListener.CLOSE);
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+ e.getChannel().close();
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/package-info.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/package-info.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/package-info.java
new file mode 100644
index 0000000..bf8e677
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains implementation of Twill application master.
+ */
+package org.apache.twill.internal.appmaster;
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
new file mode 100644
index 0000000..bbd6c10
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.container;
+
+import org.apache.twill.api.LocalFile;
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.RuntimeSpecification;
+import org.apache.twill.api.TwillRunnableSpecification;
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.discovery.DiscoveryService;
+import org.apache.twill.discovery.ZKDiscoveryService;
+import org.apache.twill.internal.Arguments;
+import org.apache.twill.internal.BasicTwillContext;
+import org.apache.twill.internal.Constants;
+import org.apache.twill.internal.ContainerInfo;
+import org.apache.twill.internal.EnvContainerInfo;
+import org.apache.twill.internal.EnvKeys;
+import org.apache.twill.internal.RunIds;
+import org.apache.twill.internal.ServiceMain;
+import org.apache.twill.internal.json.ArgumentsCodec;
+import org.apache.twill.internal.json.TwillSpecificationAdapter;
+import org.apache.twill.zookeeper.RetryStrategies;
+import org.apache.twill.zookeeper.ZKClient;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.twill.zookeeper.ZKClientServices;
+import org.apache.twill.zookeeper.ZKClients;
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.Service;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.Reader;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ */
+public final class TwillContainerMain extends ServiceMain {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TwillContainerMain.class);
+
+ /**
+ * Main method for launching a {@link TwillContainerService} which runs
+ * a {@link org.apache.twill.api.TwillRunnable}.
+ */
+ public static void main(final String[] args) throws Exception {
+ // Try to load the secure store from localized file, which AM requested RM to localize it for this container.
+ loadSecureStore();
+
+ String zkConnectStr = System.getenv(EnvKeys.TWILL_ZK_CONNECT);
+ File twillSpecFile = new File(Constants.Files.TWILL_SPEC);
+ RunId appRunId = RunIds.fromString(System.getenv(EnvKeys.TWILL_APP_RUN_ID));
+ RunId runId = RunIds.fromString(System.getenv(EnvKeys.TWILL_RUN_ID));
+ String runnableName = System.getenv(EnvKeys.TWILL_RUNNABLE_NAME);
+ int instanceId = Integer.parseInt(System.getenv(EnvKeys.TWILL_INSTANCE_ID));
+ int instanceCount = Integer.parseInt(System.getenv(EnvKeys.TWILL_INSTANCE_COUNT));
+
+ ZKClientService zkClientService = ZKClientServices.delegate(
+ ZKClients.reWatchOnExpire(
+ ZKClients.retryOnFailure(ZKClientService.Builder.of(zkConnectStr).build(),
+ RetryStrategies.fixDelay(1, TimeUnit.SECONDS))));
+
+ DiscoveryService discoveryService = new ZKDiscoveryService(zkClientService);
+
+ TwillSpecification twillSpec = loadTwillSpec(twillSpecFile);
+ renameLocalFiles(twillSpec.getRunnables().get(runnableName));
+
+ TwillRunnableSpecification runnableSpec = twillSpec.getRunnables().get(runnableName).getRunnableSpecification();
+ ContainerInfo containerInfo = new EnvContainerInfo();
+ Arguments arguments = decodeArgs();
+ BasicTwillContext context = new BasicTwillContext(
+ runId, appRunId, containerInfo.getHost(),
+ arguments.getRunnableArguments().get(runnableName).toArray(new String[0]),
+ arguments.getArguments().toArray(new String[0]),
+ runnableSpec, instanceId, discoveryService, instanceCount,
+ containerInfo.getMemoryMB(), containerInfo.getVirtualCores()
+ );
+
+ Configuration conf = new YarnConfiguration(new HdfsConfiguration(new Configuration()));
+ Service service = new TwillContainerService(context, containerInfo,
+ getContainerZKClient(zkClientService, appRunId, runnableName),
+ runId, runnableSpec, getClassLoader(),
+ createAppLocation(conf));
+ new TwillContainerMain().doMain(zkClientService, service);
+ }
+
+ private static void loadSecureStore() throws IOException {
+ if (!UserGroupInformation.isSecurityEnabled()) {
+ return;
+ }
+
+ File file = new File(Constants.Files.CREDENTIALS);
+ if (file.exists()) {
+ Credentials credentials = new Credentials();
+ DataInputStream input = new DataInputStream(new FileInputStream(file));
+ try {
+ credentials.readTokenStorageStream(input);
+ } finally {
+ input.close();
+ }
+
+ UserGroupInformation.getCurrentUser().addCredentials(credentials);
+ LOG.info("Secure store updated from {}", file);
+ }
+ }
+
+ private static void renameLocalFiles(RuntimeSpecification runtimeSpec) {
+ for (LocalFile file : runtimeSpec.getLocalFiles()) {
+ if (file.isArchive()) {
+ String path = file.getURI().toString();
+ String name = file.getName() + (path.endsWith(".tar.gz") ? ".tar.gz" : path.substring(path.lastIndexOf('.')));
+ Preconditions.checkState(new File(name).renameTo(new File(file.getName())),
+ "Fail to rename file from %s to %s.",
+ name, file.getName());
+ }
+ }
+ }
+
+ private static ZKClient getContainerZKClient(ZKClient zkClient, RunId appRunId, String runnableName) {
+ return ZKClients.namespace(zkClient, String.format("/%s/runnables/%s", appRunId, runnableName));
+ }
+
+ /**
+ * Returns the ClassLoader for the runnable.
+ */
+ private static ClassLoader getClassLoader() {
+ ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ if (classLoader == null) {
+ return ClassLoader.getSystemClassLoader();
+ }
+ return classLoader;
+ }
+
+ private static TwillSpecification loadTwillSpec(File specFile) throws IOException {
+ Reader reader = Files.newReader(specFile, Charsets.UTF_8);
+ try {
+ return TwillSpecificationAdapter.create().fromJson(reader);
+ } finally {
+ reader.close();
+ }
+ }
+
+ private static Arguments decodeArgs() throws IOException {
+ return ArgumentsCodec.decode(Files.newReaderSupplier(new File(Constants.Files.ARGUMENTS), Charsets.UTF_8));
+ }
+
+ @Override
+ protected String getHostname() {
+ return System.getenv(EnvKeys.YARN_CONTAINER_HOST);
+ }
+
+ @Override
+ protected String getKafkaZKConnect() {
+ return System.getenv(EnvKeys.TWILL_LOG_KAFKA_ZK);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
new file mode 100644
index 0000000..f5bc1f2
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.container;
+
+import org.apache.twill.api.Command;
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.TwillRunnable;
+import org.apache.twill.api.TwillRunnableSpecification;
+import org.apache.twill.common.Threads;
+import org.apache.twill.filesystem.Location;
+import org.apache.twill.internal.AbstractTwillService;
+import org.apache.twill.internal.AbstractTwillService;
+import org.apache.twill.internal.BasicTwillContext;
+import org.apache.twill.internal.ContainerInfo;
+import org.apache.twill.internal.ContainerLiveNodeData;
+import org.apache.twill.internal.ZKServiceDecorator;
+import org.apache.twill.internal.logging.Loggings;
+import org.apache.twill.internal.state.Message;
+import org.apache.twill.internal.state.MessageCallback;
+import org.apache.twill.internal.utils.Instances;
+import org.apache.twill.zookeeper.ZKClient;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.util.concurrent.AbstractExecutionThreadService;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Service;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * This class act as a yarn container and run a {@link org.apache.twill.api.TwillRunnable}.
+ */
+public final class TwillContainerService extends AbstractTwillService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TwillContainerService.class);
+
+ private final TwillRunnableSpecification specification;
+ private final ClassLoader classLoader;
+ private final ContainerLiveNodeData containerLiveNode;
+ private final BasicTwillContext context;
+ private final ZKServiceDecorator serviceDelegate;
+ private ExecutorService commandExecutor;
+ private TwillRunnable runnable;
+
+ public TwillContainerService(BasicTwillContext context, ContainerInfo containerInfo, ZKClient zkClient,
+ RunId runId, TwillRunnableSpecification specification, ClassLoader classLoader,
+ Location applicationLocation) {
+ super(applicationLocation);
+
+ this.specification = specification;
+ this.classLoader = classLoader;
+ this.serviceDelegate = new ZKServiceDecorator(zkClient, runId, createLiveNodeSupplier(), new ServiceDelegate());
+ this.context = context;
+ this.containerLiveNode = new ContainerLiveNodeData(containerInfo.getId(),
+ containerInfo.getHost().getCanonicalHostName());
+ }
+
+ private ListenableFuture<String> processMessage(final String messageId, final Message message) {
+ LOG.debug("Message received: {} {}.", messageId, message);
+
+ if (handleSecureStoreUpdate(message)) {
+ return Futures.immediateFuture(messageId);
+ }
+
+ final SettableFuture<String> result = SettableFuture.create();
+ Command command = message.getCommand();
+ if (message.getType() == Message.Type.SYSTEM
+ && "instances".equals(command.getCommand()) && command.getOptions().containsKey("count")) {
+ context.setInstanceCount(Integer.parseInt(command.getOptions().get("count")));
+ }
+
+ commandExecutor.execute(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ runnable.handleCommand(message.getCommand());
+ result.set(messageId);
+ } catch (Exception e) {
+ result.setException(e);
+ }
+ }
+ });
+ return result;
+ }
+
+ private Supplier<? extends JsonElement> createLiveNodeSupplier() {
+ return new Supplier<JsonElement>() {
+ @Override
+ public JsonElement get() {
+ return new Gson().toJsonTree(containerLiveNode);
+ }
+ };
+ }
+
+ @Override
+ protected Service getServiceDelegate() {
+ return serviceDelegate;
+ }
+
+ private final class ServiceDelegate extends AbstractExecutionThreadService implements MessageCallback {
+
+ @Override
+ protected void startUp() throws Exception {
+ commandExecutor = Executors.newSingleThreadExecutor(
+ Threads.createDaemonThreadFactory("runnable-command-executor"));
+
+ Class<?> runnableClass = classLoader.loadClass(specification.getClassName());
+ Preconditions.checkArgument(TwillRunnable.class.isAssignableFrom(runnableClass),
+ "Class %s is not instance of TwillRunnable.", specification.getClassName());
+
+ runnable = Instances.newInstance((Class<TwillRunnable>) runnableClass);
+ runnable.initialize(context);
+ }
+
+ @Override
+ protected void triggerShutdown() {
+ try {
+ runnable.stop();
+ } catch (Throwable t) {
+ LOG.error("Exception when stopping runnable.", t);
+ }
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ commandExecutor.shutdownNow();
+ runnable.destroy();
+ Loggings.forceFlush();
+ }
+
+ @Override
+ protected void run() throws Exception {
+ runnable.run();
+ }
+
+ @Override
+ public ListenableFuture<String> onReceived(String messageId, Message message) {
+ if (state() == State.RUNNING) {
+ // Only process message if the service is still alive
+ return processMessage(messageId, message);
+ }
+ return Futures.immediateFuture(messageId);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java
new file mode 100644
index 0000000..b810854
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.api.LocalFile;
+import org.apache.twill.internal.ProcessController;
+import org.apache.twill.internal.ProcessLauncher;
+import org.apache.twill.internal.utils.Paths;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Abstract class to help creating different types of process launcher that process on yarn.
+ *
+ * @param <T> Type of the object that contains information about the container that the process is going to launch.
+ */
+public abstract class AbstractYarnProcessLauncher<T> implements ProcessLauncher<T> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractYarnProcessLauncher.class);
+
+ private final T containerInfo;
+
+ protected AbstractYarnProcessLauncher(T containerInfo) {
+ this.containerInfo = containerInfo;
+ }
+
+ @Override
+ public T getContainerInfo() {
+ return containerInfo;
+ }
+
+ @Override
+ public <C> PrepareLaunchContext prepareLaunch(Map<String, String> environments,
+ Iterable<LocalFile> resources, C credentials) {
+ if (credentials != null) {
+ Preconditions.checkArgument(credentials instanceof Credentials, "Credentials should be of type %s",
+ Credentials.class.getName());
+ }
+ return new PrepareLaunchContextImpl(environments, resources, (Credentials) credentials);
+ }
+
+ /**
+ * Tells whether to append suffix to localize resource name for archive file type. Default is true.
+ */
+ protected boolean useArchiveSuffix() {
+ return true;
+ }
+
+ /**
+ * For children class to override to perform actual process launching.
+ */
+ protected abstract <R> ProcessController<R> doLaunch(YarnLaunchContext launchContext);
+
+ /**
+ * Implementation for the {@link PrepareLaunchContext}.
+ */
+ private final class PrepareLaunchContextImpl implements PrepareLaunchContext {
+
+ private final Credentials credentials;
+ private final YarnLaunchContext launchContext;
+ private final Map<String, YarnLocalResource> localResources;
+ private final Map<String, String> environment;
+ private final List<String> commands;
+
+ private PrepareLaunchContextImpl(Map<String, String> env, Iterable<LocalFile> localFiles, Credentials credentials) {
+ this.credentials = credentials;
+ this.launchContext = YarnUtils.createLaunchContext();
+ this.localResources = Maps.newHashMap();
+ this.environment = Maps.newHashMap(env);
+ this.commands = Lists.newLinkedList();
+
+ for (LocalFile localFile : localFiles) {
+ addLocalFile(localFile);
+ }
+ }
+
+ private void addLocalFile(LocalFile localFile) {
+ String name = localFile.getName();
+ // Always append the file extension as the resource name so that archive expansion by Yarn could work.
+ // Renaming would happen by the Container Launcher.
+ if (localFile.isArchive() && useArchiveSuffix()) {
+ String path = localFile.getURI().toString();
+ String suffix = Paths.getExtension(path);
+ if (!suffix.isEmpty()) {
+ name += '.' + suffix;
+ }
+ }
+ localResources.put(name, YarnUtils.createLocalResource(localFile));
+ }
+
+ @Override
+ public ResourcesAdder withResources() {
+ return new MoreResourcesImpl();
+ }
+
+ @Override
+ public AfterResources noResources() {
+ return new MoreResourcesImpl();
+ }
+
+ private final class MoreResourcesImpl implements MoreResources {
+
+ @Override
+ public MoreResources add(LocalFile localFile) {
+ addLocalFile(localFile);
+ return this;
+ }
+
+ @Override
+ public EnvironmentAdder withEnvironment() {
+ return finish();
+ }
+
+ @Override
+ public AfterEnvironment noEnvironment() {
+ return finish();
+ }
+
+ private MoreEnvironmentImpl finish() {
+ launchContext.setLocalResources(localResources);
+ return new MoreEnvironmentImpl();
+ }
+ }
+
+ private final class MoreEnvironmentImpl implements MoreEnvironment {
+
+ @Override
+ public CommandAdder withCommands() {
+ launchContext.setEnvironment(environment);
+ return new MoreCommandImpl();
+ }
+
+ @Override
+ public <V> MoreEnvironment add(String key, V value) {
+ environment.put(key, value.toString());
+ return this;
+ }
+ }
+
+ private final class MoreCommandImpl implements MoreCommand, StdOutSetter, StdErrSetter {
+
+ private final StringBuilder commandBuilder = new StringBuilder();
+
+ @Override
+ public StdOutSetter add(String cmd, String... args) {
+ commandBuilder.append(cmd);
+ for (String arg : args) {
+ commandBuilder.append(' ').append(arg);
+ }
+ return this;
+ }
+
+ @Override
+ public <R> ProcessController<R> launch() {
+ if (credentials != null && !credentials.getAllTokens().isEmpty()) {
+ for (Token<?> token : credentials.getAllTokens()) {
+ LOG.info("Launch with delegation token {}", token);
+ }
+ launchContext.setCredentials(credentials);
+ }
+ launchContext.setCommands(commands);
+ return doLaunch(launchContext);
+ }
+
+ @Override
+ public MoreCommand redirectError(String stderr) {
+ redirect(2, stderr);
+ return noError();
+ }
+
+ @Override
+ public MoreCommand noError() {
+ commands.add(commandBuilder.toString());
+ commandBuilder.setLength(0);
+ return this;
+ }
+
+ @Override
+ public StdErrSetter redirectOutput(String stdout) {
+ redirect(1, stdout);
+ return this;
+ }
+
+ @Override
+ public StdErrSetter noOutput() {
+ return this;
+ }
+
+ private void redirect(int type, String out) {
+ commandBuilder.append(' ')
+ .append(type).append('>')
+ .append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append('/').append(out);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java
new file mode 100644
index 0000000..6f47b6c
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import com.google.common.base.Throwables;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ *
+ */
+public final class VersionDetectYarnAMClientFactory implements YarnAMClientFactory {
+
+ private final Configuration conf;
+
+ public VersionDetectYarnAMClientFactory(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public YarnAMClient create() {
+ try {
+ Class<YarnAMClient> clz;
+ if (YarnUtils.isHadoop20()) {
+ // Uses hadoop-2.0 class
+ String clzName = getClass().getPackage().getName() + ".Hadoop20YarnAMClient";
+ clz = (Class<YarnAMClient>) Class.forName(clzName);
+ } else {
+ // Uses hadoop-2.1 class
+ String clzName = getClass().getPackage().getName() + ".Hadoop21YarnAMClient";
+ clz = (Class<YarnAMClient>) Class.forName(clzName);
+ }
+
+ return clz.getConstructor(Configuration.class).newInstance(conf);
+
+ } catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
new file mode 100644
index 0000000..f9db959
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import com.google.common.base.Throwables;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ *
+ */
+public final class VersionDetectYarnAppClientFactory implements YarnAppClientFactory {
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public YarnAppClient create(Configuration configuration) {
+ try {
+ Class<YarnAppClient> clz;
+
+ if (YarnUtils.isHadoop20()) {
+ // Uses hadoop-2.0 class.
+ String clzName = getClass().getPackage().getName() + ".Hadoop20YarnAppClient";
+ clz = (Class<YarnAppClient>) Class.forName(clzName);
+ } else {
+ // Uses hadoop-2.1 class
+ String clzName = getClass().getPackage().getName() + ".Hadoop21YarnAppClient";
+ clz = (Class<YarnAppClient>) Class.forName(clzName);
+ }
+
+ return clz.getConstructor(Configuration.class).newInstance(configuration);
+
+ } catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
new file mode 100644
index 0000000..83ba6a8
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.internal.ProcessLauncher;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Service;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This interface provides abstraction for AM to interacts with YARN to abstract out YARN version specific
+ * code, making multi-version compatibility easier.
+ */
+public interface YarnAMClient extends Service {
+
+ /**
+ * Builder for creating a container request.
+ */
+ abstract class ContainerRequestBuilder {
+
+ protected final Resource capability;
+ protected final int count;
+ protected final Set<String> hosts = Sets.newHashSet();
+ protected final Set<String> racks = Sets.newHashSet();
+ protected final Priority priority = Records.newRecord(Priority.class);
+
+ protected ContainerRequestBuilder(Resource capability, int count) {
+ this.capability = capability;
+ this.count = count;
+ }
+
+ public ContainerRequestBuilder addHosts(String firstHost, String...moreHosts) {
+ return add(hosts, firstHost, moreHosts);
+ }
+
+ public ContainerRequestBuilder addRacks(String firstRack, String...moreRacks) {
+ return add(racks, firstRack, moreRacks);
+ }
+
+ public ContainerRequestBuilder setPriority(int prio) {
+ priority.setPriority(prio);
+ return this;
+ }
+
+ /**
+ * Adds a container request. Returns an unique ID for the request.
+ */
+ public abstract String apply();
+
+ private <T> ContainerRequestBuilder add(Collection<T> collection, T first, T... more) {
+ collection.add(first);
+ Collections.addAll(collection, more);
+ return this;
+ }
+ }
+
+ ContainerId getContainerId();
+
+ String getHost();
+
+ /**
+ * Sets the tracker address and tracker url. This method should be called before calling {@link #start()}.
+ */
+ void setTracker(InetSocketAddress trackerAddr, URL trackerUrl);
+
+ /**
+ * Callback for allocate call.
+ */
+ // TODO: Move AM heartbeat logic into this interface so AM only needs to handle callback.
+ interface AllocateHandler {
+ void acquired(List<ProcessLauncher<YarnContainerInfo>> launchers);
+
+ void completed(List<YarnContainerStatus> completed);
+ }
+
+ void allocate(float progress, AllocateHandler handler) throws Exception;
+
+ ContainerRequestBuilder addContainerRequest(Resource capability);
+
+ ContainerRequestBuilder addContainerRequest(Resource capability, int count);
+
+ /**
+ * Notify a container request is fulfilled.
+ *
+ * Note: This method is needed to workaround a seemingly bug from AMRMClient implementation in YARN that if
+ * a container is requested after a previous container was acquired (with the same capability), multiple containers
+ * will get allocated instead of one.
+ *
+ * @param id The ID returned by {@link YarnAMClient.ContainerRequestBuilder#apply()}.
+ */
+ void completeContainerRequest(String id);
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClientFactory.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClientFactory.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClientFactory.java
new file mode 100644
index 0000000..b2a1194
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClientFactory.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+/**
+ *
+ */
+public interface YarnAMClientFactory {
+
+ YarnAMClient create();
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java
new file mode 100644
index 0000000..71a9e68
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.internal.ProcessController;
+import org.apache.twill.internal.ProcessLauncher;
+import com.google.common.util.concurrent.Service;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+/**
+ * Interface for launching Yarn application from client.
+ */
+public interface YarnAppClient extends Service {
+
+ /**
+ * Creates a {@link ProcessLauncher} for launching the application represented by the given spec.
+ */
+ ProcessLauncher<ApplicationId> createLauncher(TwillSpecification twillSpec) throws Exception;
+
+ /**
+ * Creates a {@link ProcessLauncher} for launching application with the given user and spec.
+ *
+ * @deprecated This method will get removed.
+ */
+ @Deprecated
+ ProcessLauncher<ApplicationId> createLauncher(String user, TwillSpecification twillSpec) throws Exception;
+
+ ProcessController<YarnApplicationReport> createProcessController(ApplicationId appId);
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClientFactory.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClientFactory.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClientFactory.java
new file mode 100644
index 0000000..70cecad
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClientFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ *
+ */
+public interface YarnAppClientFactory {
+
+ YarnAppClient create(Configuration configuration);
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnApplicationReport.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnApplicationReport.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnApplicationReport.java
new file mode 100644
index 0000000..4dbb1d1
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnApplicationReport.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+
+/**
+ * This interface is for adapting differences in ApplicationReport in different Hadoop version.
+ */
+public interface YarnApplicationReport {
+
+ /**
+ * Get the <code>ApplicationId</code> of the application.
+ * @return <code>ApplicationId</code> of the application
+ */
+ ApplicationId getApplicationId();
+
+ /**
+ * Get the <code>ApplicationAttemptId</code> of the current
+ * attempt of the application
+ * @return <code>ApplicationAttemptId</code> of the attempt
+ */
+ ApplicationAttemptId getCurrentApplicationAttemptId();
+
+ /**
+ * Get the <em>queue</em> to which the application was submitted.
+ * @return <em>queue</em> to which the application was submitted
+ */
+ String getQueue();
+
+ /**
+ * Get the user-defined <em>name</em> of the application.
+ * @return <em>name</em> of the application
+ */
+ String getName();
+
+ /**
+ * Get the <em>host</em> on which the <code>ApplicationMaster</code>
+ * is running.
+ * @return <em>host</em> on which the <code>ApplicationMaster</code>
+ * is running
+ */
+ String getHost();
+
+ /**
+ * Get the <em>RPC port</em> of the <code>ApplicationMaster</code>.
+ * @return <em>RPC port</em> of the <code>ApplicationMaster</code>
+ */
+ int getRpcPort();
+
+
+ /**
+ * Get the <code>YarnApplicationState</code> of the application.
+ * @return <code>YarnApplicationState</code> of the application
+ */
+ YarnApplicationState getYarnApplicationState();
+
+
+ /**
+ * Get the <em>diagnositic information</em> of the application in case of
+ * errors.
+ * @return <em>diagnositic information</em> of the application in case
+ * of errors
+ */
+ String getDiagnostics();
+
+
+ /**
+ * Get the <em>tracking url</em> for the application.
+ * @return <em>tracking url</em> for the application
+ */
+ String getTrackingUrl();
+
+
+ /**
+ * Get the original not-proxied <em>tracking url</em> for the application.
+ * This is intended to only be used by the proxy itself.
+ * @return the original not-proxied <em>tracking url</em> for the application
+ */
+ String getOriginalTrackingUrl();
+
+ /**
+ * Get the <em>start time</em> of the application.
+ * @return <em>start time</em> of the application
+ */
+ long getStartTime();
+
+
+ /**
+ * Get the <em>finish time</em> of the application.
+ * @return <em>finish time</em> of the application
+ */
+ long getFinishTime();
+
+
+ /**
+ * Get the <em>final finish status</em> of the application.
+ * @return <em>final finish status</em> of the application
+ */
+ FinalApplicationStatus getFinalApplicationStatus();
+
+ /**
+ * Retrieve the structure containing the job resources for this application
+ * @return the job resources structure for this application
+ */
+ ApplicationResourceUsageReport getApplicationResourceUsageReport();
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnContainerInfo.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnContainerInfo.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnContainerInfo.java
new file mode 100644
index 0000000..e806da7
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnContainerInfo.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.internal.ContainerInfo;
+
+/**
+ *
+ */
+public interface YarnContainerInfo extends ContainerInfo {
+
+ <T> T getContainer();
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnContainerStatus.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnContainerStatus.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnContainerStatus.java
new file mode 100644
index 0000000..57e712c
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnContainerStatus.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.yarn.api.records.ContainerState;
+
+/**
+ * This interface is for adapting differences in ContainerStatus between Hadoop 2.0 and 2.1
+ */
+public interface YarnContainerStatus {
+
+ String getContainerId();
+
+ ContainerState getState();
+
+ int getExitStatus();
+
+ String getDiagnostics();
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnLaunchContext.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnLaunchContext.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnLaunchContext.java
new file mode 100644
index 0000000..984a1be
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnLaunchContext.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This interface is for adapting ContainerLaunchContext in different Hadoop version
+ */
+public interface YarnLaunchContext {
+
+ <T> T getLaunchContext();
+
+ void setCredentials(Credentials credentials);
+
+ void setLocalResources(Map<String, YarnLocalResource> localResources);
+
+ void setServiceData(Map<String, ByteBuffer> serviceData);
+
+ Map<String, String> getEnvironment();
+
+ void setEnvironment(Map<String, String> environment);
+
+ List<String> getCommands();
+
+ void setCommands(List<String> commands);
+
+ void setApplicationACLs(Map<ApplicationAccessType, String> acls);
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnLocalResource.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnLocalResource.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnLocalResource.java
new file mode 100644
index 0000000..9bfc224
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnLocalResource.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
+
+/**
+ * A adapter interface for the LocalResource class/interface in different Hadoop version.
+ */
+public interface YarnLocalResource {
+
+ /**
+ * Returns the actual LocalResource object in Yarn.
+ */
+ <T> T getLocalResource();
+
+ /**
+ * Get the <em>location</em> of the resource to be localized.
+ * @return <em>location</em> of the resource to be localized
+ */
+ URL getResource();
+
+ /**
+ * Set <em>location</em> of the resource to be localized.
+ * @param resource <em>location</em> of the resource to be localized
+ */
+ void setResource(URL resource);
+
+ /**
+ * Get the <em>size</em> of the resource to be localized.
+ * @return <em>size</em> of the resource to be localized
+ */
+ long getSize();
+
+ /**
+ * Set the <em>size</em> of the resource to be localized.
+ * @param size <em>size</em> of the resource to be localized
+ */
+ void setSize(long size);
+
+ /**
+ * Get the original <em>timestamp</em> of the resource to be localized, used
+ * for verification.
+ * @return <em>timestamp</em> of the resource to be localized
+ */
+ long getTimestamp();
+
+ /**
+ * Set the <em>timestamp</em> of the resource to be localized, used
+ * for verification.
+ * @param timestamp <em>timestamp</em> of the resource to be localized
+ */
+ void setTimestamp(long timestamp);
+
+ /**
+ * Get the <code>LocalResourceType</code> of the resource to be localized.
+ * @return <code>LocalResourceType</code> of the resource to be localized
+ */
+ LocalResourceType getType();
+
+ /**
+ * Set the <code>LocalResourceType</code> of the resource to be localized.
+ * @param type <code>LocalResourceType</code> of the resource to be localized
+ */
+ void setType(LocalResourceType type);
+
+ /**
+ * Get the <code>LocalResourceVisibility</code> of the resource to be
+ * localized.
+ * @return <code>LocalResourceVisibility</code> of the resource to be
+ * localized
+ */
+ LocalResourceVisibility getVisibility();
+
+ /**
+ * Set the <code>LocalResourceVisibility</code> of the resource to be
+ * localized.
+ * @param visibility <code>LocalResourceVisibility</code> of the resource to be
+ * localized
+ */
+ void setVisibility(LocalResourceVisibility visibility);
+
+ /**
+ * Get the <em>pattern</em> that should be used to extract entries from the
+ * archive (only used when type is <code>PATTERN</code>).
+ * @return <em>pattern</em> that should be used to extract entries from the
+ * archive.
+ */
+ String getPattern();
+
+ /**
+ * Set the <em>pattern</em> that should be used to extract entries from the
+ * archive (only used when type is <code>PATTERN</code>).
+ * @param pattern <em>pattern</em> that should be used to extract entries
+ * from the archive.
+ */
+ void setPattern(String pattern);
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnNMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnNMClient.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnNMClient.java
new file mode 100644
index 0000000..d863c91
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnNMClient.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.common.Cancellable;
+
+/**
+ * Abstraction for dealing with API differences in different hadoop yarn version
+ */
+public interface YarnNMClient {
+
+ /**
+ * Starts a process based on the given launch context.
+ *
+ * @param containerInfo The containerInfo that the new process will launch in.
+ * @param launchContext Contains information about the process going to start.
+ * @return A {@link Cancellable} that when {@link Cancellable#cancel()}} is invoked,
+ * it will try to shutdown the process.
+ *
+ */
+ Cancellable start(YarnContainerInfo containerInfo, YarnLaunchContext launchContext);
+}