You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/12 22:59:56 UTC
[14/28] Making maven site works.
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLocalResource.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLocalResource.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLocalResource.java
new file mode 100644
index 0000000..b327b94
--- /dev/null
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLocalResource.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ *
+ */
+public final class Hadoop20YarnLocalResource implements YarnLocalResource {
+
+ private final LocalResource localResource;
+
+ public Hadoop20YarnLocalResource() {
+ this.localResource = Records.newRecord(LocalResource.class);
+ }
+
+ @Override
+ public <T> T getLocalResource() {
+ return (T) localResource;
+ }
+
+ @Override
+ public URL getResource() {
+ return localResource.getResource();
+ }
+
+ @Override
+ public void setResource(URL resource) {
+ localResource.setResource(resource);
+ }
+
+ @Override
+ public long getSize() {
+ return localResource.getSize();
+ }
+
+ @Override
+ public void setSize(long size) {
+ localResource.setSize(size);
+ }
+
+ @Override
+ public long getTimestamp() {
+ return localResource.getTimestamp();
+ }
+
+ @Override
+ public void setTimestamp(long timestamp) {
+ localResource.setTimestamp(timestamp);
+ }
+
+ @Override
+ public LocalResourceType getType() {
+ return localResource.getType();
+ }
+
+ @Override
+ public void setType(LocalResourceType type) {
+ localResource.setType(type);
+ }
+
+ @Override
+ public LocalResourceVisibility getVisibility() {
+ return localResource.getVisibility();
+ }
+
+ @Override
+ public void setVisibility(LocalResourceVisibility visibility) {
+ localResource.setVisibility(visibility);
+ }
+
+ @Override
+ public String getPattern() {
+ return localResource.getPattern();
+ }
+
+ @Override
+ public void setPattern(String pattern) {
+ localResource.setPattern(pattern);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnNMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnNMClient.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnNMClient.java
new file mode 100644
index 0000000..98ecc67
--- /dev/null
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnNMClient.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.common.Cancellable;
+import com.google.common.base.Throwables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.Records;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+
+/**
+ *
+ */
+public final class Hadoop20YarnNMClient implements YarnNMClient {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Hadoop20YarnNMClient.class);
+
+ private final YarnRPC yarnRPC;
+ private final Configuration yarnConf;
+
+ public Hadoop20YarnNMClient(YarnRPC yarnRPC, Configuration yarnConf) {
+ this.yarnRPC = yarnRPC;
+ this.yarnConf = yarnConf;
+ }
+
+ @Override
+ public Cancellable start(YarnContainerInfo containerInfo, YarnLaunchContext launchContext) {
+ ContainerLaunchContext context = launchContext.getLaunchContext();
+ context.setUser(System.getProperty("user.name"));
+
+ Container container = containerInfo.getContainer();
+
+ context.setContainerId(container.getId());
+ context.setResource(container.getResource());
+
+ StartContainerRequest startRequest = Records.newRecord(StartContainerRequest.class);
+ startRequest.setContainerLaunchContext(context);
+
+ ContainerManager manager = connectContainerManager(container);
+ try {
+ manager.startContainer(startRequest);
+ return new ContainerTerminator(container, manager);
+ } catch (YarnRemoteException e) {
+ LOG.error("Error in launching process", e);
+ throw Throwables.propagate(e);
+ }
+
+ }
+
+ /**
+ * Helper to connect to container manager (node manager).
+ */
+ private ContainerManager connectContainerManager(Container container) {
+ String cmIpPortStr = String.format("%s:%d", container.getNodeId().getHost(), container.getNodeId().getPort());
+ InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
+ return ((ContainerManager) yarnRPC.getProxy(ContainerManager.class, cmAddress, yarnConf));
+ }
+
+ private static final class ContainerTerminator implements Cancellable {
+
+ private final Container container;
+ private final ContainerManager manager;
+
+ private ContainerTerminator(Container container, ContainerManager manager) {
+ this.container = container;
+ this.manager = manager;
+ }
+
+ @Override
+ public void cancel() {
+ LOG.info("Request to stop container {}.", container.getId());
+ StopContainerRequest stopRequest = Records.newRecord(StopContainerRequest.class);
+ stopRequest.setContainerId(container.getId());
+ try {
+ manager.stopContainer(stopRequest);
+ boolean completed = false;
+ while (!completed) {
+ GetContainerStatusRequest statusRequest = Records.newRecord(GetContainerStatusRequest.class);
+ statusRequest.setContainerId(container.getId());
+ GetContainerStatusResponse statusResponse = manager.getContainerStatus(statusRequest);
+ LOG.info("Container status: {} {}", statusResponse.getStatus(), statusResponse.getStatus().getDiagnostics());
+
+ completed = (statusResponse.getStatus().getState() == ContainerState.COMPLETE);
+ }
+ LOG.info("Container {} stopped.", container.getId());
+ } catch (YarnRemoteException e) {
+ LOG.error("Fail to stop container {}", container.getId(), e);
+ throw Throwables.propagate(e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClient.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClient.java
new file mode 100644
index 0000000..26b6fa2
--- /dev/null
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClient.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn.ports;
+
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.service.Service;
+
+/**
+ * Ported from Apache Hadoop YARN.
+ */
+public interface AMRMClient extends Service {
+
+ /**
+ * Value used to define no locality.
+ */
+ static final String ANY = "*";
+
+ /**
+ * Object to represent container request for resources.
+ * Resources may be localized to nodes and racks.
+ * Resources may be assigned priorities.
+ * Can ask for multiple containers of a given type.
+ */
+ public static class ContainerRequest {
+ Resource capability;
+ String[] hosts;
+ String[] racks;
+ Priority priority;
+ int containerCount;
+
+ public ContainerRequest(Resource capability, String[] hosts,
+ String[] racks, Priority priority, int containerCount) {
+ this.capability = capability;
+ this.hosts = (hosts != null ? hosts.clone() : null);
+ this.racks = (racks != null ? racks.clone() : null);
+ this.priority = priority;
+ this.containerCount = containerCount;
+ }
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Capability[").append(capability).append("]");
+ sb.append("Priority[").append(priority).append("]");
+ sb.append("ContainerCount[").append(containerCount).append("]");
+ return sb.toString();
+ }
+ }
+
+ /**
+ * Register the application master. This must be called before any
+ * other interaction
+ * @param appHostName Name of the host on which master is running
+ * @param appHostPort Port master is listening on
+ * @param appTrackingUrl URL at which the master info can be seen
+ * @return <code>RegisterApplicationMasterResponse</code>
+ * @throws org.apache.hadoop.yarn.exceptions.YarnRemoteException
+ */
+ public RegisterApplicationMasterResponse
+ registerApplicationMaster(String appHostName,
+ int appHostPort,
+ String appTrackingUrl)
+ throws YarnRemoteException;
+
+ /**
+ * Request additional containers and receive new container allocations.
+ * Requests made via <code>addContainerRequest</code> are sent to the
+ * <code>ResourceManager</code>. New containers assigned to the master are
+ * retrieved. Status of completed containers and node health updates are
+ * also retrieved.
+ * This also doubles as a heartbeat to the ResourceManager and must be
+ * made periodically.
+ * The call may not always return any new allocations of containers.
+ * App should not make concurrent allocate requests. May cause request loss.
+ * @param progressIndicator Indicates progress made by the master
+ * @return the response of the allocate request
+ * @throws YarnRemoteException
+ */
+ public AllocationResponse allocate(float progressIndicator)
+ throws YarnRemoteException;
+
+ /**
+ * Unregister the Application Master. This must be called in the end.
+ * @param appStatus Success/Failure status of the master
+ * @param appMessage Diagnostics message on failure
+ * @param appTrackingUrl New URL to get master info
+ * @throws YarnRemoteException
+ */
+ public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
+ String appMessage,
+ String appTrackingUrl)
+ throws YarnRemoteException;
+
+ /**
+ * Request containers for resources before calling <code>allocate</code>.
+ * @param req Resource request
+ */
+ public void addContainerRequest(ContainerRequest req);
+
+ /**
+ * Remove previous container request. The previous container request may have
+ * already been sent to the ResourceManager. So even after the remove request
+ * the app must be prepared to receive an allocation for the previous request
+ * even after the remove request
+ * @param req Resource request
+ */
+ public void removeContainerRequest(ContainerRequest req);
+
+ /**
+ * Release containers assigned by the Resource Manager. If the app cannot use
+ * the container or wants to give up the container then it can release it.
+ * The app needs to make new requests for the released resource capability if
+ * it still needs it. For example, if it released non-local resources
+ * @param containerId
+ */
+ public void releaseAssignedContainer(ContainerId containerId);
+
+ /**
+ * Get the currently available resources in the cluster.
+ * A valid value is available after a call to allocate has been made
+ * @return Currently available resources
+ */
+ public Resource getClusterAvailableResources();
+
+ /**
+ * Get the current number of nodes in the cluster.
+ * A valid values is available after a call to allocate has been made
+ * @return Current number of nodes in the cluster
+ */
+ public int getClusterNodeCount();
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClientImpl.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClientImpl.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClientImpl.java
new file mode 100644
index 0000000..c1bd75a
--- /dev/null
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClientImpl.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn.ports;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/**
+ * Ported from Apache Hadoop YARN.
+ */
+public final class AMRMClientImpl extends AbstractService implements AMRMClient {
+
+ private static final Log LOG = LogFactory.getLog(AMRMClientImpl.class);
+
+ private final RecordFactory recordFactory =
+ RecordFactoryProvider.getRecordFactory(null);
+
+ private int lastResponseId = 0;
+
+ protected AMRMProtocol rmClient;
+ protected final ApplicationAttemptId appAttemptId;
+ protected Resource clusterAvailableResources;
+ protected int clusterNodeCount;
+
+ //Key -> Priority
+ //Value -> Map
+ //Key->ResourceName (e.g., hostname, rackname, *)
+ //Value->Map
+ //Key->Resource Capability
+ //Value->ResourceRequest
+ protected final
+ Map<Priority, Map<String, Map<Resource, ResourceRequest>>>
+ remoteRequestsTable =
+ new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();
+
+ protected final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
+ new org.apache.hadoop.yarn.util.BuilderUtils.ResourceRequestComparator());
+ protected final Set<ContainerId> release = new TreeSet<ContainerId>();
+
+ public AMRMClientImpl(ApplicationAttemptId appAttemptId) {
+ super(AMRMClientImpl.class.getName());
+ this.appAttemptId = appAttemptId;
+ }
+
+ @Override
+ public synchronized void init(Configuration conf) {
+ super.init(conf);
+ }
+
+ @Override
+ public synchronized void start() {
+ final YarnConfiguration conf = new YarnConfiguration(getConfig());
+ final YarnRPC rpc = YarnRPC.create(conf);
+ final InetSocketAddress rmAddress = conf.getSocketAddr(
+ YarnConfiguration.RM_SCHEDULER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
+
+ UserGroupInformation currentUser;
+ try {
+ currentUser = UserGroupInformation.getCurrentUser();
+ } catch (IOException e) {
+ throw new YarnException(e);
+ }
+
+ if (UserGroupInformation.isSecurityEnabled()) {
+ String tokenURLEncodedStr = System.getenv().get(
+ ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
+ Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
+
+ try {
+ token.decodeFromUrlString(tokenURLEncodedStr);
+ } catch (IOException e) {
+ throw new YarnException(e);
+ }
+
+ SecurityUtil.setTokenService(token, rmAddress);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("AppMasterToken is " + token);
+ }
+ currentUser.addToken(token);
+ }
+
+ rmClient = currentUser.doAs(new PrivilegedAction<AMRMProtocol>() {
+ @Override
+ public AMRMProtocol run() {
+ return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress,
+ conf);
+ }
+ });
+ LOG.debug("Connecting to ResourceManager at " + rmAddress);
+ super.start();
+ }
+
+ @Override
+ public synchronized void stop() {
+ if (this.rmClient != null) {
+ RPC.stopProxy(this.rmClient);
+ }
+ super.stop();
+ }
+
+ @Override
+ public RegisterApplicationMasterResponse registerApplicationMaster(
+ String appHostName, int appHostPort, String appTrackingUrl)
+ throws YarnRemoteException {
+ // do this only once ???
+ RegisterApplicationMasterRequest request = recordFactory
+ .newRecordInstance(RegisterApplicationMasterRequest.class);
+ synchronized (this) {
+ request.setApplicationAttemptId(appAttemptId);
+ }
+ request.setHost(appHostName);
+ request.setRpcPort(appHostPort);
+ if (appTrackingUrl != null) {
+ request.setTrackingUrl(appTrackingUrl);
+ }
+ RegisterApplicationMasterResponse response = rmClient
+ .registerApplicationMaster(request);
+ return response;
+ }
+
+ @Override
+ public AllocationResponse allocate(float progressIndicator)
+ throws YarnRemoteException {
+ AllocateResponse allocateResponse = null;
+ ArrayList<ResourceRequest> askList = null;
+ ArrayList<ContainerId> releaseList = null;
+ AllocateRequest allocateRequest = null;
+
+ try {
+ synchronized (this) {
+ askList = new ArrayList<ResourceRequest>(ask);
+ releaseList = new ArrayList<ContainerId>(release);
+ // optimistically clear this collection assuming no RPC failure
+ ask.clear();
+ release.clear();
+ allocateRequest = BuilderUtils
+ .newAllocateRequest(appAttemptId, lastResponseId, progressIndicator,
+ askList, releaseList);
+ }
+
+ allocateResponse = rmClient.allocate(allocateRequest);
+ AllocationResponse response = AllocationResponses.create(allocateResponse);
+
+ synchronized (this) {
+ // update these on successful RPC
+ clusterNodeCount = allocateResponse.getNumClusterNodes();
+ lastResponseId = response.getResponseId();
+ clusterAvailableResources = response.getAvailableResources();
+ }
+
+ return response;
+ } finally {
+ // TODO how to differentiate remote YARN exception vs error in RPC
+ if (allocateResponse == null) {
+ // We hit an exception in allocate()
+ // Preserve ask and release for next call to allocate()
+ synchronized (this) {
+ release.addAll(releaseList);
+ // Requests could have been added or deleted during call to allocate.
+ // If requests were added/removed then there is nothing to do since
+ // the ResourceRequest object in ask would have the actual new value.
+ // If ask does not have this ResourceRequest then it was unchanged and
+ // so we can add the value back safely.
+ // This assumes that there will no concurrent calls to allocate() and
+ // so we don't have to worry about ask being changed in the
+ // synchronized block at the beginning of this method.
+ for (ResourceRequest oldAsk : askList) {
+ if (!ask.contains(oldAsk)) {
+ ask.add(oldAsk);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
+ String appMessage, String appTrackingUrl) throws YarnRemoteException {
+ FinishApplicationMasterRequest request = recordFactory
+ .newRecordInstance(FinishApplicationMasterRequest.class);
+ request.setAppAttemptId(appAttemptId);
+ request.setFinishApplicationStatus(appStatus);
+ if (appMessage != null) {
+ request.setDiagnostics(appMessage);
+ }
+ if (appTrackingUrl != null) {
+ request.setTrackingUrl(appTrackingUrl);
+ }
+ rmClient.finishApplicationMaster(request);
+ }
+
+ @Override
+ public synchronized void addContainerRequest(ContainerRequest req) {
+ // Create resource requests
+ if (req.hosts != null) {
+ for (String host : req.hosts) {
+ addResourceRequest(req.priority, host, req.capability, req.containerCount);
+ }
+ }
+
+ if (req.racks != null) {
+ for (String rack : req.racks) {
+ addResourceRequest(req.priority, rack, req.capability, req.containerCount);
+ }
+ }
+
+ // Off switch
+ addResourceRequest(req.priority, ANY, req.capability, req.containerCount);
+ }
+
+ @Override
+ public synchronized void removeContainerRequest(ContainerRequest req) {
+ // Update resource requests
+ if (req.hosts != null) {
+ for (String hostName : req.hosts) {
+ decResourceRequest(req.priority, hostName, req.capability, req.containerCount);
+ }
+ }
+
+ if (req.racks != null) {
+ for (String rack : req.racks) {
+ decResourceRequest(req.priority, rack, req.capability, req.containerCount);
+ }
+ }
+
+ decResourceRequest(req.priority, ANY, req.capability, req.containerCount);
+ }
+
+ @Override
+ public synchronized void releaseAssignedContainer(ContainerId containerId) {
+ release.add(containerId);
+ }
+
+ @Override
+ public synchronized Resource getClusterAvailableResources() {
+ return clusterAvailableResources;
+ }
+
+ @Override
+ public synchronized int getClusterNodeCount() {
+ return clusterNodeCount;
+ }
+
+ private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
+ // This code looks weird but is needed because of the following scenario.
+ // A ResourceRequest is removed from the remoteRequestTable. A 0 container
+ // request is added to 'ask' to notify the RM about not needing it any more.
+ // Before the call to allocate, the user now requests more containers. If
+ // the locations of the 0 size request and the new request are the same
+ // (with the difference being only container count), then the set comparator
+ // will consider both to be the same and not add the new request to ask. So
+ // we need to check for the "same" request being present and remove it and
+ // then add it back. The comparator is container count agnostic.
+ // This should happen only rarely but we do need to guard against it.
+ if (ask.contains(remoteRequest)) {
+ ask.remove(remoteRequest);
+ }
+ ask.add(remoteRequest);
+ }
+
+ private void addResourceRequest(Priority priority, String resourceName,
+ Resource capability, int containerCount) {
+ Map<String, Map<Resource, ResourceRequest>> remoteRequests =
+ this.remoteRequestsTable.get(priority);
+ if (remoteRequests == null) {
+ remoteRequests = new HashMap<String, Map<Resource, ResourceRequest>>();
+ this.remoteRequestsTable.put(priority, remoteRequests);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Added priority=" + priority);
+ }
+ }
+ Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
+ if (reqMap == null) {
+ reqMap = new HashMap<Resource, ResourceRequest>();
+ remoteRequests.put(resourceName, reqMap);
+ }
+ ResourceRequest remoteRequest = reqMap.get(capability);
+ if (remoteRequest == null) {
+ remoteRequest = BuilderUtils.
+ newResourceRequest(priority, resourceName, capability, 0);
+ reqMap.put(capability, remoteRequest);
+ }
+
+ remoteRequest.setNumContainers(remoteRequest.getNumContainers() + containerCount);
+
+ // Note this down for next interaction with ResourceManager
+ addResourceRequestToAsk(remoteRequest);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("addResourceRequest:" + " applicationId="
+ + appAttemptId + " priority=" + priority.getPriority()
+ + " resourceName=" + resourceName + " numContainers="
+ + remoteRequest.getNumContainers() + " #asks=" + ask.size());
+ }
+ }
+
+ private void decResourceRequest(Priority priority, String resourceName,
+ Resource capability, int containerCount) {
+ Map<String, Map<Resource, ResourceRequest>> remoteRequests =
+ this.remoteRequestsTable.get(priority);
+
+ if (remoteRequests == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Not decrementing resource as priority " + priority
+ + " is not present in request table");
+ }
+ return;
+ }
+
+ Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
+ if (reqMap == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Not decrementing resource as " + resourceName
+ + " is not present in request table");
+ }
+ return;
+ }
+ ResourceRequest remoteRequest = reqMap.get(capability);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("BEFORE decResourceRequest:" + " applicationId="
+ + appAttemptId + " priority=" + priority.getPriority()
+ + " resourceName=" + resourceName + " numContainers="
+ + remoteRequest.getNumContainers() + " #asks=" + ask.size());
+ }
+
+ remoteRequest.
+ setNumContainers(remoteRequest.getNumContainers() - containerCount);
+ if (remoteRequest.getNumContainers() < 0) {
+ // guard against spurious removals
+ remoteRequest.setNumContainers(0);
+ }
+ // Send the ResourceRequest to RM even if is 0 because it needs to override
+ // a previously sent value. If ResourceRequest was not sent previously then
+ // sending 0 ought to be a no-op on RM.
+ addResourceRequestToAsk(remoteRequest);
+
+ // Delete entries from map if no longer needed.
+ if (remoteRequest.getNumContainers() == 0) {
+ reqMap.remove(capability);
+ if (reqMap.size() == 0) {
+ remoteRequests.remove(resourceName);
+ }
+ if (remoteRequests.size() == 0) {
+ remoteRequestsTable.remove(priority);
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.info("AFTER decResourceRequest:" + " applicationId="
+ + appAttemptId + " priority=" + priority.getPriority()
+ + " resourceName=" + resourceName + " numContainers="
+ + remoteRequest.getNumContainers() + " #asks=" + ask.size());
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponse.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponse.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponse.java
new file mode 100644
index 0000000..89734fc
--- /dev/null
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponse.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn.ports;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+import java.util.List;
+
+/**
+ * This interface is to abstract the differences in Vanilla Hadoop YARN 2.0 and CDH 4.4
+ */
+public interface AllocationResponse {
+
+ int getResponseId();
+
+ Resource getAvailableResources();
+
+ List<Container> getAllocatedContainers();
+
+ List<ContainerStatus> getCompletedContainersStatuses();
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponses.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponses.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponses.java
new file mode 100644
index 0000000..ea46c3b
--- /dev/null
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponses.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn.ports;
+
+import com.google.common.base.Throwables;
+import com.google.common.reflect.TypeToken;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+import java.util.List;
+
+/**
+ * Factory for building instance of {@link AllocationResponse} based on the response type.
+ */
+public final class AllocationResponses {
+
+ /**
+ * A hack for CDH 4.4.0, as the AllocateResponse class is being rewritten and diverted from YARN 2.0
+ */
+ private static final boolean IS_CDH_4_4;
+
+ static {
+ boolean result = false;
+ try {
+ try {
+ // See if it is standard YARN 2.0 AllocateResponse object.
+ AllocateResponse.class.getMethod("getAMResponse");
+ } catch (NoSuchMethodException e) {
+ // See if it is CDH 4.4 AllocateResponse object.
+ AllocationResponse.class.getMethod("getAllocatedContainers");
+ result = true;
+ }
+ } catch (Exception e) {
+ // Something very wrong in here, as it shouldn't arrive here.
+ e.printStackTrace();
+ throw Throwables.propagate(e);
+ }
+
+ IS_CDH_4_4 = result;
+ }
+
+ public static AllocationResponse create(Object response) {
+ if (IS_CDH_4_4) {
+ return new ReflectionAllocationResponse(response);
+ }
+
+ try {
+ Object amResponse = response.getClass().getMethod("getAMResponse").invoke(response);
+ return new ReflectionAllocationResponse(amResponse);
+ } catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ private static final class ReflectionAllocationResponse implements AllocationResponse {
+
+ private final Object response;
+
+ private ReflectionAllocationResponse(Object response) {
+ this.response = response;
+ }
+
+ @Override
+ public int getResponseId() {
+ return call("getResponseId", TypeToken.of(Integer.class));
+ }
+
+ @Override
+ public Resource getAvailableResources() {
+ return call("getAvailableResources", TypeToken.of(Resource.class));
+ }
+
+ @Override
+ public List<Container> getAllocatedContainers() {
+ return call("getAllocatedContainers", new TypeToken<List<Container>>() {});
+ }
+
+ @Override
+ public List<ContainerStatus> getCompletedContainersStatuses() {
+ return call("getCompletedContainersStatuses", new TypeToken<List<ContainerStatus>>() {});
+ }
+
+ private <T> T call(String methodName, TypeToken<T> resultType) {
+ try {
+ return (T) resultType.getRawType().cast(response.getClass().getMethod(methodName).invoke(response));
+ } catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+ }
+
+ private AllocationResponses() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
new file mode 100644
index 0000000..ce8f90f
--- /dev/null
+++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.internal.ProcessLauncher;
+import org.apache.twill.internal.appmaster.RunnableProcessLauncher;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.AbstractIdleService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ *
+ */
+public final class Hadoop21YarnAMClient extends AbstractIdleService implements YarnAMClient {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Hadoop21YarnAMClient.class);
+
+ private static final Function<ContainerStatus, YarnContainerStatus> STATUS_TRANSFORM;
+
+ static {
+ STATUS_TRANSFORM = new Function<ContainerStatus, YarnContainerStatus>() {
+ @Override
+ public YarnContainerStatus apply(ContainerStatus status) {
+ return new Hadoop21YarnContainerStatus(status);
+ }
+ };
+ }
+
+ private final ContainerId containerId;
+ private final Multimap<String, AMRMClient.ContainerRequest> containerRequests;
+ private final AMRMClient<AMRMClient.ContainerRequest> amrmClient;
+ private final Hadoop21YarnNMClient nmClient;
+ private InetSocketAddress trackerAddr;
+ private URL trackerUrl;
+ private Resource maxCapability;
+
+ public Hadoop21YarnAMClient(Configuration conf) {
+ String masterContainerId = System.getenv().get(ApplicationConstants.Environment.CONTAINER_ID.name());
+ Preconditions.checkArgument(masterContainerId != null,
+ "Missing %s from environment", ApplicationConstants.Environment.CONTAINER_ID.name());
+ this.containerId = ConverterUtils.toContainerId(masterContainerId);
+ this.containerRequests = ArrayListMultimap.create();
+
+ this.amrmClient = AMRMClient.createAMRMClient();
+ this.amrmClient.init(conf);
+ this.nmClient = new Hadoop21YarnNMClient(conf);
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ Preconditions.checkNotNull(trackerAddr, "Tracker address not set.");
+ Preconditions.checkNotNull(trackerUrl, "Tracker URL not set.");
+
+ amrmClient.start();
+ RegisterApplicationMasterResponse response = amrmClient.registerApplicationMaster(trackerAddr.getHostName(),
+ trackerAddr.getPort(),
+ trackerUrl.toString());
+ maxCapability = response.getMaximumResourceCapability();
+ nmClient.startAndWait();
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ nmClient.stopAndWait();
+ amrmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, trackerUrl.toString());
+ amrmClient.stop();
+ }
+
+ @Override
+ public ContainerId getContainerId() {
+ return containerId;
+ }
+
+ @Override
+ public String getHost() {
+ return System.getenv().get(ApplicationConstants.Environment.NM_HOST.name());
+ }
+
+ @Override
+ public void setTracker(InetSocketAddress trackerAddr, URL trackerUrl) {
+ this.trackerAddr = trackerAddr;
+ this.trackerUrl = trackerUrl;
+ }
+
+ @Override
+ public synchronized void allocate(float progress, AllocateHandler handler) throws Exception {
+ AllocateResponse allocateResponse = amrmClient.allocate(progress);
+ List<ProcessLauncher<YarnContainerInfo>> launchers
+ = Lists.newArrayListWithCapacity(allocateResponse.getAllocatedContainers().size());
+
+ for (Container container : allocateResponse.getAllocatedContainers()) {
+ launchers.add(new RunnableProcessLauncher(new Hadoop21YarnContainerInfo(container), nmClient));
+ }
+
+ if (!launchers.isEmpty()) {
+ handler.acquired(launchers);
+
+ // If no process has been launched through the given launcher, return the container.
+ for (ProcessLauncher<YarnContainerInfo> l : launchers) {
+ // This cast always works.
+ RunnableProcessLauncher launcher = (RunnableProcessLauncher) l;
+ if (!launcher.isLaunched()) {
+ Container container = launcher.getContainerInfo().getContainer();
+ LOG.info("Nothing to run in container, releasing it: {}", container);
+ amrmClient.releaseAssignedContainer(container.getId());
+ }
+ }
+ }
+
+ List<YarnContainerStatus> completed = ImmutableList.copyOf(
+ Iterables.transform(allocateResponse.getCompletedContainersStatuses(), STATUS_TRANSFORM));
+ if (!completed.isEmpty()) {
+ handler.completed(completed);
+ }
+ }
+
+ @Override
+ public ContainerRequestBuilder addContainerRequest(Resource capability) {
+ return addContainerRequest(capability, 1);
+ }
+
+ @Override
+ public ContainerRequestBuilder addContainerRequest(Resource capability, int count) {
+ return new ContainerRequestBuilder(adjustCapability(capability), count) {
+ @Override
+ public String apply() {
+ synchronized (Hadoop21YarnAMClient.this) {
+ String id = UUID.randomUUID().toString();
+
+ String[] hosts = this.hosts.isEmpty() ? null : this.hosts.toArray(new String[this.hosts.size()]);
+ String[] racks = this.racks.isEmpty() ? null : this.racks.toArray(new String[this.racks.size()]);
+
+ for (int i = 0; i < count; i++) {
+ AMRMClient.ContainerRequest request = new AMRMClient.ContainerRequest(capability, hosts, racks, priority);
+ containerRequests.put(id, request);
+ amrmClient.addContainerRequest(request);
+ }
+
+ return id;
+ }
+ }
+ };
+ }
+
+ @Override
+ public synchronized void completeContainerRequest(String id) {
+ for (AMRMClient.ContainerRequest request : containerRequests.removeAll(id)) {
+ amrmClient.removeContainerRequest(request);
+ }
+ }
+
+ private Resource adjustCapability(Resource resource) {
+ int cores = resource.getVirtualCores();
+ int updatedCores = Math.min(resource.getVirtualCores(), maxCapability.getVirtualCores());
+
+ if (cores != updatedCores) {
+ resource.setVirtualCores(updatedCores);
+ LOG.info("Adjust virtual cores requirement from {} to {}.", cores, updatedCores);
+ }
+
+ int updatedMemory = Math.min(resource.getMemory(), maxCapability.getMemory());
+ if (resource.getMemory() != updatedMemory) {
+ resource.setMemory(updatedMemory);
+ LOG.info("Adjust memory requirement from {} to {} MB.", resource.getMemory(), updatedMemory);
+ }
+
+ return resource;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
new file mode 100644
index 0000000..50b212d
--- /dev/null
+++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.internal.ProcessController;
+import org.apache.twill.internal.ProcessLauncher;
+import org.apache.twill.internal.appmaster.ApplicationMasterProcessLauncher;
+import org.apache.twill.internal.appmaster.ApplicationSubmitter;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractIdleService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public final class Hadoop21YarnAppClient extends AbstractIdleService implements YarnAppClient {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Hadoop21YarnAppClient.class);
+ private final YarnClient yarnClient;
+
+ public Hadoop21YarnAppClient(Configuration configuration) {
+ this.yarnClient = YarnClient.createYarnClient();
+ yarnClient.init(configuration);
+ }
+
+ @Override
+ public ProcessLauncher<ApplicationId> createLauncher(TwillSpecification twillSpec) throws Exception {
+ // Request for new application
+ YarnClientApplication application = yarnClient.createApplication();
+ final GetNewApplicationResponse response = application.getNewApplicationResponse();
+ final ApplicationId appId = response.getApplicationId();
+
+ // Setup the context for application submission
+ final ApplicationSubmissionContext appSubmissionContext = application.getApplicationSubmissionContext();
+ appSubmissionContext.setApplicationId(appId);
+ appSubmissionContext.setApplicationName(twillSpec.getName());
+
+ ApplicationSubmitter submitter = new ApplicationSubmitter() {
+ @Override
+ public ProcessController<YarnApplicationReport> submit(YarnLaunchContext context, Resource capability) {
+ ContainerLaunchContext launchContext = context.getLaunchContext();
+
+ addRMToken(launchContext);
+ appSubmissionContext.setAMContainerSpec(launchContext);
+ appSubmissionContext.setResource(adjustMemory(response, capability));
+ appSubmissionContext.setMaxAppAttempts(2);
+
+ try {
+ yarnClient.submitApplication(appSubmissionContext);
+ return new ProcessControllerImpl(yarnClient, appId);
+ } catch (Exception e) {
+ LOG.error("Failed to submit application {}", appId, e);
+ throw Throwables.propagate(e);
+ }
+ }
+ };
+
+ return new ApplicationMasterProcessLauncher(appId, submitter);
+ }
+
+ private Resource adjustMemory(GetNewApplicationResponse response, Resource capability) {
+ int maxMemory = response.getMaximumResourceCapability().getMemory();
+ int updatedMemory = capability.getMemory();
+
+ if (updatedMemory > maxMemory) {
+ capability.setMemory(maxMemory);
+ }
+
+ return capability;
+ }
+
+ private void addRMToken(ContainerLaunchContext context) {
+ if (!UserGroupInformation.isSecurityEnabled()) {
+ return;
+ }
+
+ try {
+ Credentials credentials = YarnUtils.decodeCredentials(context.getTokens());
+
+ Configuration config = yarnClient.getConfig();
+ Token<TokenIdentifier> token = ConverterUtils.convertFromYarn(
+ yarnClient.getRMDelegationToken(new Text(YarnUtils.getYarnTokenRenewer(config))),
+ YarnUtils.getRMAddress(config));
+
+ LOG.info("Added RM delegation token {}", token);
+ credentials.addToken(token.getService(), token);
+
+ context.setTokens(YarnUtils.encodeCredentials(credentials));
+
+ } catch (Exception e) {
+ LOG.error("Fails to create credentials.", e);
+ throw Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ public ProcessLauncher<ApplicationId> createLauncher(String user, TwillSpecification twillSpec) throws Exception {
+ // Ignore user
+ return createLauncher(twillSpec);
+ }
+
+ @Override
+ public ProcessController<YarnApplicationReport> createProcessController(ApplicationId appId) {
+ return new ProcessControllerImpl(yarnClient, appId);
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ yarnClient.start();
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ yarnClient.stop();
+ }
+
+ private static final class ProcessControllerImpl implements ProcessController<YarnApplicationReport> {
+ private final YarnClient yarnClient;
+ private final ApplicationId appId;
+
+ public ProcessControllerImpl(YarnClient yarnClient, ApplicationId appId) {
+ this.yarnClient = yarnClient;
+ this.appId = appId;
+ }
+
+ @Override
+ public YarnApplicationReport getReport() {
+ try {
+ return new Hadoop21YarnApplicationReport(yarnClient.getApplicationReport(appId));
+ } catch (Exception e) {
+ LOG.error("Failed to get application report {}", appId, e);
+ throw Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ try {
+ yarnClient.killApplication(appId);
+ } catch (Exception e) {
+ LOG.error("Failed to kill application {}", appId, e);
+ throw Throwables.propagate(e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnApplicationReport.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnApplicationReport.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnApplicationReport.java
new file mode 100644
index 0000000..6e614f5
--- /dev/null
+++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnApplicationReport.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+
+/**
+ *
+ */
+public final class Hadoop21YarnApplicationReport implements YarnApplicationReport {
+
+ private final ApplicationReport report;
+
+ public Hadoop21YarnApplicationReport(ApplicationReport report) {
+ this.report = report;
+ }
+
+ @Override
+ public ApplicationId getApplicationId() {
+ return report.getApplicationId();
+ }
+
+ @Override
+ public ApplicationAttemptId getCurrentApplicationAttemptId() {
+ return report.getCurrentApplicationAttemptId();
+ }
+
+ @Override
+ public String getQueue() {
+ return report.getQueue();
+ }
+
+ @Override
+ public String getName() {
+ return report.getName();
+ }
+
+ @Override
+ public String getHost() {
+ return report.getHost();
+ }
+
+ @Override
+ public int getRpcPort() {
+ return report.getRpcPort();
+ }
+
+ @Override
+ public YarnApplicationState getYarnApplicationState() {
+ return report.getYarnApplicationState();
+ }
+
+ @Override
+ public String getDiagnostics() {
+ return report.getDiagnostics();
+ }
+
+ @Override
+ public String getTrackingUrl() {
+ return report.getTrackingUrl();
+ }
+
+ @Override
+ public String getOriginalTrackingUrl() {
+ return report.getOriginalTrackingUrl();
+ }
+
+ @Override
+ public long getStartTime() {
+ return report.getStartTime();
+ }
+
+ @Override
+ public long getFinishTime() {
+ return report.getFinishTime();
+ }
+
+ @Override
+ public FinalApplicationStatus getFinalApplicationStatus() {
+ return report.getFinalApplicationStatus();
+ }
+
+ @Override
+ public ApplicationResourceUsageReport getApplicationResourceUsageReport() {
+ return report.getApplicationResourceUsageReport();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerInfo.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerInfo.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerInfo.java
new file mode 100644
index 0000000..86903c1
--- /dev/null
+++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerInfo.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import com.google.common.base.Throwables;
+import org.apache.hadoop.yarn.api.records.Container;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+/**
+ *
+ */
+public final class Hadoop21YarnContainerInfo implements YarnContainerInfo {
+
+ private final Container container;
+
+ public Hadoop21YarnContainerInfo(Container container) {
+ this.container = container;
+ }
+
+ @Override
+ public <T> T getContainer() {
+ return (T) container;
+ }
+
+ @Override
+ public String getId() {
+ return container.getId().toString();
+ }
+
+ @Override
+ public InetAddress getHost() {
+ try {
+ return InetAddress.getByName(container.getNodeId().getHost());
+ } catch (UnknownHostException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ public int getPort() {
+ return container.getNodeId().getPort();
+ }
+
+ @Override
+ public int getMemoryMB() {
+ return container.getResource().getMemory();
+ }
+
+ @Override
+ public int getVirtualCores() {
+ return container.getResource().getVirtualCores();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerStatus.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerStatus.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerStatus.java
new file mode 100644
index 0000000..f5758c7
--- /dev/null
+++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerStatus.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+
+/**
+ *
+ */
+public final class Hadoop21YarnContainerStatus implements YarnContainerStatus {
+
+ private final ContainerStatus containerStatus;
+
+ public Hadoop21YarnContainerStatus(ContainerStatus containerStatus) {
+ this.containerStatus = containerStatus;
+ }
+
+ @Override
+ public String getContainerId() {
+ return containerStatus.getContainerId().toString();
+ }
+
+ @Override
+ public ContainerState getState() {
+ return containerStatus.getState();
+ }
+
+ @Override
+ public int getExitStatus() {
+ return containerStatus.getExitStatus();
+ }
+
+ @Override
+ public String getDiagnostics() {
+ return containerStatus.getDiagnostics();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnLaunchContext.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnLaunchContext.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnLaunchContext.java
new file mode 100644
index 0000000..8621f93
--- /dev/null
+++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnLaunchContext.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+/**
+ *
+ */
+public final class Hadoop21YarnLaunchContext implements YarnLaunchContext {
+
+ private static final Function<YarnLocalResource, LocalResource> RESOURCE_TRANSFORM;
+
+ static {
+ // Creates transform function from YarnLocalResource -> LocalResource
+ RESOURCE_TRANSFORM = new Function<YarnLocalResource, LocalResource>() {
+ @Override
+ public LocalResource apply(YarnLocalResource input) {
+ return input.getLocalResource();
+ }
+ };
+ }
+
+ private final ContainerLaunchContext launchContext;
+
+ public Hadoop21YarnLaunchContext() {
+ launchContext = Records.newRecord(ContainerLaunchContext.class);
+ }
+
+ @Override
+ public <T> T getLaunchContext() {
+ return (T) launchContext;
+ }
+
+ @Override
+ public void setCredentials(Credentials credentials) {
+ launchContext.setTokens(YarnUtils.encodeCredentials(credentials));
+ }
+
+ @Override
+ public void setLocalResources(Map<String, YarnLocalResource> localResources) {
+ launchContext.setLocalResources(Maps.transformValues(localResources, RESOURCE_TRANSFORM));
+ }
+
+ @Override
+ public void setServiceData(Map<String, ByteBuffer> serviceData) {
+ launchContext.setServiceData(serviceData);
+ }
+
+ @Override
+ public Map<String, String> getEnvironment() {
+ return launchContext.getEnvironment();
+ }
+
+ @Override
+ public void setEnvironment(Map<String, String> environment) {
+ launchContext.setEnvironment(environment);
+ }
+
+ @Override
+ public List<String> getCommands() {
+ return launchContext.getCommands();
+ }
+
+ @Override
+ public void setCommands(List<String> commands) {
+ launchContext.setCommands(commands);
+ }
+
+ @Override
+ public void setApplicationACLs(Map<ApplicationAccessType, String> acls) {
+ launchContext.setApplicationACLs(acls);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnLocalResource.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnLocalResource.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnLocalResource.java
new file mode 100644
index 0000000..3f756bd
--- /dev/null
+++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnLocalResource.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ *
+ */
+public final class Hadoop21YarnLocalResource implements YarnLocalResource {
+
+ private final LocalResource localResource;
+
+ public Hadoop21YarnLocalResource() {
+ this.localResource = Records.newRecord(LocalResource.class);
+ }
+
+ @Override
+ public <T> T getLocalResource() {
+ return (T) localResource;
+ }
+
+ @Override
+ public URL getResource() {
+ return localResource.getResource();
+ }
+
+ @Override
+ public void setResource(URL resource) {
+ localResource.setResource(resource);
+ }
+
+ @Override
+ public long getSize() {
+ return localResource.getSize();
+ }
+
+ @Override
+ public void setSize(long size) {
+ localResource.setSize(size);
+ }
+
+ @Override
+ public long getTimestamp() {
+ return localResource.getTimestamp();
+ }
+
+ @Override
+ public void setTimestamp(long timestamp) {
+ localResource.setTimestamp(timestamp);
+ }
+
+ @Override
+ public LocalResourceType getType() {
+ return localResource.getType();
+ }
+
+ @Override
+ public void setType(LocalResourceType type) {
+ localResource.setType(type);
+ }
+
+ @Override
+ public LocalResourceVisibility getVisibility() {
+ return localResource.getVisibility();
+ }
+
+ @Override
+ public void setVisibility(LocalResourceVisibility visibility) {
+ localResource.setVisibility(visibility);
+ }
+
+ @Override
+ public String getPattern() {
+ return localResource.getPattern();
+ }
+
+ @Override
+ public void setPattern(String pattern) {
+ localResource.setPattern(pattern);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnNMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnNMClient.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnNMClient.java
new file mode 100644
index 0000000..d3a6a80
--- /dev/null
+++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnNMClient.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.common.Cancellable;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractIdleService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public final class Hadoop21YarnNMClient extends AbstractIdleService implements YarnNMClient {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Hadoop21YarnNMClient.class);
+
+ private final NMClient nmClient;
+
+ public Hadoop21YarnNMClient(Configuration configuration) {
+ this.nmClient = NMClient.createNMClient();
+ nmClient.init(configuration);
+ }
+
+ @Override
+ public Cancellable start(YarnContainerInfo containerInfo, YarnLaunchContext launchContext) {
+ try {
+ Container container = containerInfo.getContainer();
+ nmClient.startContainer(container, launchContext.<ContainerLaunchContext>getLaunchContext());
+ return new ContainerTerminator(container, nmClient);
+ } catch (Exception e) {
+ LOG.error("Error in launching process", e);
+ throw Throwables.propagate(e);
+ }
+
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ nmClient.start();
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ nmClient.stop();
+ }
+
+ private static final class ContainerTerminator implements Cancellable {
+
+ private final Container container;
+ private final NMClient nmClient;
+
+ private ContainerTerminator(Container container, NMClient nmClient) {
+ this.container = container;
+ this.nmClient = nmClient;
+ }
+
+ @Override
+ public void cancel() {
+ LOG.info("Request to stop container {}.", container.getId());
+
+ try {
+ nmClient.stopContainer(container.getId(), container.getNodeId());
+ boolean completed = false;
+ while (!completed) {
+ ContainerStatus status = nmClient.getContainerStatus(container.getId(), container.getNodeId());
+ LOG.info("Container status: {} {}", status, status.getDiagnostics());
+
+ completed = (status.getState() == ContainerState.COMPLETE);
+ }
+ LOG.info("Container {} stopped.", container.getId());
+ } catch (Exception e) {
+ LOG.error("Fail to stop container {}", container.getId(), e);
+ throw Throwables.propagate(e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java b/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
new file mode 100644
index 0000000..b0eeb43
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.filesystem;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.UUID;
+
+/**
+ * A concrete implementation of {@link Location} for the HDFS filesystem.
+ */
+final class HDFSLocation implements Location {
+ private final FileSystem fs;
+ private final Path path;
+
+ /**
+ * Constructs a HDFSLocation.
+ *
+ * @param fs An instance of {@link FileSystem}
+ * @param path of the file.
+ */
+ HDFSLocation(FileSystem fs, Path path) {
+ this.fs = fs;
+ this.path = path;
+ }
+
+ /**
+ * Checks if this location exists on HDFS.
+ *
+ * @return true if found; false otherwise.
+ * @throws IOException
+ */
+ @Override
+ public boolean exists() throws IOException {
+ return fs.exists(path);
+ }
+
+ /**
+ * @return An {@link InputStream} for this location on HDFS.
+ * @throws IOException
+ */
+ @Override
+ public InputStream getInputStream() throws IOException {
+ return fs.open(path);
+ }
+
+ /**
+ * @return An {@link OutputStream} for this location on HDFS.
+ * @throws IOException
+ */
+ @Override
+ public OutputStream getOutputStream() throws IOException {
+ return fs.create(path);
+ }
+
+ @Override
+ public OutputStream getOutputStream(String permission) throws IOException {
+ Configuration conf = fs.getConf();
+ return fs.create(path,
+ new FsPermission(permission),
+ true,
+ conf.getInt("io.file.buffer.size", 4096),
+ fs.getDefaultReplication(path),
+ fs.getDefaultBlockSize(path),
+ null);
+ }
+
+ /**
+ * Appends the child to the current {@link Location} on HDFS.
+ * <p>
+ * Returns a new instance of Location.
+ * </p>
+ *
+ * @param child to be appended to this location.
+ * @return A new instance of {@link Location}
+ * @throws IOException
+ */
+ @Override
+ public Location append(String child) throws IOException {
+ if (child.startsWith("/")) {
+ child = child.substring(1);
+ }
+ return new HDFSLocation(fs, new Path(URI.create(path.toUri() + "/" + child)));
+ }
+
+ @Override
+ public Location getTempFile(String suffix) throws IOException {
+ Path path = new Path(
+ URI.create(this.path.toUri() + "." + UUID.randomUUID() + (suffix == null ? TEMP_FILE_SUFFIX : suffix)));
+ return new HDFSLocation(fs, path);
+ }
+
+ /**
+ * @return Returns the name of the file or directory denoteed by this abstract pathname.
+ */
+ @Override
+ public String getName() {
+ return path.getName();
+ }
+
+ @Override
+ public boolean createNew() throws IOException {
+ return fs.createNewFile(path);
+ }
+
+ /**
+ * @return A {@link URI} for this location on HDFS.
+ */
+ @Override
+ public URI toURI() {
+ return path.toUri();
+ }
+
+ /**
+ * Deletes the file or directory denoted by this abstract pathname. If this
+ * pathname denotes a directory, then the directory must be empty in order
+ * to be deleted.
+ *
+ * @return true if and only if the file or directory is successfully deleted; false otherwise.
+ */
+ @Override
+ public boolean delete() throws IOException {
+ return fs.delete(path, false);
+ }
+
+ @Override
+ public boolean delete(boolean recursive) throws IOException {
+ return fs.delete(path, true);
+ }
+
+ @Override
+ public Location renameTo(Location destination) throws IOException {
+ // Destination will always be of the same type as this location.
+ if (fs instanceof DistributedFileSystem) {
+ ((DistributedFileSystem) fs).rename(path, ((HDFSLocation) destination).path, Options.Rename.OVERWRITE);
+ return new HDFSLocation(fs, new Path(destination.toURI()));
+ }
+
+ if (fs.rename(path, ((HDFSLocation) destination).path)) {
+ return new HDFSLocation(fs, new Path(destination.toURI()));
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Creates the directory named by this abstract pathname, including any necessary
+ * but nonexistent parent directories.
+ *
+ * @return true if and only if the renaming succeeded; false otherwise
+ */
+ @Override
+ public boolean mkdirs() throws IOException {
+ return fs.mkdirs(path);
+ }
+
+ /**
+ * @return Length of file.
+ */
+ @Override
+ public long length() throws IOException {
+ return fs.getFileStatus(path).getLen();
+ }
+
+ @Override
+ public long lastModified() throws IOException {
+ return fs.getFileStatus(path).getModificationTime();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocationFactory.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocationFactory.java b/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocationFactory.java
new file mode 100644
index 0000000..fa79391
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocationFactory.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.filesystem;
+
+import com.google.common.base.Throwables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * A {@link LocationFactory} that creates HDFS {@link Location}.
+ */
+public final class HDFSLocationFactory implements LocationFactory {
+
+ private final FileSystem fileSystem;
+ private final String pathBase;
+
+ public HDFSLocationFactory(Configuration configuration) {
+ this(getFileSystem(configuration));
+ }
+
+ public HDFSLocationFactory(Configuration configuration, String pathBase) {
+ this(getFileSystem(configuration), pathBase);
+ }
+
+ public HDFSLocationFactory(FileSystem fileSystem) {
+ this(fileSystem, "/");
+ }
+
+ public HDFSLocationFactory(FileSystem fileSystem, String pathBase) {
+ String base = pathBase.equals("/") ? "" : pathBase;
+ base = base.endsWith("/") ? base.substring(0, base.length() - 1) : base;
+
+ this.fileSystem = fileSystem;
+ this.pathBase = base;
+ }
+
+ @Override
+ public Location create(String path) {
+ if (path.startsWith("/")) {
+ path = path.substring(1);
+ }
+ return new HDFSLocation(fileSystem, new Path(fileSystem.getUri() + "/" + pathBase + "/" + path));
+ }
+
+ @Override
+ public Location create(URI uri) {
+ if (!uri.toString().startsWith(fileSystem.getUri().toString())) {
+ // It's a full URI
+ return new HDFSLocation(fileSystem, new Path(uri));
+ }
+ if (uri.isAbsolute()) {
+ return new HDFSLocation(fileSystem, new Path(fileSystem.getUri() + uri.getPath()));
+ }
+ return new HDFSLocation(fileSystem, new Path(fileSystem.getUri() + "/" + pathBase + "/" + uri.getPath()));
+ }
+
+ @Override
+ public Location getHomeLocation() {
+ return new HDFSLocation(fileSystem, fileSystem.getHomeDirectory());
+ }
+
+ /**
+ * Returns the underlying {@link FileSystem} object.
+ */
+ public FileSystem getFileSystem() {
+ return fileSystem;
+ }
+
+ private static FileSystem getFileSystem(Configuration configuration) {
+ try {
+ return FileSystem.get(configuration);
+ } catch (IOException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/java/org/apache/twill/filesystem/package-info.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/filesystem/package-info.java b/twill-yarn/src/main/java/org/apache/twill/filesystem/package-info.java
new file mode 100644
index 0000000..2ca09fd
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/filesystem/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Contains HDFS location classes.
+ */
+package org.apache.twill.filesystem;