You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/12 22:59:47 UTC
[05/28] Making maven site works.
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java b/yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
deleted file mode 100644
index bbd6c10..0000000
--- a/yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.container;
-
-import org.apache.twill.api.LocalFile;
-import org.apache.twill.api.RunId;
-import org.apache.twill.api.RuntimeSpecification;
-import org.apache.twill.api.TwillRunnableSpecification;
-import org.apache.twill.api.TwillSpecification;
-import org.apache.twill.discovery.DiscoveryService;
-import org.apache.twill.discovery.ZKDiscoveryService;
-import org.apache.twill.internal.Arguments;
-import org.apache.twill.internal.BasicTwillContext;
-import org.apache.twill.internal.Constants;
-import org.apache.twill.internal.ContainerInfo;
-import org.apache.twill.internal.EnvContainerInfo;
-import org.apache.twill.internal.EnvKeys;
-import org.apache.twill.internal.RunIds;
-import org.apache.twill.internal.ServiceMain;
-import org.apache.twill.internal.json.ArgumentsCodec;
-import org.apache.twill.internal.json.TwillSpecificationAdapter;
-import org.apache.twill.zookeeper.RetryStrategies;
-import org.apache.twill.zookeeper.ZKClient;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.apache.twill.zookeeper.ZKClientServices;
-import org.apache.twill.zookeeper.ZKClients;
-import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
-import com.google.common.io.Files;
-import com.google.common.util.concurrent.Service;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.Reader;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Entry point of the process that hosts a single {@link org.apache.twill.api.TwillRunnable}.
- * Settings are read from the environment variables named in {@link EnvKeys} and from the files
- * named in {@link Constants.Files}; the resulting {@link TwillContainerService} is then handed to
- * {@code doMain} of {@link ServiceMain}.
- */
-public final class TwillContainerMain extends ServiceMain {
-
-  private static final Logger LOG = LoggerFactory.getLogger(TwillContainerMain.class);
-
-  /**
-   * Main method for launching a {@link TwillContainerService} which runs
-   * a {@link org.apache.twill.api.TwillRunnable}.
-   *
-   * @param args command line arguments; this method does not read them
-   * @throws IllegalStateException if the twill specification has no entry for the runnable name given
-   *         in the environment, or if a localized archive cannot be renamed
-   * @throws Exception if the container fails to start for any other reason
-   */
-  public static void main(final String[] args) throws Exception {
-    // Try to load the secure store from localized file, which AM requested RM to localize it for this container.
-    loadSecureStore();
-
-    String zkConnectStr = System.getenv(EnvKeys.TWILL_ZK_CONNECT);
-    File twillSpecFile = new File(Constants.Files.TWILL_SPEC);
-    RunId appRunId = RunIds.fromString(System.getenv(EnvKeys.TWILL_APP_RUN_ID));
-    RunId runId = RunIds.fromString(System.getenv(EnvKeys.TWILL_RUN_ID));
-    String runnableName = System.getenv(EnvKeys.TWILL_RUNNABLE_NAME);
-    int instanceId = Integer.parseInt(System.getenv(EnvKeys.TWILL_INSTANCE_ID));
-    int instanceCount = Integer.parseInt(System.getenv(EnvKeys.TWILL_INSTANCE_COUNT));
-
-    ZKClientService zkClientService = ZKClientServices.delegate(
-      ZKClients.reWatchOnExpire(
-        ZKClients.retryOnFailure(ZKClientService.Builder.of(zkConnectStr).build(),
-                                 RetryStrategies.fixDelay(1, TimeUnit.SECONDS))));
-
-    DiscoveryService discoveryService = new ZKDiscoveryService(zkClientService);
-
-    TwillSpecification twillSpec = loadTwillSpec(twillSpecFile);
-
-    // Look the runnable up once, and fail with a descriptive message rather than a bare
-    // NullPointerException when the name from the environment is not part of the specification.
-    RuntimeSpecification runtimeSpec = twillSpec.getRunnables().get(runnableName);
-    Preconditions.checkState(runtimeSpec != null,
-                             "Runnable %s is not defined in the twill specification.", runnableName);
-    renameLocalFiles(runtimeSpec);
-
-    TwillRunnableSpecification runnableSpec = runtimeSpec.getRunnableSpecification();
-    ContainerInfo containerInfo = new EnvContainerInfo();
-    Arguments arguments = decodeArgs();
-    BasicTwillContext context = new BasicTwillContext(
-      runId, appRunId, containerInfo.getHost(),
-      arguments.getRunnableArguments().get(runnableName).toArray(new String[0]),
-      arguments.getArguments().toArray(new String[0]),
-      runnableSpec, instanceId, discoveryService, instanceCount,
-      containerInfo.getMemoryMB(), containerInfo.getVirtualCores()
-    );
-
-    Configuration conf = new YarnConfiguration(new HdfsConfiguration(new Configuration()));
-    Service service = new TwillContainerService(context, containerInfo,
-                                                getContainerZKClient(zkClientService, appRunId, runnableName),
-                                                runId, runnableSpec, getClassLoader(),
-                                                createAppLocation(conf));
-    new TwillContainerMain().doMain(zkClientService, service);
-  }
-
-  /**
-   * Adds the credentials stored in the {@link Constants.Files#CREDENTIALS} file to the current user.
-   * Does nothing when Hadoop security is disabled or the file does not exist.
-   *
-   * @throws IOException if the credentials file cannot be read
-   */
-  private static void loadSecureStore() throws IOException {
-    if (!UserGroupInformation.isSecurityEnabled()) {
-      return;
-    }
-
-    File file = new File(Constants.Files.CREDENTIALS);
-    if (file.exists()) {
-      Credentials credentials = new Credentials();
-      DataInputStream input = new DataInputStream(new FileInputStream(file));
-      try {
-        credentials.readTokenStorageStream(input);
-      } finally {
-        input.close();
-      }
-
-      UserGroupInformation.getCurrentUser().addCredentials(credentials);
-      LOG.info("Secure store updated from {}", file);
-    }
-  }
-
-  /**
-   * Renames every archive local file from its suffixed name ({@code <name>.<extension>}) back to
-   * the plain name declared in the specification.
-   *
-   * @param runtimeSpec runtime specification whose local files are inspected
-   * @throws IllegalStateException if a rename fails
-   */
-  private static void renameLocalFiles(RuntimeSpecification runtimeSpec) {
-    for (LocalFile file : runtimeSpec.getLocalFiles()) {
-      if (!file.isArchive()) {
-        continue;
-      }
-
-      String path = file.getURI().toString();
-      String suffix;
-      if (path.endsWith(".tar.gz")) {
-        suffix = ".tar.gz";
-      } else {
-        int dot = path.lastIndexOf('.');
-        // Without a '.' there is no extension; substring(-1) would throw StringIndexOutOfBoundsException.
-        suffix = (dot >= 0) ? path.substring(dot) : "";
-      }
-
-      if (suffix.isEmpty()) {
-        // AbstractYarnProcessLauncher appends a suffix only when the extension is non-empty,
-        // so a file without extension is presumably localized under its final name already.
-        continue;
-      }
-
-      String name = file.getName() + suffix;
-      Preconditions.checkState(new File(name).renameTo(new File(file.getName())),
-                               "Fail to rename file from %s to %s.",
-                               name, file.getName());
-    }
-  }
-
-  /**
-   * Returns a view of the given client namespaced under {@code /<appRunId>/runnables/<runnableName>}.
-   */
-  private static ZKClient getContainerZKClient(ZKClient zkClient, RunId appRunId, String runnableName) {
-    return ZKClients.namespace(zkClient, String.format("/%s/runnables/%s", appRunId, runnableName));
-  }
-
-  /**
-   * Returns the ClassLoader for the runnable: the context ClassLoader of the current thread, or the
-   * system ClassLoader when no context ClassLoader is set.
-   */
-  private static ClassLoader getClassLoader() {
-    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
-    if (classLoader == null) {
-      return ClassLoader.getSystemClassLoader();
-    }
-    return classLoader;
-  }
-
-  /**
-   * Reads a {@link TwillSpecification} from the given UTF-8 encoded JSON file.
-   *
-   * @throws IOException if the file cannot be opened or closed
-   */
-  private static TwillSpecification loadTwillSpec(File specFile) throws IOException {
-    Reader reader = Files.newReader(specFile, Charsets.UTF_8);
-    try {
-      return TwillSpecificationAdapter.create().fromJson(reader);
-    } finally {
-      reader.close();
-    }
-  }
-
-  /**
-   * Decodes the {@link Arguments} stored in the {@link Constants.Files#ARGUMENTS} file (read as UTF-8).
-   */
-  private static Arguments decodeArgs() throws IOException {
-    return ArgumentsCodec.decode(Files.newReaderSupplier(new File(Constants.Files.ARGUMENTS), Charsets.UTF_8));
-  }
-
-  /**
-   * Returns the value of the {@link EnvKeys#YARN_CONTAINER_HOST} environment variable.
-   */
-  @Override
-  protected String getHostname() {
-    return System.getenv(EnvKeys.YARN_CONTAINER_HOST);
-  }
-
-  /**
-   * Returns the value of the {@link EnvKeys#TWILL_LOG_KAFKA_ZK} environment variable.
-   */
-  @Override
-  protected String getKafkaZKConnect() {
-    return System.getenv(EnvKeys.TWILL_LOG_KAFKA_ZK);
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java b/yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
deleted file mode 100644
index f5bc1f2..0000000
--- a/yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.container;
-
-import org.apache.twill.api.Command;
-import org.apache.twill.api.RunId;
-import org.apache.twill.api.TwillRunnable;
-import org.apache.twill.api.TwillRunnableSpecification;
-import org.apache.twill.common.Threads;
-import org.apache.twill.filesystem.Location;
-import org.apache.twill.internal.AbstractTwillService;
-import org.apache.twill.internal.AbstractTwillService;
-import org.apache.twill.internal.BasicTwillContext;
-import org.apache.twill.internal.ContainerInfo;
-import org.apache.twill.internal.ContainerLiveNodeData;
-import org.apache.twill.internal.ZKServiceDecorator;
-import org.apache.twill.internal.logging.Loggings;
-import org.apache.twill.internal.state.Message;
-import org.apache.twill.internal.state.MessageCallback;
-import org.apache.twill.internal.utils.Instances;
-import org.apache.twill.zookeeper.ZKClient;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import com.google.common.util.concurrent.AbstractExecutionThreadService;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.Service;
-import com.google.common.util.concurrent.SettableFuture;
-import com.google.gson.Gson;
-import com.google.gson.JsonElement;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-/**
- * This class acts as a yarn container and runs a {@link org.apache.twill.api.TwillRunnable}.
- * The runnable is loaded by class name, driven through its lifecycle by an inner
- * {@link AbstractExecutionThreadService}, and receives commands on a dedicated single thread.
- */
-public final class TwillContainerService extends AbstractTwillService {
-
-  private static final Logger LOG = LoggerFactory.getLogger(TwillContainerService.class);
-
-  private final TwillRunnableSpecification specification;
-  private final ClassLoader classLoader;
-  private final ContainerLiveNodeData containerLiveNode;
-  private final BasicTwillContext context;
-  private final ZKServiceDecorator serviceDelegate;
-  // Both fields below are assigned in ServiceDelegate.startUp().
-  private ExecutorService commandExecutor;
-  private TwillRunnable runnable;
-
-  /**
-   * Creates the service.
-   *
-   * @param context context passed to the runnable on initialization
-   * @param containerInfo supplies the container id and host published as live node data
-   * @param zkClient ZooKeeper client handed to the {@link ZKServiceDecorator}
-   * @param runId run id handed to the {@link ZKServiceDecorator}
-   * @param specification names the {@link TwillRunnable} class to instantiate
-   * @param classLoader ClassLoader used for loading the runnable class
-   * @param applicationLocation location passed to {@link AbstractTwillService}
-   */
-  public TwillContainerService(BasicTwillContext context, ContainerInfo containerInfo, ZKClient zkClient,
-                               RunId runId, TwillRunnableSpecification specification, ClassLoader classLoader,
-                               Location applicationLocation) {
-    super(applicationLocation);
-
-    this.specification = specification;
-    this.classLoader = classLoader;
-    // The supplier reads containerLiveNode only when get() is invoked, so creating it before the
-    // field assignment below is fine.
-    this.serviceDelegate = new ZKServiceDecorator(zkClient, runId, createLiveNodeSupplier(), new ServiceDelegate());
-    this.context = context;
-    this.containerLiveNode = new ContainerLiveNodeData(containerInfo.getId(),
-                                                       containerInfo.getHost().getCanonicalHostName());
-  }
-
-  /**
-   * Handles a message: secure store updates are consumed directly; anything else is forwarded to
-   * the runnable on the command executor thread.
-   *
-   * @return a future that yields {@code messageId} once handled, or fails with whatever the
-   *         runnable threw while handling the command
-   */
-  private ListenableFuture<String> processMessage(final String messageId, final Message message) {
-    LOG.debug("Message received: {} {}.", messageId, message);
-
-    if (handleSecureStoreUpdate(message)) {
-      return Futures.immediateFuture(messageId);
-    }
-
-    final SettableFuture<String> result = SettableFuture.create();
-    Command command = message.getCommand();
-    // A system "instances" command carries the new instance count; update the context before
-    // the runnable sees the command.
-    if (message.getType() == Message.Type.SYSTEM
-          && "instances".equals(command.getCommand()) && command.getOptions().containsKey("count")) {
-      context.setInstanceCount(Integer.parseInt(command.getOptions().get("count")));
-    }
-
-    commandExecutor.execute(new Runnable() {
-
-      @Override
-      public void run() {
-        try {
-          runnable.handleCommand(message.getCommand());
-          result.set(messageId);
-        } catch (Throwable t) {
-          // Catch Throwable rather than Exception: an Error escaping here would leave the
-          // returned future unresolved forever.
-          result.setException(t);
-        }
-      }
-    });
-    return result;
-  }
-
-  /**
-   * Returns a supplier that serializes the container live node data to JSON on every call.
-   */
-  private Supplier<? extends JsonElement> createLiveNodeSupplier() {
-    return new Supplier<JsonElement>() {
-      @Override
-      public JsonElement get() {
-        return new Gson().toJsonTree(containerLiveNode);
-      }
-    };
-  }
-
-  @Override
-  protected Service getServiceDelegate() {
-    return serviceDelegate;
-  }
-
-  /**
-   * Service that maps the lifecycle of the {@link TwillRunnable} onto a Guava service and receives
-   * messages on its behalf.
-   */
-  private final class ServiceDelegate extends AbstractExecutionThreadService implements MessageCallback {
-
-    /**
-     * Creates the command executor, then loads, instantiates and initializes the runnable.
-     */
-    @Override
-    protected void startUp() throws Exception {
-      commandExecutor = Executors.newSingleThreadExecutor(
-        Threads.createDaemonThreadFactory("runnable-command-executor"));
-
-      Class<?> runnableClass = classLoader.loadClass(specification.getClassName());
-      Preconditions.checkArgument(TwillRunnable.class.isAssignableFrom(runnableClass),
-                                  "Class %s is not instance of TwillRunnable.", specification.getClassName());
-
-      // Safe: assignability to TwillRunnable was verified just above.
-      @SuppressWarnings("unchecked")
-      Class<TwillRunnable> twillRunnableClass = (Class<TwillRunnable>) runnableClass;
-      runnable = Instances.newInstance(twillRunnableClass);
-      runnable.initialize(context);
-    }
-
-    /**
-     * Asks the runnable to stop; any failure is logged and otherwise ignored.
-     */
-    @Override
-    protected void triggerShutdown() {
-      try {
-        runnable.stop();
-      } catch (Throwable t) {
-        LOG.error("Exception when stopping runnable.", t);
-      }
-    }
-
-    /**
-     * Stops the command executor, destroys the runnable and flushes the logs.
-     */
-    @Override
-    protected void shutDown() throws Exception {
-      commandExecutor.shutdownNow();
-      try {
-        runnable.destroy();
-      } finally {
-        // Flush even when destroy() throws so the last log events are not skipped.
-        Loggings.forceFlush();
-      }
-    }
-
-    @Override
-    protected void run() throws Exception {
-      runnable.run();
-    }
-
-    @Override
-    public ListenableFuture<String> onReceived(String messageId, Message message) {
-      if (state() == State.RUNNING) {
-        // Only process message if the service is still alive
-        return processMessage(messageId, message);
-      }
-      return Futures.immediateFuture(messageId);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java b/yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java
deleted file mode 100644
index b810854..0000000
--- a/yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.twill.api.LocalFile;
-import org.apache.twill.internal.ProcessController;
-import org.apache.twill.internal.ProcessLauncher;
-import org.apache.twill.internal.utils.Paths;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Abstract base class that helps create different types of process launchers that launch processes on YARN.
- * It implements the fluent {@link PrepareLaunchContext} steps (resources, environment, commands) on top of a
- * {@link YarnLaunchContext} and leaves the actual launch to {@link #doLaunch(YarnLaunchContext)}.
- *
- * @param <T> Type of the object that contains information about the container that the process is going to launch.
- */
-public abstract class AbstractYarnProcessLauncher<T> implements ProcessLauncher<T> {
-
- private static final Logger LOG = LoggerFactory.getLogger(AbstractYarnProcessLauncher.class);
-
- private final T containerInfo;
-
- protected AbstractYarnProcessLauncher(T containerInfo) {
- this.containerInfo = containerInfo;
- }
-
- @Override
- public T getContainerInfo() {
- return containerInfo;
- }
-
- /**
- * Starts preparing a launch.
- *
- * @param environments initial environment variables; copied, so later additions do not modify this map
- * @param resources local files to register as local resources
- * @param credentials either {@code null} or an instance of {@link Credentials}
- * @throws IllegalArgumentException if {@code credentials} is non-null and not a {@link Credentials}
- */
- @Override
- public <C> PrepareLaunchContext prepareLaunch(Map<String, String> environments,
- Iterable<LocalFile> resources, C credentials) {
- if (credentials != null) {
- Preconditions.checkArgument(credentials instanceof Credentials, "Credentials should be of type %s",
- Credentials.class.getName());
- }
- return new PrepareLaunchContextImpl(environments, resources, (Credentials) credentials);
- }
-
- /**
- * Tells whether to append suffix to localize resource name for archive file type. Default is true.
- */
- protected boolean useArchiveSuffix() {
- return true;
- }
-
- /**
- * For children class to override to perform actual process launching.
- *
- * @param launchContext launch context carrying the local resources, environment, commands and
- * (when any tokens are present) credentials collected by the builder steps
- */
- protected abstract <R> ProcessController<R> doLaunch(YarnLaunchContext launchContext);
-
- /**
- * Implementation for the {@link PrepareLaunchContext}.
- */
- private final class PrepareLaunchContextImpl implements PrepareLaunchContext {
-
- private final Credentials credentials;
- private final YarnLaunchContext launchContext;
- // Keyed by resource name (with archive suffix appended when applicable).
- private final Map<String, YarnLocalResource> localResources;
- private final Map<String, String> environment;
- // Completed command lines, in the order they were finished.
- private final List<String> commands;
-
- private PrepareLaunchContextImpl(Map<String, String> env, Iterable<LocalFile> localFiles, Credentials credentials) {
- this.credentials = credentials;
- this.launchContext = YarnUtils.createLaunchContext();
- this.localResources = Maps.newHashMap();
- this.environment = Maps.newHashMap(env);
- this.commands = Lists.newLinkedList();
-
- for (LocalFile localFile : localFiles) {
- addLocalFile(localFile);
- }
- }
-
- /**
- * Registers the given file as a local resource, replacing any resource registered under the same name.
- */
- private void addLocalFile(LocalFile localFile) {
- String name = localFile.getName();
- // Always append the file extension as the resource name so that archive expansion by Yarn could work.
- // Renaming would happen by the Container Launcher.
- if (localFile.isArchive() && useArchiveSuffix()) {
- String path = localFile.getURI().toString();
- String suffix = Paths.getExtension(path);
- // Nothing is appended when the path yields an empty extension.
- if (!suffix.isEmpty()) {
- name += '.' + suffix;
- }
- }
- localResources.put(name, YarnUtils.createLocalResource(localFile));
- }
-
- @Override
- public ResourcesAdder withResources() {
- return new MoreResourcesImpl();
- }
-
- @Override
- public AfterResources noResources() {
- return new MoreResourcesImpl();
- }
-
- /**
- * Builder step for adding more local resources before moving on to the environment step.
- */
- private final class MoreResourcesImpl implements MoreResources {
-
- @Override
- public MoreResources add(LocalFile localFile) {
- addLocalFile(localFile);
- return this;
- }
-
- @Override
- public EnvironmentAdder withEnvironment() {
- return finish();
- }
-
- @Override
- public AfterEnvironment noEnvironment() {
- return finish();
- }
-
- /**
- * Hands the collected local resources to the launch context and moves on to the environment step.
- */
- private MoreEnvironmentImpl finish() {
- launchContext.setLocalResources(localResources);
- return new MoreEnvironmentImpl();
- }
- }
-
- /**
- * Builder step for adding environment variables before moving on to the command step.
- */
- private final class MoreEnvironmentImpl implements MoreEnvironment {
-
- @Override
- public CommandAdder withCommands() {
- // The environment is handed to the launch context only here, when the command step begins.
- launchContext.setEnvironment(environment);
- return new MoreCommandImpl();
- }
-
- @Override
- public <V> MoreEnvironment add(String key, V value) {
- // The value is stored as its toString() form; a null value results in a NullPointerException.
- environment.put(key, value.toString());
- return this;
- }
- }
-
- /**
- * Builder step for composing command lines, with optional stdout/stderr redirection, and launching.
- */
- private final class MoreCommandImpl implements MoreCommand, StdOutSetter, StdErrSetter {
-
- // Accumulates the command line currently being built; reset once the command is completed.
- private final StringBuilder commandBuilder = new StringBuilder();
-
- @Override
- public StdOutSetter add(String cmd, String... args) {
- commandBuilder.append(cmd);
- for (String arg : args) {
- commandBuilder.append(' ').append(arg);
- }
- return this;
- }
-
- @Override
- public <R> ProcessController<R> launch() {
- // Credentials are set on the launch context only when at least one token is present.
- if (credentials != null && !credentials.getAllTokens().isEmpty()) {
- for (Token<?> token : credentials.getAllTokens()) {
- LOG.info("Launch with delegation token {}", token);
- }
- launchContext.setCredentials(credentials);
- }
- launchContext.setCommands(commands);
- return doLaunch(launchContext);
- }
-
- @Override
- public MoreCommand redirectError(String stderr) {
- redirect(2, stderr);
- return noError();
- }
-
- @Override
- public MoreCommand noError() {
- // Completes the current command: store it and reset the builder for the next one.
- commands.add(commandBuilder.toString());
- commandBuilder.setLength(0);
- return this;
- }
-
- @Override
- public StdErrSetter redirectOutput(String stdout) {
- redirect(1, stdout);
- return this;
- }
-
- @Override
- public StdErrSetter noOutput() {
- return this;
- }
-
- /**
- * Appends {@code " <type>><LOG_DIR_EXPANSION_VAR>/<out>"} to the command being built, where
- * {@code type} is the descriptor number passed by the callers above (1 from redirectOutput,
- * 2 from redirectError).
- */
- private void redirect(int type, String out) {
- commandBuilder.append(' ')
- .append(type).append('>')
- .append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append('/').append(out);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java b/yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java
deleted file mode 100644
index 6f47b6c..0000000
--- a/yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import com.google.common.base.Throwables;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * A {@link YarnAMClientFactory} that picks the {@link YarnAMClient} implementation class by name,
- * depending on whether {@link YarnUtils#isHadoop20()} reports Hadoop 2.0, and instantiates it reflectively.
- */
-public final class VersionDetectYarnAMClientFactory implements YarnAMClientFactory {
-
-  private final Configuration conf;
-
-  public VersionDetectYarnAMClientFactory(Configuration conf) {
-    this.conf = conf;
-  }
-
-  /**
-   * Creates the client by invoking the implementation's constructor that takes a {@link Configuration}.
-   * Any failure is rethrown through {@link Throwables#propagate(Throwable)}.
-   */
-  @Override
-  @SuppressWarnings("unchecked")
-  public YarnAMClient create() {
-    try {
-      // Uses hadoop-2.0 class when running on Hadoop 2.0, hadoop-2.1 class otherwise.
-      String classSuffix = YarnUtils.isHadoop20() ? ".Hadoop20YarnAMClient" : ".Hadoop21YarnAMClient";
-      String implClassName = getClass().getPackage().getName() + classSuffix;
-      Class<YarnAMClient> implClass = (Class<YarnAMClient>) Class.forName(implClassName);
-
-      return implClass.getConstructor(Configuration.class).newInstance(conf);
-
-    } catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java b/yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
deleted file mode 100644
index f9db959..0000000
--- a/yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import com.google.common.base.Throwables;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * A {@link YarnAppClientFactory} that picks the {@link YarnAppClient} implementation class by name,
- * depending on whether {@link YarnUtils#isHadoop20()} reports Hadoop 2.0, and instantiates it reflectively.
- */
-public final class VersionDetectYarnAppClientFactory implements YarnAppClientFactory {
-
-  /**
-   * Creates the client by invoking the implementation's constructor that takes a {@link Configuration}.
-   * Any failure is rethrown through {@link Throwables#propagate(Throwable)}.
-   */
-  @Override
-  @SuppressWarnings("unchecked")
-  public YarnAppClient create(Configuration configuration) {
-    try {
-      // Uses hadoop-2.0 class when running on Hadoop 2.0, hadoop-2.1 class otherwise.
-      String classSuffix = YarnUtils.isHadoop20() ? ".Hadoop20YarnAppClient" : ".Hadoop21YarnAppClient";
-      String implClassName = getClass().getPackage().getName() + classSuffix;
-      Class<YarnAppClient> implClass = (Class<YarnAppClient>) Class.forName(implClassName);
-
-      return implClass.getConstructor(Configuration.class).newInstance(configuration);
-
-    } catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
deleted file mode 100644
index 83ba6a8..0000000
--- a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.twill.internal.ProcessLauncher;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Service;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.util.Records;
-
-import java.net.InetSocketAddress;
-import java.net.URL;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-
-/**
- * Abstraction through which the AM interacts with YARN. It isolates the YARN version specific
- * code, making multi-version compatibility easier.
- */
-public interface YarnAMClient extends Service {
-
-  /**
-   * Builder for creating a container request.
-   */
-  abstract class ContainerRequestBuilder {
-
-    protected final Resource capability;
-    protected final int count;
-    protected final Set<String> hosts = Sets.newHashSet();
-    protected final Set<String> racks = Sets.newHashSet();
-    protected final Priority priority = Records.newRecord(Priority.class);
-
-    protected ContainerRequestBuilder(Resource capability, int count) {
-      this.capability = capability;
-      this.count = count;
-    }
-
-    /**
-     * Adds one or more hosts to the request.
-     */
-    public ContainerRequestBuilder addHosts(String firstHost, String... moreHosts) {
-      return addAll(hosts, firstHost, moreHosts);
-    }
-
-    /**
-     * Adds one or more racks to the request.
-     */
-    public ContainerRequestBuilder addRacks(String firstRack, String... moreRacks) {
-      return addAll(racks, firstRack, moreRacks);
-    }
-
-    /**
-     * Sets the priority of the request.
-     */
-    public ContainerRequestBuilder setPriority(int prio) {
-      priority.setPriority(prio);
-      return this;
-    }
-
-    /**
-     * Adds a container request. Returns a unique ID for the request.
-     */
-    public abstract String apply();
-
-    /**
-     * Adds {@code head} followed by every element of {@code tail} to {@code target}.
-     */
-    private ContainerRequestBuilder addAll(Collection<String> target, String head, String... tail) {
-      target.add(head);
-      Collections.addAll(target, tail);
-      return this;
-    }
-  }
-
-  ContainerId getContainerId();
-
-  String getHost();
-
-  /**
-   * Sets the tracker address and tracker url. This method should be called before calling {@link #start()}.
-   */
-  void setTracker(InetSocketAddress trackerAddr, URL trackerUrl);
-
-  /**
-   * Callback for allocate call.
-   */
-  // TODO: Move AM heartbeat logic into this interface so AM only needs to handle callback.
-  interface AllocateHandler {
-    void acquired(List<ProcessLauncher<YarnContainerInfo>> launchers);
-
-    void completed(List<YarnContainerStatus> completed);
-  }
-
-  void allocate(float progress, AllocateHandler handler) throws Exception;
-
-  ContainerRequestBuilder addContainerRequest(Resource capability);
-
-  ContainerRequestBuilder addContainerRequest(Resource capability, int count);
-
-  /**
-   * Notifies that a container request is fulfilled.
-   *
-   * Note: This method is needed to work around what seems to be a bug in the AMRMClient implementation in YARN:
-   * if a container is requested after a previous container was acquired (with the same capability), multiple
-   * containers will get allocated instead of one.
-   *
-   * @param id The ID returned by {@link YarnAMClient.ContainerRequestBuilder#apply()}.
-   */
-  void completeContainerRequest(String id);
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClientFactory.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClientFactory.java b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClientFactory.java
deleted file mode 100644
index b2a1194..0000000
--- a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClientFactory.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-/**
- *
- */
-public interface YarnAMClientFactory {
-
- YarnAMClient create();
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java
deleted file mode 100644
index 71a9e68..0000000
--- a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.twill.api.TwillSpecification;
-import org.apache.twill.internal.ProcessController;
-import org.apache.twill.internal.ProcessLauncher;
-import com.google.common.util.concurrent.Service;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-
-/**
- * Interface for launching Yarn application from client.
- */
-public interface YarnAppClient extends Service {
-
- /**
- * Creates a {@link ProcessLauncher} for launching the application represented by the given spec.
- */
- ProcessLauncher<ApplicationId> createLauncher(TwillSpecification twillSpec) throws Exception;
-
- /**
- * Creates a {@link ProcessLauncher} for launching application with the given user and spec.
- *
- * @deprecated This method will get removed.
- */
- @Deprecated
- ProcessLauncher<ApplicationId> createLauncher(String user, TwillSpecification twillSpec) throws Exception;
-
- ProcessController<YarnApplicationReport> createProcessController(ApplicationId appId);
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClientFactory.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClientFactory.java b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClientFactory.java
deleted file mode 100644
index 70cecad..0000000
--- a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClientFactory.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.hadoop.conf.Configuration;
-
-/**
- *
- */
-public interface YarnAppClientFactory {
-
- YarnAppClient create(Configuration configuration);
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/java/org/apache/twill/internal/yarn/YarnApplicationReport.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnApplicationReport.java b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnApplicationReport.java
deleted file mode 100644
index 4dbb1d1..0000000
--- a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnApplicationReport.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-
-/**
- * This interface is for adapting differences in ApplicationReport across different Hadoop versions.
- */
-public interface YarnApplicationReport {
-
- /**
- * Get the <code>ApplicationId</code> of the application.
- * @return <code>ApplicationId</code> of the application
- */
- ApplicationId getApplicationId();
-
- /**
- * Get the <code>ApplicationAttemptId</code> of the current
- * attempt of the application
- * @return <code>ApplicationAttemptId</code> of the attempt
- */
- ApplicationAttemptId getCurrentApplicationAttemptId();
-
- /**
- * Get the <em>queue</em> to which the application was submitted.
- * @return <em>queue</em> to which the application was submitted
- */
- String getQueue();
-
- /**
- * Get the user-defined <em>name</em> of the application.
- * @return <em>name</em> of the application
- */
- String getName();
-
- /**
- * Get the <em>host</em> on which the <code>ApplicationMaster</code>
- * is running.
- * @return <em>host</em> on which the <code>ApplicationMaster</code>
- * is running
- */
- String getHost();
-
- /**
- * Get the <em>RPC port</em> of the <code>ApplicationMaster</code>.
- * @return <em>RPC port</em> of the <code>ApplicationMaster</code>
- */
- int getRpcPort();
-
-
- /**
- * Get the <code>YarnApplicationState</code> of the application.
- * @return <code>YarnApplicationState</code> of the application
- */
- YarnApplicationState getYarnApplicationState();
-
-
- /**
- * Get the <em>diagnostic information</em> of the application in case of
- * errors.
- * @return <em>diagnostic information</em> of the application in case
- * of errors
- */
- String getDiagnostics();
-
-
- /**
- * Get the <em>tracking url</em> for the application.
- * @return <em>tracking url</em> for the application
- */
- String getTrackingUrl();
-
-
- /**
- * Get the original not-proxied <em>tracking url</em> for the application.
- * This is intended to only be used by the proxy itself.
- * @return the original not-proxied <em>tracking url</em> for the application
- */
- String getOriginalTrackingUrl();
-
- /**
- * Get the <em>start time</em> of the application.
- * @return <em>start time</em> of the application
- */
- long getStartTime();
-
-
- /**
- * Get the <em>finish time</em> of the application.
- * @return <em>finish time</em> of the application
- */
- long getFinishTime();
-
-
- /**
- * Get the <em>final finish status</em> of the application.
- * @return <em>final finish status</em> of the application
- */
- FinalApplicationStatus getFinalApplicationStatus();
-
- /**
- * Retrieve the structure containing the job resources for this application
- * @return the job resources structure for this application
- */
- ApplicationResourceUsageReport getApplicationResourceUsageReport();
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/java/org/apache/twill/internal/yarn/YarnContainerInfo.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnContainerInfo.java b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnContainerInfo.java
deleted file mode 100644
index e806da7..0000000
--- a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnContainerInfo.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.twill.internal.ContainerInfo;
-
-/**
- *
- */
-public interface YarnContainerInfo extends ContainerInfo {
-
- <T> T getContainer();
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/java/org/apache/twill/internal/yarn/YarnContainerStatus.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnContainerStatus.java b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnContainerStatus.java
deleted file mode 100644
index 57e712c..0000000
--- a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnContainerStatus.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.hadoop.yarn.api.records.ContainerState;
-
-/**
- * This interface is for adapting differences in ContainerStatus between Hadoop 2.0 and 2.1
- */
-public interface YarnContainerStatus {
-
- String getContainerId();
-
- ContainerState getState();
-
- int getExitStatus();
-
- String getDiagnostics();
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/java/org/apache/twill/internal/yarn/YarnLaunchContext.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnLaunchContext.java b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnLaunchContext.java
deleted file mode 100644
index 984a1be..0000000
--- a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnLaunchContext.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This interface is for adapting ContainerLaunchContext across different Hadoop versions.
- */
-public interface YarnLaunchContext {
-
- <T> T getLaunchContext();
-
- void setCredentials(Credentials credentials);
-
- void setLocalResources(Map<String, YarnLocalResource> localResources);
-
- void setServiceData(Map<String, ByteBuffer> serviceData);
-
- Map<String, String> getEnvironment();
-
- void setEnvironment(Map<String, String> environment);
-
- List<String> getCommands();
-
- void setCommands(List<String> commands);
-
- void setApplicationACLs(Map<ApplicationAccessType, String> acls);
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/java/org/apache/twill/internal/yarn/YarnLocalResource.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnLocalResource.java b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnLocalResource.java
deleted file mode 100644
index 9bfc224..0000000
--- a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnLocalResource.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.URL;
-
-/**
- * An adapter interface for the LocalResource class/interface across different Hadoop versions.
- */
-public interface YarnLocalResource {
-
- /**
- * Returns the actual LocalResource object in Yarn.
- */
- <T> T getLocalResource();
-
- /**
- * Get the <em>location</em> of the resource to be localized.
- * @return <em>location</em> of the resource to be localized
- */
- URL getResource();
-
- /**
- * Set <em>location</em> of the resource to be localized.
- * @param resource <em>location</em> of the resource to be localized
- */
- void setResource(URL resource);
-
- /**
- * Get the <em>size</em> of the resource to be localized.
- * @return <em>size</em> of the resource to be localized
- */
- long getSize();
-
- /**
- * Set the <em>size</em> of the resource to be localized.
- * @param size <em>size</em> of the resource to be localized
- */
- void setSize(long size);
-
- /**
- * Get the original <em>timestamp</em> of the resource to be localized, used
- * for verification.
- * @return <em>timestamp</em> of the resource to be localized
- */
- long getTimestamp();
-
- /**
- * Set the <em>timestamp</em> of the resource to be localized, used
- * for verification.
- * @param timestamp <em>timestamp</em> of the resource to be localized
- */
- void setTimestamp(long timestamp);
-
- /**
- * Get the <code>LocalResourceType</code> of the resource to be localized.
- * @return <code>LocalResourceType</code> of the resource to be localized
- */
- LocalResourceType getType();
-
- /**
- * Set the <code>LocalResourceType</code> of the resource to be localized.
- * @param type <code>LocalResourceType</code> of the resource to be localized
- */
- void setType(LocalResourceType type);
-
- /**
- * Get the <code>LocalResourceVisibility</code> of the resource to be
- * localized.
- * @return <code>LocalResourceVisibility</code> of the resource to be
- * localized
- */
- LocalResourceVisibility getVisibility();
-
- /**
- * Set the <code>LocalResourceVisibility</code> of the resource to be
- * localized.
- * @param visibility <code>LocalResourceVisibility</code> of the resource to be
- * localized
- */
- void setVisibility(LocalResourceVisibility visibility);
-
- /**
- * Get the <em>pattern</em> that should be used to extract entries from the
- * archive (only used when type is <code>PATTERN</code>).
- * @return <em>pattern</em> that should be used to extract entries from the
- * archive.
- */
- String getPattern();
-
- /**
- * Set the <em>pattern</em> that should be used to extract entries from the
- * archive (only used when type is <code>PATTERN</code>).
- * @param pattern <em>pattern</em> that should be used to extract entries
- * from the archive.
- */
- void setPattern(String pattern);
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/java/org/apache/twill/internal/yarn/YarnNMClient.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnNMClient.java b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnNMClient.java
deleted file mode 100644
index d863c91..0000000
--- a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnNMClient.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.twill.common.Cancellable;
-
-/**
- * Abstraction for dealing with API differences across different Hadoop YARN versions.
- */
-public interface YarnNMClient {
-
- /**
- * Starts a process based on the given launch context.
- *
- * @param containerInfo The containerInfo that the new process will launch in.
- * @param launchContext Contains information about the process going to start.
- * @return A {@link Cancellable} that, when {@link Cancellable#cancel()} is invoked,
- * will try to shut down the process.
- *
- */
- Cancellable start(YarnContainerInfo containerInfo, YarnLaunchContext launchContext);
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
deleted file mode 100644
index 4f7597b..0000000
--- a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
+++ /dev/null
@@ -1,279 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.twill.api.LocalFile;
-import org.apache.twill.filesystem.ForwardingLocationFactory;
-import org.apache.twill.filesystem.HDFSLocationFactory;
-import org.apache.twill.filesystem.LocationFactory;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Maps;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.DataInputByteBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * Collection of helper methods to simplify YARN calls.
- */
-public class YarnUtils {
-
- private static final Logger LOG = LoggerFactory.getLogger(YarnUtils.class);
- private static final AtomicReference<Boolean> HADOOP_20 = new AtomicReference<Boolean>();
-
- public static YarnLocalResource createLocalResource(LocalFile localFile) {
- Preconditions.checkArgument(localFile.getLastModified() >= 0, "Last modified time should be >= 0.");
- Preconditions.checkArgument(localFile.getSize() >= 0, "File size should be >= 0.");
-
- YarnLocalResource resource = createAdapter(YarnLocalResource.class);
- resource.setVisibility(LocalResourceVisibility.APPLICATION);
- resource.setResource(ConverterUtils.getYarnUrlFromURI(localFile.getURI()));
- resource.setTimestamp(localFile.getLastModified());
- resource.setSize(localFile.getSize());
- return setLocalResourceType(resource, localFile);
- }
-
- public static YarnLaunchContext createLaunchContext() {
- return createAdapter(YarnLaunchContext.class);
- }
-
- // temporary workaround since older versions of hadoop don't have the getVirtualCores method.
- public static int getVirtualCores(Resource resource) {
- try {
- Method getVirtualCores = Resource.class.getMethod("getVirtualCores");
- return (Integer) getVirtualCores.invoke(resource);
- } catch (Exception e) {
- return 0;
- }
- }
-
- /**
- * Temporary workaround since older versions of hadoop don't have the setVirtualCores method.
- *
- * @param resource
- * @param cores
- * @return true if virtual cores was set, false if not.
- */
- public static boolean setVirtualCores(Resource resource, int cores) {
- try {
- Method setVirtualCores = Resource.class.getMethod("setVirtualCores", int.class);
- setVirtualCores.invoke(resource, cores);
- } catch (Exception e) {
- // It's ok to ignore this exception, as it's using older version of API.
- return false;
- }
- return true;
- }
-
- /**
- * Creates {@link ApplicationId} from the given cluster timestamp and id.
- */
- public static ApplicationId createApplicationId(long timestamp, int id) {
- try {
- try {
- // For Hadoop-2.1
- Method method = ApplicationId.class.getMethod("newInstance", long.class, int.class);
- return (ApplicationId) method.invoke(null, timestamp, id);
- } catch (NoSuchMethodException e) {
- // Try with Hadoop-2.0 way
- ApplicationId appId = Records.newRecord(ApplicationId.class);
-
- Method setClusterTimestamp = ApplicationId.class.getMethod("setClusterTimestamp", long.class);
- Method setId = ApplicationId.class.getMethod("setId", int.class);
-
- setClusterTimestamp.invoke(appId, timestamp);
- setId.invoke(appId, id);
-
- return appId;
- }
- } catch (Exception e) {
- throw Throwables.propagate(e);
- }
- }
-
- /**
- * Helper method to get delegation tokens for the given LocationFactory.
- * @param config The hadoop configuration.
- * @param locationFactory The LocationFactory for generating tokens.
- * @param credentials Credentials for storing tokens acquired.
- * @return List of delegation Tokens acquired.
- */
- public static List<Token<?>> addDelegationTokens(Configuration config,
- LocationFactory locationFactory,
- Credentials credentials) throws IOException {
- if (!UserGroupInformation.isSecurityEnabled()) {
- LOG.debug("Security is not enabled");
- return ImmutableList.of();
- }
-
- FileSystem fileSystem = getFileSystem(locationFactory);
-
- if (fileSystem == null) {
- LOG.debug("LocationFactory is not HDFS");
- return ImmutableList.of();
- }
-
- String renewer = getYarnTokenRenewer(config);
-
- Token<?>[] tokens = fileSystem.addDelegationTokens(renewer, credentials);
- return tokens == null ? ImmutableList.<Token<?>>of() : ImmutableList.copyOf(tokens);
- }
-
- public static ByteBuffer encodeCredentials(Credentials credentials) {
- try {
- DataOutputBuffer out = new DataOutputBuffer();
- credentials.writeTokenStorageToStream(out);
- return ByteBuffer.wrap(out.getData(), 0, out.getLength());
- } catch (IOException e) {
- // Shouldn't throw
- LOG.error("Failed to encode Credentials.", e);
- throw Throwables.propagate(e);
- }
- }
-
- /**
- * Decodes {@link Credentials} from the given buffer.
- * If the buffer is null or empty, it returns an empty Credentials.
- */
- public static Credentials decodeCredentials(ByteBuffer buffer) throws IOException {
- Credentials credentials = new Credentials();
- if (buffer != null && buffer.hasRemaining()) {
- DataInputByteBuffer in = new DataInputByteBuffer();
- in.reset(buffer);
- credentials.readTokenStorageStream(in);
- }
- return credentials;
- }
-
- public static String getYarnTokenRenewer(Configuration config) throws IOException {
- String rmHost = getRMAddress(config).getHostName();
- String renewer = SecurityUtil.getServerPrincipal(config.get(YarnConfiguration.RM_PRINCIPAL), rmHost);
-
- if (renewer == null || renewer.length() == 0) {
- throw new IOException("No Kerberos principal for Yarn RM to use as renewer");
- }
-
- return renewer;
- }
-
- public static InetSocketAddress getRMAddress(Configuration config) {
- return config.getSocketAddr(YarnConfiguration.RM_ADDRESS,
- YarnConfiguration.DEFAULT_RM_ADDRESS,
- YarnConfiguration.DEFAULT_RM_PORT);
- }
-
- /**
- * Returns true if Hadoop-2.0 classes are in the classpath.
- */
- public static boolean isHadoop20() {
- Boolean hadoop20 = HADOOP_20.get();
- if (hadoop20 != null) {
- return hadoop20;
- }
- try {
- Class.forName("org.apache.hadoop.yarn.client.api.NMClient");
- HADOOP_20.set(false);
- return false;
- } catch (ClassNotFoundException e) {
- HADOOP_20.set(true);
- return true;
- }
- }
-
- /**
- * Helper method to create adapter class for bridging between Hadoop 2.0 and 2.1
- */
- private static <T> T createAdapter(Class<T> clz) {
- String className = clz.getPackage().getName();
-
- if (isHadoop20()) {
- className += ".Hadoop20" + clz.getSimpleName();
- } else {
- className += ".Hadoop21" + clz.getSimpleName();
- }
-
- try {
- return (T) Class.forName(className).newInstance();
- } catch (Exception e) {
- throw Throwables.propagate(e);
- }
- }
-
- private static YarnLocalResource setLocalResourceType(YarnLocalResource localResource, LocalFile localFile) {
- if (localFile.isArchive()) {
- if (localFile.getPattern() == null) {
- localResource.setType(LocalResourceType.ARCHIVE);
- } else {
- localResource.setType(LocalResourceType.PATTERN);
- localResource.setPattern(localFile.getPattern());
- }
- } else {
- localResource.setType(LocalResourceType.FILE);
- }
- return localResource;
- }
-
- private static <T> Map<String, T> transformResource(Map<String, YarnLocalResource> from) {
- return Maps.transformValues(from, new Function<YarnLocalResource, T>() {
- @Override
- public T apply(YarnLocalResource resource) {
- return resource.getLocalResource();
- }
- });
- }
-
- /**
- * Gets the Hadoop FileSystem from LocationFactory.
- */
- private static FileSystem getFileSystem(LocationFactory locationFactory) {
- if (locationFactory instanceof HDFSLocationFactory) {
- return ((HDFSLocationFactory) locationFactory).getFileSystem();
- }
- if (locationFactory instanceof ForwardingLocationFactory) {
- return getFileSystem(((ForwardingLocationFactory) locationFactory).getDelegate());
- }
- return null;
- }
-
- private YarnUtils() {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/java/org/apache/twill/internal/yarn/package-info.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/package-info.java b/yarn/src/main/java/org/apache/twill/internal/yarn/package-info.java
deleted file mode 100644
index d6ec9f7..0000000
--- a/yarn/src/main/java/org/apache/twill/internal/yarn/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * This package contains classes for interacting with Yarn.
- */
-package org.apache.twill.internal.yarn;
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/java/org/apache/twill/yarn/LocationSecureStoreUpdater.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/yarn/LocationSecureStoreUpdater.java b/yarn/src/main/java/org/apache/twill/yarn/LocationSecureStoreUpdater.java
deleted file mode 100644
index 4d20c9c..0000000
--- a/yarn/src/main/java/org/apache/twill/yarn/LocationSecureStoreUpdater.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.yarn;
-
-import org.apache.twill.api.RunId;
-import org.apache.twill.api.SecureStore;
-import org.apache.twill.api.SecureStoreUpdater;
-import org.apache.twill.filesystem.LocationFactory;
-import org.apache.twill.internal.yarn.YarnUtils;
-import com.google.common.base.Throwables;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.Credentials;
-
-import java.io.IOException;
-
-/**
- * Package private {@link SecureStoreUpdater} that builds a new secure store from the delegation
- * tokens obtained for a {@link LocationFactory}.
- */
-final class LocationSecureStoreUpdater implements SecureStoreUpdater {
-
-  private final Configuration configuration;
-  private final LocationFactory locationFactory;
-
-  LocationSecureStoreUpdater(Configuration configuration, LocationFactory locationFactory) {
-    this.configuration = configuration;
-    this.locationFactory = locationFactory;
-  }
-
-  /**
-   * Creates a fresh {@link Credentials}, fills it through
-   * {@code YarnUtils.addDelegationTokens} and wraps it in a {@link YarnSecureStore}.
-   * An {@link IOException} raised while adding the tokens is rethrown via {@code Throwables.propagate}.
-   */
-  @Override
-  public SecureStore update(String application, RunId runId) {
-    Credentials freshCredentials = new Credentials();
-    try {
-      YarnUtils.addDelegationTokens(configuration, locationFactory, freshCredentials);
-    } catch (IOException e) {
-      throw Throwables.propagate(e);
-    }
-    return YarnSecureStore.create(freshCredentials);
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java b/yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java
deleted file mode 100644
index 2974c3f..0000000
--- a/yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.yarn;
-
-import org.apache.twill.api.ResourceReport;
-import org.apache.twill.internal.json.ResourceReportAdapter;
-import com.google.common.base.Charsets;
-import com.google.common.io.Closeables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.net.URL;
-
-/**
- * Package private client that reads a {@link ResourceReport} from a resource endpoint URL.
- */
-final class ResourceReportClient {
-  private static final Logger LOG = LoggerFactory.getLogger(ResourceReportClient.class);
-
-  private final ResourceReportAdapter reportAdapter;
-  private final URL resourceUrl;
-
-  ResourceReportClient(URL resourceUrl) {
-    this.resourceUrl = resourceUrl;
-    this.reportAdapter = ResourceReportAdapter.create();
-  }
-
-  /**
-   * Fetches the resource usage of the application from the resource endpoint URL.
-   * Any failure is logged and reported to the caller as {@code null}.
-   *
-   * @return A {@link ResourceReport} or {@code null} if failed to fetch the report.
-   */
-  public ResourceReport get() {
-    try {
-      return readReport();
-    } catch (Exception e) {
-      LOG.error("Exception getting resource report from {}.", resourceUrl, e);
-      return null;
-    }
-  }
-
-  /**
-   * Opens the endpoint as UTF-8 text, decodes the JSON body and always closes the reader.
-   */
-  private ResourceReport readReport() throws Exception {
-    Reader jsonReader = new BufferedReader(new InputStreamReader(resourceUrl.openStream(), Charsets.UTF_8));
-    try {
-      return reportAdapter.fromJson(jsonReader);
-    } finally {
-      Closeables.closeQuietly(jsonReader);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/java/org/apache/twill/yarn/YarnSecureStore.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/yarn/YarnSecureStore.java b/yarn/src/main/java/org/apache/twill/yarn/YarnSecureStore.java
deleted file mode 100644
index e6f461a..0000000
--- a/yarn/src/main/java/org/apache/twill/yarn/YarnSecureStore.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.yarn;
-
-import org.apache.twill.api.SecureStore;
-import org.apache.hadoop.security.Credentials;
-
-/**
- * A {@link SecureStore} backed by hadoop {@link Credentials}.
- * Instances are obtained through {@link #create(Credentials)}.
- */
-public final class YarnSecureStore implements SecureStore {
-
-  private final Credentials credentials;
-
-  private YarnSecureStore(Credentials credentials) {
-    this.credentials = credentials;
-  }
-
-  /**
-   * Wraps the given {@link Credentials} in a {@link SecureStore}.
-   */
-  public static SecureStore create(Credentials credentials) {
-    return new YarnSecureStore(credentials);
-  }
-
-  /**
-   * Returns the {@link Credentials} instance given at creation time.
-   */
-  @Override
-  public Credentials getStore() {
-    return credentials;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java b/yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
deleted file mode 100644
index 4c240fb..0000000
--- a/yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.yarn;
-
-import org.apache.twill.api.ResourceReport;
-import org.apache.twill.api.RunId;
-import org.apache.twill.api.TwillController;
-import org.apache.twill.api.logging.LogHandler;
-import org.apache.twill.internal.AbstractTwillController;
-import org.apache.twill.internal.Constants;
-import org.apache.twill.internal.ProcessController;
-import org.apache.twill.internal.appmaster.TrackerService;
-import org.apache.twill.internal.state.StateNode;
-import org.apache.twill.internal.state.SystemMessages;
-import org.apache.twill.internal.yarn.YarnApplicationReport;
-import org.apache.twill.zookeeper.NodeData;
-import org.apache.twill.zookeeper.ZKClient;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.Uninterruptibles;
-import org.apache.commons.lang.time.StopWatch;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URL;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-
-/**
- * A {@link org.apache.twill.api.TwillController} that controls an application running on Hadoop YARN.
- */
-final class YarnTwillController extends AbstractTwillController implements TwillController {
-
-  private static final Logger LOG = LoggerFactory.getLogger(YarnTwillController.class);
-
-  private final Callable<ProcessController<YarnApplicationReport>> startUp;
-  // Both fields below are assigned in doStartUp(); they stay null until then.
-  private ProcessController<YarnApplicationReport> processController;
-  private ResourceReportClient resourcesClient;
-
-  /**
-   * Creates an instance without any {@link LogHandler}.
-   */
-  YarnTwillController(RunId runId, ZKClient zkClient, Callable<ProcessController<YarnApplicationReport>> startUp) {
-    this(runId, zkClient, ImmutableList.<LogHandler>of(), startUp);
-  }
-
-  /**
-   * Creates an instance with the given {@link LogHandler}s.
-   *
-   * @param startUp invoked in {@link #doStartUp()} to obtain the {@link ProcessController}
-   *                of the application
-   */
-  YarnTwillController(RunId runId, ZKClient zkClient, Iterable<LogHandler> logHandlers,
-                      Callable<ProcessController<YarnApplicationReport>> startUp) {
-    super(runId, zkClient, logHandlers);
-    this.startUp = startUp;
-  }
-
-
-  /**
-   * Sends a message to application to notify the secure store has been updated.
-   */
-  ListenableFuture<Void> secureStoreUpdated() {
-    return sendMessage(SystemMessages.SECURE_STORE_UPDATED, null);
-  }
-
-  /**
-   * Calls the {@code startUp} callable and polls the yarn application state about once per second
-   * until the application has run (see {@link #hasRun(YarnApplicationState)}) or
-   * {@code Constants.APPLICATION_MAX_START_SECONDS} have elapsed. If the application is not
-   * {@code RUNNING} afterwards the controller is force shut down; otherwise a
-   * {@link ResourceReportClient} is created for the tracker URL of the application.
-   */
-  @Override
-  protected void doStartUp() {
-    super.doStartUp();
-
-    // Submit and poll the status of the yarn application
-    try {
-      processController = startUp.call();
-
-      YarnApplicationReport report = processController.getReport();
-      LOG.debug("Application {} submit", report.getApplicationId());
-
-      YarnApplicationState state = report.getYarnApplicationState();
-      StopWatch stopWatch = new StopWatch();
-      stopWatch.start();
-      stopWatch.split();
-      long maxTime = TimeUnit.MILLISECONDS.convert(Constants.APPLICATION_MAX_START_SECONDS, TimeUnit.SECONDS);
-
-      LOG.info("Checking yarn application status");
-      while (!hasRun(state) && stopWatch.getSplitTime() < maxTime) {
-        report = processController.getReport();
-        state = report.getYarnApplicationState();
-        LOG.debug("Yarn application status: {}", state);
-        TimeUnit.SECONDS.sleep(1);
-        stopWatch.split();
-      }
-      LOG.info("Yarn application is in state {}", state);
-      if (state != YarnApplicationState.RUNNING) {
-        LOG.info("Yarn application is not in running state. Shutting down controller.");
-        forceShutDown();
-      } else {
-        try {
-          URL resourceUrl = URI.create(String.format("http://%s:%d", report.getHost(), report.getRpcPort()))
-                               .resolve(TrackerService.PATH).toURL();
-          resourcesClient = new ResourceReportClient(resourceUrl);
-        } catch (IOException e) {
-          // Keep starting up without resource reports, but leave a trace of why they are unavailable.
-          LOG.warn("Failed to create resource report client for application {}.", report.getApplicationId(), e);
-          resourcesClient = null;
-        }
-      }
-    } catch (InterruptedException e) {
-      // Restore the interrupt status so that code further up the stack can observe the interruption.
-      Thread.currentThread().interrupt();
-      throw Throwables.propagate(e);
-    } catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  /**
-   * Waits for the stop message to be processed, then polls the final status of the yarn application
-   * about once per second for up to {@code Constants.APPLICATION_MAX_STOP_SECONDS}. The application
-   * is killed if waiting for the stop message fails, if polling fails, or if the final status is
-   * still {@code UNDEFINED} after the wait.
-   */
-  @Override
-  protected void doShutDown() {
-    if (processController == null) {
-      LOG.warn("No process controller for application that is not submitted.");
-      return;
-    }
-
-    // Wait for the stop message being processed
-    try {
-      Uninterruptibles.getUninterruptibly(getStopMessageFuture(),
-                                          Constants.APPLICATION_MAX_STOP_SECONDS, TimeUnit.SECONDS);
-    } catch (Exception e) {
-      LOG.error("Failed to wait for stop message being processed.", e);
-      // Kill the application through yarn
-      kill();
-    }
-
-    // Poll application status from yarn
-    try {
-      StopWatch stopWatch = new StopWatch();
-      stopWatch.start();
-      stopWatch.split();
-      long maxTime = TimeUnit.MILLISECONDS.convert(Constants.APPLICATION_MAX_STOP_SECONDS, TimeUnit.SECONDS);
-
-      YarnApplicationReport report = processController.getReport();
-      FinalApplicationStatus finalStatus = report.getFinalApplicationStatus();
-      while (finalStatus == FinalApplicationStatus.UNDEFINED && stopWatch.getSplitTime() < maxTime) {
-        LOG.debug("Yarn application final status for {} {}", report.getApplicationId(), finalStatus);
-        TimeUnit.SECONDS.sleep(1);
-        stopWatch.split();
-        finalStatus = processController.getReport().getFinalApplicationStatus();
-      }
-      LOG.debug("Yarn application final status is {}", finalStatus);
-
-      // Application not finished after max stop time, kill the application
-      if (finalStatus == FinalApplicationStatus.UNDEFINED) {
-        kill();
-      }
-    } catch (Exception e) {
-      LOG.warn("Exception while waiting for application report: {}", e.getMessage(), e);
-      kill();
-    }
-
-    super.doShutDown();
-  }
-
-  /**
-   * Cancels the application through the {@link ProcessController}, if there is one.
-   */
-  @Override
-  public void kill() {
-    if (processController != null) {
-      YarnApplicationReport report = processController.getReport();
-      LOG.info("Killing application {}", report.getApplicationId());
-      processController.cancel();
-    } else {
-      LOG.warn("No process controller for application that is not submitted.");
-    }
-  }
-
-  @Override
-  protected void instanceNodeUpdated(NodeData nodeData) {
-    // Intentionally a no-op.
-  }
-
-  @Override
-  protected void stateNodeUpdated(StateNode stateNode) {
-    // Intentionally a no-op.
-  }
-
-  /**
-   * Returns {@code true} if the given state is {@code RUNNING}, {@code FINISHED}, {@code FAILED}
-   * or {@code KILLED}; {@code false} for any other state.
-   */
-  private boolean hasRun(YarnApplicationState state) {
-    switch (state) {
-      case RUNNING:
-      case FINISHED:
-      case FAILED:
-      case KILLED:
-        return true;
-      default:
-        return false;
-    }
-  }
-
-  /**
-   * Returns the resource report fetched through the {@link ResourceReportClient}, or {@code null}
-   * if no client is available or fetching the report failed.
-   */
-  @Override
-  public ResourceReport getResourceReport() {
-    // in case the user calls this before starting, return null
-    return (resourcesClient == null) ? null : resourcesClient.get();
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/java/org/apache/twill/yarn/YarnTwillControllerFactory.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/yarn/YarnTwillControllerFactory.java b/yarn/src/main/java/org/apache/twill/yarn/YarnTwillControllerFactory.java
deleted file mode 100644
index 11c2ae6..0000000
--- a/yarn/src/main/java/org/apache/twill/yarn/YarnTwillControllerFactory.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.yarn;
-
-import org.apache.twill.api.RunId;
-import org.apache.twill.api.logging.LogHandler;
-import org.apache.twill.internal.ProcessController;
-import org.apache.twill.internal.yarn.YarnApplicationReport;
-
-import java.util.concurrent.Callable;
-
-/**
- * Factory for creating {@link YarnTwillController}.
- */
-interface YarnTwillControllerFactory {
-
- YarnTwillController create(RunId runId, Iterable<LogHandler> logHandlers,
- Callable<ProcessController<YarnApplicationReport>> startUp);
-}