You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/13 23:27:23 UTC
[07/28] [TWILL-14] Bootstrapping for the site generation.
Reorganization of the source tree happens:
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClientImpl.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClientImpl.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClientImpl.java
deleted file mode 100644
index c1bd75a..0000000
--- a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClientImpl.java
+++ /dev/null
@@ -1,412 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn.ports;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.api.AMRMProtocol;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.hadoop.yarn.util.BuilderUtils;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-/**
- * Ported from Apache Hadoop YARN.
- */
-public final class AMRMClientImpl extends AbstractService implements AMRMClient {
-
- private static final Log LOG = LogFactory.getLog(AMRMClientImpl.class);
-
- private final RecordFactory recordFactory =
- RecordFactoryProvider.getRecordFactory(null);
-
- private int lastResponseId = 0;
-
- protected AMRMProtocol rmClient;
- protected final ApplicationAttemptId appAttemptId;
- protected Resource clusterAvailableResources;
- protected int clusterNodeCount;
-
- //Key -> Priority
- //Value -> Map
- //Key->ResourceName (e.g., hostname, rackname, *)
- //Value->Map
- //Key->Resource Capability
- //Value->ResourceRequest
- protected final
- Map<Priority, Map<String, Map<Resource, ResourceRequest>>>
- remoteRequestsTable =
- new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();
-
- protected final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
- new org.apache.hadoop.yarn.util.BuilderUtils.ResourceRequestComparator());
- protected final Set<ContainerId> release = new TreeSet<ContainerId>();
-
- public AMRMClientImpl(ApplicationAttemptId appAttemptId) {
- super(AMRMClientImpl.class.getName());
- this.appAttemptId = appAttemptId;
- }
-
- @Override
- public synchronized void init(Configuration conf) {
- super.init(conf);
- }
-
- @Override
- public synchronized void start() {
- final YarnConfiguration conf = new YarnConfiguration(getConfig());
- final YarnRPC rpc = YarnRPC.create(conf);
- final InetSocketAddress rmAddress = conf.getSocketAddr(
- YarnConfiguration.RM_SCHEDULER_ADDRESS,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
-
- UserGroupInformation currentUser;
- try {
- currentUser = UserGroupInformation.getCurrentUser();
- } catch (IOException e) {
- throw new YarnException(e);
- }
-
- if (UserGroupInformation.isSecurityEnabled()) {
- String tokenURLEncodedStr = System.getenv().get(
- ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
- Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
-
- try {
- token.decodeFromUrlString(tokenURLEncodedStr);
- } catch (IOException e) {
- throw new YarnException(e);
- }
-
- SecurityUtil.setTokenService(token, rmAddress);
- if (LOG.isDebugEnabled()) {
- LOG.debug("AppMasterToken is " + token);
- }
- currentUser.addToken(token);
- }
-
- rmClient = currentUser.doAs(new PrivilegedAction<AMRMProtocol>() {
- @Override
- public AMRMProtocol run() {
- return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress,
- conf);
- }
- });
- LOG.debug("Connecting to ResourceManager at " + rmAddress);
- super.start();
- }
-
- @Override
- public synchronized void stop() {
- if (this.rmClient != null) {
- RPC.stopProxy(this.rmClient);
- }
- super.stop();
- }
-
- @Override
- public RegisterApplicationMasterResponse registerApplicationMaster(
- String appHostName, int appHostPort, String appTrackingUrl)
- throws YarnRemoteException {
- // do this only once ???
- RegisterApplicationMasterRequest request = recordFactory
- .newRecordInstance(RegisterApplicationMasterRequest.class);
- synchronized (this) {
- request.setApplicationAttemptId(appAttemptId);
- }
- request.setHost(appHostName);
- request.setRpcPort(appHostPort);
- if (appTrackingUrl != null) {
- request.setTrackingUrl(appTrackingUrl);
- }
- RegisterApplicationMasterResponse response = rmClient
- .registerApplicationMaster(request);
- return response;
- }
-
- @Override
- public AllocationResponse allocate(float progressIndicator)
- throws YarnRemoteException {
- AllocateResponse allocateResponse = null;
- ArrayList<ResourceRequest> askList = null;
- ArrayList<ContainerId> releaseList = null;
- AllocateRequest allocateRequest = null;
-
- try {
- synchronized (this) {
- askList = new ArrayList<ResourceRequest>(ask);
- releaseList = new ArrayList<ContainerId>(release);
- // optimistically clear this collection assuming no RPC failure
- ask.clear();
- release.clear();
- allocateRequest = BuilderUtils
- .newAllocateRequest(appAttemptId, lastResponseId, progressIndicator,
- askList, releaseList);
- }
-
- allocateResponse = rmClient.allocate(allocateRequest);
- AllocationResponse response = AllocationResponses.create(allocateResponse);
-
- synchronized (this) {
- // update these on successful RPC
- clusterNodeCount = allocateResponse.getNumClusterNodes();
- lastResponseId = response.getResponseId();
- clusterAvailableResources = response.getAvailableResources();
- }
-
- return response;
- } finally {
- // TODO how to differentiate remote YARN exception vs error in RPC
- if (allocateResponse == null) {
- // We hit an exception in allocate()
- // Preserve ask and release for next call to allocate()
- synchronized (this) {
- release.addAll(releaseList);
- // Requests could have been added or deleted during call to allocate.
- // If requests were added/removed then there is nothing to do since
- // the ResourceRequest object in ask would have the actual new value.
- // If ask does not have this ResourceRequest then it was unchanged and
- // so we can add the value back safely.
- // This assumes that there will no concurrent calls to allocate() and
- // so we don't have to worry about ask being changed in the
- // synchronized block at the beginning of this method.
- for (ResourceRequest oldAsk : askList) {
- if (!ask.contains(oldAsk)) {
- ask.add(oldAsk);
- }
- }
- }
- }
- }
- }
-
- @Override
- public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
- String appMessage, String appTrackingUrl) throws YarnRemoteException {
- FinishApplicationMasterRequest request = recordFactory
- .newRecordInstance(FinishApplicationMasterRequest.class);
- request.setAppAttemptId(appAttemptId);
- request.setFinishApplicationStatus(appStatus);
- if (appMessage != null) {
- request.setDiagnostics(appMessage);
- }
- if (appTrackingUrl != null) {
- request.setTrackingUrl(appTrackingUrl);
- }
- rmClient.finishApplicationMaster(request);
- }
-
- @Override
- public synchronized void addContainerRequest(ContainerRequest req) {
- // Create resource requests
- if (req.hosts != null) {
- for (String host : req.hosts) {
- addResourceRequest(req.priority, host, req.capability, req.containerCount);
- }
- }
-
- if (req.racks != null) {
- for (String rack : req.racks) {
- addResourceRequest(req.priority, rack, req.capability, req.containerCount);
- }
- }
-
- // Off switch
- addResourceRequest(req.priority, ANY, req.capability, req.containerCount);
- }
-
- @Override
- public synchronized void removeContainerRequest(ContainerRequest req) {
- // Update resource requests
- if (req.hosts != null) {
- for (String hostName : req.hosts) {
- decResourceRequest(req.priority, hostName, req.capability, req.containerCount);
- }
- }
-
- if (req.racks != null) {
- for (String rack : req.racks) {
- decResourceRequest(req.priority, rack, req.capability, req.containerCount);
- }
- }
-
- decResourceRequest(req.priority, ANY, req.capability, req.containerCount);
- }
-
- @Override
- public synchronized void releaseAssignedContainer(ContainerId containerId) {
- release.add(containerId);
- }
-
- @Override
- public synchronized Resource getClusterAvailableResources() {
- return clusterAvailableResources;
- }
-
- @Override
- public synchronized int getClusterNodeCount() {
- return clusterNodeCount;
- }
-
- private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
- // This code looks weird but is needed because of the following scenario.
- // A ResourceRequest is removed from the remoteRequestTable. A 0 container
- // request is added to 'ask' to notify the RM about not needing it any more.
- // Before the call to allocate, the user now requests more containers. If
- // the locations of the 0 size request and the new request are the same
- // (with the difference being only container count), then the set comparator
- // will consider both to be the same and not add the new request to ask. So
- // we need to check for the "same" request being present and remove it and
- // then add it back. The comparator is container count agnostic.
- // This should happen only rarely but we do need to guard against it.
- if (ask.contains(remoteRequest)) {
- ask.remove(remoteRequest);
- }
- ask.add(remoteRequest);
- }
-
- private void addResourceRequest(Priority priority, String resourceName,
- Resource capability, int containerCount) {
- Map<String, Map<Resource, ResourceRequest>> remoteRequests =
- this.remoteRequestsTable.get(priority);
- if (remoteRequests == null) {
- remoteRequests = new HashMap<String, Map<Resource, ResourceRequest>>();
- this.remoteRequestsTable.put(priority, remoteRequests);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Added priority=" + priority);
- }
- }
- Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
- if (reqMap == null) {
- reqMap = new HashMap<Resource, ResourceRequest>();
- remoteRequests.put(resourceName, reqMap);
- }
- ResourceRequest remoteRequest = reqMap.get(capability);
- if (remoteRequest == null) {
- remoteRequest = BuilderUtils.
- newResourceRequest(priority, resourceName, capability, 0);
- reqMap.put(capability, remoteRequest);
- }
-
- remoteRequest.setNumContainers(remoteRequest.getNumContainers() + containerCount);
-
- // Note this down for next interaction with ResourceManager
- addResourceRequestToAsk(remoteRequest);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("addResourceRequest:" + " applicationId="
- + appAttemptId + " priority=" + priority.getPriority()
- + " resourceName=" + resourceName + " numContainers="
- + remoteRequest.getNumContainers() + " #asks=" + ask.size());
- }
- }
-
- private void decResourceRequest(Priority priority, String resourceName,
- Resource capability, int containerCount) {
- Map<String, Map<Resource, ResourceRequest>> remoteRequests =
- this.remoteRequestsTable.get(priority);
-
- if (remoteRequests == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Not decrementing resource as priority " + priority
- + " is not present in request table");
- }
- return;
- }
-
- Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
- if (reqMap == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Not decrementing resource as " + resourceName
- + " is not present in request table");
- }
- return;
- }
- ResourceRequest remoteRequest = reqMap.get(capability);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("BEFORE decResourceRequest:" + " applicationId="
- + appAttemptId + " priority=" + priority.getPriority()
- + " resourceName=" + resourceName + " numContainers="
- + remoteRequest.getNumContainers() + " #asks=" + ask.size());
- }
-
- remoteRequest.
- setNumContainers(remoteRequest.getNumContainers() - containerCount);
- if (remoteRequest.getNumContainers() < 0) {
- // guard against spurious removals
- remoteRequest.setNumContainers(0);
- }
- // Send the ResourceRequest to RM even if is 0 because it needs to override
- // a previously sent value. If ResourceRequest was not sent previously then
- // sending 0 ought to be a no-op on RM.
- addResourceRequestToAsk(remoteRequest);
-
- // Delete entries from map if no longer needed.
- if (remoteRequest.getNumContainers() == 0) {
- reqMap.remove(capability);
- if (reqMap.size() == 0) {
- remoteRequests.remove(resourceName);
- }
- if (remoteRequests.size() == 0) {
- remoteRequestsTable.remove(priority);
- }
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.info("AFTER decResourceRequest:" + " applicationId="
- + appAttemptId + " priority=" + priority.getPriority()
- + " resourceName=" + resourceName + " numContainers="
- + remoteRequest.getNumContainers() + " #asks=" + ask.size());
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponse.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponse.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponse.java
deleted file mode 100644
index 89734fc..0000000
--- a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponse.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn.ports;
-
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.Resource;
-
-import java.util.List;
-
-/**
- * This interface is to abstract the differences in Vanilla Hadoop YARN 2.0 and CDH 4.4
- */
-public interface AllocationResponse {
-
- int getResponseId();
-
- Resource getAvailableResources();
-
- List<Container> getAllocatedContainers();
-
- List<ContainerStatus> getCompletedContainersStatuses();
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponses.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponses.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponses.java
deleted file mode 100644
index ea46c3b..0000000
--- a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponses.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn.ports;
-
-import com.google.common.base.Throwables;
-import com.google.common.reflect.TypeToken;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.Resource;
-
-import java.util.List;
-
-/**
- * Factory for building instance of {@link AllocationResponse} based on the response type.
- */
-public final class AllocationResponses {
-
- /**
- * A hack for CDH 4.4.0, as the AllocateResponse class is being rewritten and diverted from YARN 2.0
- */
- private static final boolean IS_CDH_4_4;
-
- static {
- boolean result = false;
- try {
- try {
- // See if it is standard YARN 2.0 AllocateResponse object.
- AllocateResponse.class.getMethod("getAMResponse");
- } catch (NoSuchMethodException e) {
- // See if it is CDH 4.4 AllocateResponse object.
- AllocationResponse.class.getMethod("getAllocatedContainers");
- result = true;
- }
- } catch (Exception e) {
- // Something very wrong in here, as it shouldn't arrive here.
- e.printStackTrace();
- throw Throwables.propagate(e);
- }
-
- IS_CDH_4_4 = result;
- }
-
- public static AllocationResponse create(Object response) {
- if (IS_CDH_4_4) {
- return new ReflectionAllocationResponse(response);
- }
-
- try {
- Object amResponse = response.getClass().getMethod("getAMResponse").invoke(response);
- return new ReflectionAllocationResponse(amResponse);
- } catch (Exception e) {
- throw Throwables.propagate(e);
- }
- }
-
- private static final class ReflectionAllocationResponse implements AllocationResponse {
-
- private final Object response;
-
- private ReflectionAllocationResponse(Object response) {
- this.response = response;
- }
-
- @Override
- public int getResponseId() {
- return call("getResponseId", TypeToken.of(Integer.class));
- }
-
- @Override
- public Resource getAvailableResources() {
- return call("getAvailableResources", TypeToken.of(Resource.class));
- }
-
- @Override
- public List<Container> getAllocatedContainers() {
- return call("getAllocatedContainers", new TypeToken<List<Container>>() {});
- }
-
- @Override
- public List<ContainerStatus> getCompletedContainersStatuses() {
- return call("getCompletedContainersStatuses", new TypeToken<List<ContainerStatus>>() {});
- }
-
- private <T> T call(String methodName, TypeToken<T> resultType) {
- try {
- return (T) resultType.getRawType().cast(response.getClass().getMethod(methodName).invoke(response));
- } catch (Exception e) {
- throw Throwables.propagate(e);
- }
- }
- }
-
- private AllocationResponses() {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java b/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
deleted file mode 100644
index ce8f90f..0000000
--- a/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.twill.internal.ProcessLauncher;
-import org.apache.twill.internal.appmaster.RunnableProcessLauncher;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
-import com.google.common.util.concurrent.AbstractIdleService;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetSocketAddress;
-import java.net.URL;
-import java.util.List;
-import java.util.UUID;
-
-/**
- *
- */
-public final class Hadoop21YarnAMClient extends AbstractIdleService implements YarnAMClient {
-
- private static final Logger LOG = LoggerFactory.getLogger(Hadoop21YarnAMClient.class);
-
- private static final Function<ContainerStatus, YarnContainerStatus> STATUS_TRANSFORM;
-
- static {
- STATUS_TRANSFORM = new Function<ContainerStatus, YarnContainerStatus>() {
- @Override
- public YarnContainerStatus apply(ContainerStatus status) {
- return new Hadoop21YarnContainerStatus(status);
- }
- };
- }
-
- private final ContainerId containerId;
- private final Multimap<String, AMRMClient.ContainerRequest> containerRequests;
- private final AMRMClient<AMRMClient.ContainerRequest> amrmClient;
- private final Hadoop21YarnNMClient nmClient;
- private InetSocketAddress trackerAddr;
- private URL trackerUrl;
- private Resource maxCapability;
-
- public Hadoop21YarnAMClient(Configuration conf) {
- String masterContainerId = System.getenv().get(ApplicationConstants.Environment.CONTAINER_ID.name());
- Preconditions.checkArgument(masterContainerId != null,
- "Missing %s from environment", ApplicationConstants.Environment.CONTAINER_ID.name());
- this.containerId = ConverterUtils.toContainerId(masterContainerId);
- this.containerRequests = ArrayListMultimap.create();
-
- this.amrmClient = AMRMClient.createAMRMClient();
- this.amrmClient.init(conf);
- this.nmClient = new Hadoop21YarnNMClient(conf);
- }
-
- @Override
- protected void startUp() throws Exception {
- Preconditions.checkNotNull(trackerAddr, "Tracker address not set.");
- Preconditions.checkNotNull(trackerUrl, "Tracker URL not set.");
-
- amrmClient.start();
- RegisterApplicationMasterResponse response = amrmClient.registerApplicationMaster(trackerAddr.getHostName(),
- trackerAddr.getPort(),
- trackerUrl.toString());
- maxCapability = response.getMaximumResourceCapability();
- nmClient.startAndWait();
- }
-
- @Override
- protected void shutDown() throws Exception {
- nmClient.stopAndWait();
- amrmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, trackerUrl.toString());
- amrmClient.stop();
- }
-
- @Override
- public ContainerId getContainerId() {
- return containerId;
- }
-
- @Override
- public String getHost() {
- return System.getenv().get(ApplicationConstants.Environment.NM_HOST.name());
- }
-
- @Override
- public void setTracker(InetSocketAddress trackerAddr, URL trackerUrl) {
- this.trackerAddr = trackerAddr;
- this.trackerUrl = trackerUrl;
- }
-
- @Override
- public synchronized void allocate(float progress, AllocateHandler handler) throws Exception {
- AllocateResponse allocateResponse = amrmClient.allocate(progress);
- List<ProcessLauncher<YarnContainerInfo>> launchers
- = Lists.newArrayListWithCapacity(allocateResponse.getAllocatedContainers().size());
-
- for (Container container : allocateResponse.getAllocatedContainers()) {
- launchers.add(new RunnableProcessLauncher(new Hadoop21YarnContainerInfo(container), nmClient));
- }
-
- if (!launchers.isEmpty()) {
- handler.acquired(launchers);
-
- // If no process has been launched through the given launcher, return the container.
- for (ProcessLauncher<YarnContainerInfo> l : launchers) {
- // This cast always works.
- RunnableProcessLauncher launcher = (RunnableProcessLauncher) l;
- if (!launcher.isLaunched()) {
- Container container = launcher.getContainerInfo().getContainer();
- LOG.info("Nothing to run in container, releasing it: {}", container);
- amrmClient.releaseAssignedContainer(container.getId());
- }
- }
- }
-
- List<YarnContainerStatus> completed = ImmutableList.copyOf(
- Iterables.transform(allocateResponse.getCompletedContainersStatuses(), STATUS_TRANSFORM));
- if (!completed.isEmpty()) {
- handler.completed(completed);
- }
- }
-
- @Override
- public ContainerRequestBuilder addContainerRequest(Resource capability) {
- return addContainerRequest(capability, 1);
- }
-
- @Override
- public ContainerRequestBuilder addContainerRequest(Resource capability, int count) {
- return new ContainerRequestBuilder(adjustCapability(capability), count) {
- @Override
- public String apply() {
- synchronized (Hadoop21YarnAMClient.this) {
- String id = UUID.randomUUID().toString();
-
- String[] hosts = this.hosts.isEmpty() ? null : this.hosts.toArray(new String[this.hosts.size()]);
- String[] racks = this.racks.isEmpty() ? null : this.racks.toArray(new String[this.racks.size()]);
-
- for (int i = 0; i < count; i++) {
- AMRMClient.ContainerRequest request = new AMRMClient.ContainerRequest(capability, hosts, racks, priority);
- containerRequests.put(id, request);
- amrmClient.addContainerRequest(request);
- }
-
- return id;
- }
- }
- };
- }
-
- @Override
- public synchronized void completeContainerRequest(String id) {
- for (AMRMClient.ContainerRequest request : containerRequests.removeAll(id)) {
- amrmClient.removeContainerRequest(request);
- }
- }
-
- private Resource adjustCapability(Resource resource) {
- int cores = resource.getVirtualCores();
- int updatedCores = Math.min(resource.getVirtualCores(), maxCapability.getVirtualCores());
-
- if (cores != updatedCores) {
- resource.setVirtualCores(updatedCores);
- LOG.info("Adjust virtual cores requirement from {} to {}.", cores, updatedCores);
- }
-
- int updatedMemory = Math.min(resource.getMemory(), maxCapability.getMemory());
- if (resource.getMemory() != updatedMemory) {
- resource.setMemory(updatedMemory);
- LOG.info("Adjust memory requirement from {} to {} MB.", resource.getMemory(), updatedMemory);
- }
-
- return resource;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java b/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
deleted file mode 100644
index 50b212d..0000000
--- a/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.twill.api.TwillSpecification;
-import org.apache.twill.internal.ProcessController;
-import org.apache.twill.internal.ProcessLauncher;
-import org.apache.twill.internal.appmaster.ApplicationMasterProcessLauncher;
-import org.apache.twill.internal.appmaster.ApplicationSubmitter;
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.AbstractIdleService;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.client.api.YarnClientApplication;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- */
-public final class Hadoop21YarnAppClient extends AbstractIdleService implements YarnAppClient {
-
- private static final Logger LOG = LoggerFactory.getLogger(Hadoop21YarnAppClient.class);
- private final YarnClient yarnClient;
-
- public Hadoop21YarnAppClient(Configuration configuration) {
- this.yarnClient = YarnClient.createYarnClient();
- yarnClient.init(configuration);
- }
-
- @Override
- public ProcessLauncher<ApplicationId> createLauncher(TwillSpecification twillSpec) throws Exception {
- // Request for new application
- YarnClientApplication application = yarnClient.createApplication();
- final GetNewApplicationResponse response = application.getNewApplicationResponse();
- final ApplicationId appId = response.getApplicationId();
-
- // Setup the context for application submission
- final ApplicationSubmissionContext appSubmissionContext = application.getApplicationSubmissionContext();
- appSubmissionContext.setApplicationId(appId);
- appSubmissionContext.setApplicationName(twillSpec.getName());
-
- ApplicationSubmitter submitter = new ApplicationSubmitter() {
- @Override
- public ProcessController<YarnApplicationReport> submit(YarnLaunchContext context, Resource capability) {
- ContainerLaunchContext launchContext = context.getLaunchContext();
-
- addRMToken(launchContext);
- appSubmissionContext.setAMContainerSpec(launchContext);
- appSubmissionContext.setResource(adjustMemory(response, capability));
- appSubmissionContext.setMaxAppAttempts(2);
-
- try {
- yarnClient.submitApplication(appSubmissionContext);
- return new ProcessControllerImpl(yarnClient, appId);
- } catch (Exception e) {
- LOG.error("Failed to submit application {}", appId, e);
- throw Throwables.propagate(e);
- }
- }
- };
-
- return new ApplicationMasterProcessLauncher(appId, submitter);
- }
-
- private Resource adjustMemory(GetNewApplicationResponse response, Resource capability) {
- int maxMemory = response.getMaximumResourceCapability().getMemory();
- int updatedMemory = capability.getMemory();
-
- if (updatedMemory > maxMemory) {
- capability.setMemory(maxMemory);
- }
-
- return capability;
- }
-
- private void addRMToken(ContainerLaunchContext context) {
- if (!UserGroupInformation.isSecurityEnabled()) {
- return;
- }
-
- try {
- Credentials credentials = YarnUtils.decodeCredentials(context.getTokens());
-
- Configuration config = yarnClient.getConfig();
- Token<TokenIdentifier> token = ConverterUtils.convertFromYarn(
- yarnClient.getRMDelegationToken(new Text(YarnUtils.getYarnTokenRenewer(config))),
- YarnUtils.getRMAddress(config));
-
- LOG.info("Added RM delegation token {}", token);
- credentials.addToken(token.getService(), token);
-
- context.setTokens(YarnUtils.encodeCredentials(credentials));
-
- } catch (Exception e) {
- LOG.error("Fails to create credentials.", e);
- throw Throwables.propagate(e);
- }
- }
-
- @Override
- public ProcessLauncher<ApplicationId> createLauncher(String user, TwillSpecification twillSpec) throws Exception {
- // Ignore user
- return createLauncher(twillSpec);
- }
-
- @Override
- public ProcessController<YarnApplicationReport> createProcessController(ApplicationId appId) {
- return new ProcessControllerImpl(yarnClient, appId);
- }
-
- @Override
- protected void startUp() throws Exception {
- yarnClient.start();
- }
-
- @Override
- protected void shutDown() throws Exception {
- yarnClient.stop();
- }
-
- private static final class ProcessControllerImpl implements ProcessController<YarnApplicationReport> {
- private final YarnClient yarnClient;
- private final ApplicationId appId;
-
- public ProcessControllerImpl(YarnClient yarnClient, ApplicationId appId) {
- this.yarnClient = yarnClient;
- this.appId = appId;
- }
-
- @Override
- public YarnApplicationReport getReport() {
- try {
- return new Hadoop21YarnApplicationReport(yarnClient.getApplicationReport(appId));
- } catch (Exception e) {
- LOG.error("Failed to get application report {}", appId, e);
- throw Throwables.propagate(e);
- }
- }
-
- @Override
- public void cancel() {
- try {
- yarnClient.killApplication(appId);
- } catch (Exception e) {
- LOG.error("Failed to kill application {}", appId, e);
- throw Throwables.propagate(e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnApplicationReport.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnApplicationReport.java b/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnApplicationReport.java
deleted file mode 100644
index 6e614f5..0000000
--- a/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnApplicationReport.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-
-/**
- *
- */
-public final class Hadoop21YarnApplicationReport implements YarnApplicationReport {
-
- private final ApplicationReport report;
-
- public Hadoop21YarnApplicationReport(ApplicationReport report) {
- this.report = report;
- }
-
- @Override
- public ApplicationId getApplicationId() {
- return report.getApplicationId();
- }
-
- @Override
- public ApplicationAttemptId getCurrentApplicationAttemptId() {
- return report.getCurrentApplicationAttemptId();
- }
-
- @Override
- public String getQueue() {
- return report.getQueue();
- }
-
- @Override
- public String getName() {
- return report.getName();
- }
-
- @Override
- public String getHost() {
- return report.getHost();
- }
-
- @Override
- public int getRpcPort() {
- return report.getRpcPort();
- }
-
- @Override
- public YarnApplicationState getYarnApplicationState() {
- return report.getYarnApplicationState();
- }
-
- @Override
- public String getDiagnostics() {
- return report.getDiagnostics();
- }
-
- @Override
- public String getTrackingUrl() {
- return report.getTrackingUrl();
- }
-
- @Override
- public String getOriginalTrackingUrl() {
- return report.getOriginalTrackingUrl();
- }
-
- @Override
- public long getStartTime() {
- return report.getStartTime();
- }
-
- @Override
- public long getFinishTime() {
- return report.getFinishTime();
- }
-
- @Override
- public FinalApplicationStatus getFinalApplicationStatus() {
- return report.getFinalApplicationStatus();
- }
-
- @Override
- public ApplicationResourceUsageReport getApplicationResourceUsageReport() {
- return report.getApplicationResourceUsageReport();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerInfo.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerInfo.java b/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerInfo.java
deleted file mode 100644
index 86903c1..0000000
--- a/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerInfo.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import com.google.common.base.Throwables;
-import org.apache.hadoop.yarn.api.records.Container;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-
-/**
- *
- */
-public final class Hadoop21YarnContainerInfo implements YarnContainerInfo {
-
- private final Container container;
-
- public Hadoop21YarnContainerInfo(Container container) {
- this.container = container;
- }
-
- @Override
- public <T> T getContainer() {
- return (T) container;
- }
-
- @Override
- public String getId() {
- return container.getId().toString();
- }
-
- @Override
- public InetAddress getHost() {
- try {
- return InetAddress.getByName(container.getNodeId().getHost());
- } catch (UnknownHostException e) {
- throw Throwables.propagate(e);
- }
- }
-
- @Override
- public int getPort() {
- return container.getNodeId().getPort();
- }
-
- @Override
- public int getMemoryMB() {
- return container.getResource().getMemory();
- }
-
- @Override
- public int getVirtualCores() {
- return container.getResource().getVirtualCores();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerStatus.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerStatus.java b/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerStatus.java
deleted file mode 100644
index f5758c7..0000000
--- a/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerStatus.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-
-/**
- *
- */
-public final class Hadoop21YarnContainerStatus implements YarnContainerStatus {
-
- private final ContainerStatus containerStatus;
-
- public Hadoop21YarnContainerStatus(ContainerStatus containerStatus) {
- this.containerStatus = containerStatus;
- }
-
- @Override
- public String getContainerId() {
- return containerStatus.getContainerId().toString();
- }
-
- @Override
- public ContainerState getState() {
- return containerStatus.getState();
- }
-
- @Override
- public int getExitStatus() {
- return containerStatus.getExitStatus();
- }
-
- @Override
- public String getDiagnostics() {
- return containerStatus.getDiagnostics();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnLaunchContext.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnLaunchContext.java b/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnLaunchContext.java
deleted file mode 100644
index 8621f93..0000000
--- a/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnLaunchContext.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Maps;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.util.Records;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-
-/**
- *
- */
-public final class Hadoop21YarnLaunchContext implements YarnLaunchContext {
-
- private static final Function<YarnLocalResource, LocalResource> RESOURCE_TRANSFORM;
-
- static {
- // Creates transform function from YarnLocalResource -> LocalResource
- RESOURCE_TRANSFORM = new Function<YarnLocalResource, LocalResource>() {
- @Override
- public LocalResource apply(YarnLocalResource input) {
- return input.getLocalResource();
- }
- };
- }
-
- private final ContainerLaunchContext launchContext;
-
- public Hadoop21YarnLaunchContext() {
- launchContext = Records.newRecord(ContainerLaunchContext.class);
- }
-
- @Override
- public <T> T getLaunchContext() {
- return (T) launchContext;
- }
-
- @Override
- public void setCredentials(Credentials credentials) {
- launchContext.setTokens(YarnUtils.encodeCredentials(credentials));
- }
-
- @Override
- public void setLocalResources(Map<String, YarnLocalResource> localResources) {
- launchContext.setLocalResources(Maps.transformValues(localResources, RESOURCE_TRANSFORM));
- }
-
- @Override
- public void setServiceData(Map<String, ByteBuffer> serviceData) {
- launchContext.setServiceData(serviceData);
- }
-
- @Override
- public Map<String, String> getEnvironment() {
- return launchContext.getEnvironment();
- }
-
- @Override
- public void setEnvironment(Map<String, String> environment) {
- launchContext.setEnvironment(environment);
- }
-
- @Override
- public List<String> getCommands() {
- return launchContext.getCommands();
- }
-
- @Override
- public void setCommands(List<String> commands) {
- launchContext.setCommands(commands);
- }
-
- @Override
- public void setApplicationACLs(Map<ApplicationAccessType, String> acls) {
- launchContext.setApplicationACLs(acls);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnLocalResource.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnLocalResource.java b/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnLocalResource.java
deleted file mode 100644
index 3f756bd..0000000
--- a/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnLocalResource.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.util.Records;
-
-/**
- *
- */
-public final class Hadoop21YarnLocalResource implements YarnLocalResource {
-
- private final LocalResource localResource;
-
- public Hadoop21YarnLocalResource() {
- this.localResource = Records.newRecord(LocalResource.class);
- }
-
- @Override
- public <T> T getLocalResource() {
- return (T) localResource;
- }
-
- @Override
- public URL getResource() {
- return localResource.getResource();
- }
-
- @Override
- public void setResource(URL resource) {
- localResource.setResource(resource);
- }
-
- @Override
- public long getSize() {
- return localResource.getSize();
- }
-
- @Override
- public void setSize(long size) {
- localResource.setSize(size);
- }
-
- @Override
- public long getTimestamp() {
- return localResource.getTimestamp();
- }
-
- @Override
- public void setTimestamp(long timestamp) {
- localResource.setTimestamp(timestamp);
- }
-
- @Override
- public LocalResourceType getType() {
- return localResource.getType();
- }
-
- @Override
- public void setType(LocalResourceType type) {
- localResource.setType(type);
- }
-
- @Override
- public LocalResourceVisibility getVisibility() {
- return localResource.getVisibility();
- }
-
- @Override
- public void setVisibility(LocalResourceVisibility visibility) {
- localResource.setVisibility(visibility);
- }
-
- @Override
- public String getPattern() {
- return localResource.getPattern();
- }
-
- @Override
- public void setPattern(String pattern) {
- localResource.setPattern(pattern);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnNMClient.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnNMClient.java b/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnNMClient.java
deleted file mode 100644
index d3a6a80..0000000
--- a/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnNMClient.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.twill.common.Cancellable;
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.AbstractIdleService;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.client.api.NMClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- */
-public final class Hadoop21YarnNMClient extends AbstractIdleService implements YarnNMClient {
-
- private static final Logger LOG = LoggerFactory.getLogger(Hadoop21YarnNMClient.class);
-
- private final NMClient nmClient;
-
- public Hadoop21YarnNMClient(Configuration configuration) {
- this.nmClient = NMClient.createNMClient();
- nmClient.init(configuration);
- }
-
- @Override
- public Cancellable start(YarnContainerInfo containerInfo, YarnLaunchContext launchContext) {
- try {
- Container container = containerInfo.getContainer();
- nmClient.startContainer(container, launchContext.<ContainerLaunchContext>getLaunchContext());
- return new ContainerTerminator(container, nmClient);
- } catch (Exception e) {
- LOG.error("Error in launching process", e);
- throw Throwables.propagate(e);
- }
-
- }
-
- @Override
- protected void startUp() throws Exception {
- nmClient.start();
- }
-
- @Override
- protected void shutDown() throws Exception {
- nmClient.stop();
- }
-
- private static final class ContainerTerminator implements Cancellable {
-
- private final Container container;
- private final NMClient nmClient;
-
- private ContainerTerminator(Container container, NMClient nmClient) {
- this.container = container;
- this.nmClient = nmClient;
- }
-
- @Override
- public void cancel() {
- LOG.info("Request to stop container {}.", container.getId());
-
- try {
- nmClient.stopContainer(container.getId(), container.getNodeId());
- boolean completed = false;
- while (!completed) {
- ContainerStatus status = nmClient.getContainerStatus(container.getId(), container.getNodeId());
- LOG.info("Container status: {} {}", status, status.getDiagnostics());
-
- completed = (status.getState() == ContainerState.COMPLETE);
- }
- LOG.info("Container {} stopped.", container.getId());
- } catch (Exception e) {
- LOG.error("Fail to stop container {}", container.getId(), e);
- throw Throwables.propagate(e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java b/yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
deleted file mode 100644
index b0eeb43..0000000
--- a/yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.filesystem;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.URI;
-import java.util.UUID;
-
-/**
- * A concrete implementation of {@link Location} for the HDFS filesystem.
- */
-final class HDFSLocation implements Location {
- private final FileSystem fs;
- private final Path path;
-
- /**
- * Constructs a HDFSLocation.
- *
- * @param fs An instance of {@link FileSystem}
- * @param path of the file.
- */
- HDFSLocation(FileSystem fs, Path path) {
- this.fs = fs;
- this.path = path;
- }
-
- /**
- * Checks if this location exists on HDFS.
- *
- * @return true if found; false otherwise.
- * @throws IOException
- */
- @Override
- public boolean exists() throws IOException {
- return fs.exists(path);
- }
-
- /**
- * @return An {@link InputStream} for this location on HDFS.
- * @throws IOException
- */
- @Override
- public InputStream getInputStream() throws IOException {
- return fs.open(path);
- }
-
- /**
- * @return An {@link OutputStream} for this location on HDFS.
- * @throws IOException
- */
- @Override
- public OutputStream getOutputStream() throws IOException {
- return fs.create(path);
- }
-
- @Override
- public OutputStream getOutputStream(String permission) throws IOException {
- Configuration conf = fs.getConf();
- return fs.create(path,
- new FsPermission(permission),
- true,
- conf.getInt("io.file.buffer.size", 4096),
- fs.getDefaultReplication(path),
- fs.getDefaultBlockSize(path),
- null);
- }
-
- /**
- * Appends the child to the current {@link Location} on HDFS.
- * <p>
- * Returns a new instance of Location.
- * </p>
- *
- * @param child to be appended to this location.
- * @return A new instance of {@link Location}
- * @throws IOException
- */
- @Override
- public Location append(String child) throws IOException {
- if (child.startsWith("/")) {
- child = child.substring(1);
- }
- return new HDFSLocation(fs, new Path(URI.create(path.toUri() + "/" + child)));
- }
-
- @Override
- public Location getTempFile(String suffix) throws IOException {
- Path path = new Path(
- URI.create(this.path.toUri() + "." + UUID.randomUUID() + (suffix == null ? TEMP_FILE_SUFFIX : suffix)));
- return new HDFSLocation(fs, path);
- }
-
- /**
- * @return Returns the name of the file or directory denoteed by this abstract pathname.
- */
- @Override
- public String getName() {
- return path.getName();
- }
-
- @Override
- public boolean createNew() throws IOException {
- return fs.createNewFile(path);
- }
-
- /**
- * @return A {@link URI} for this location on HDFS.
- */
- @Override
- public URI toURI() {
- return path.toUri();
- }
-
- /**
- * Deletes the file or directory denoted by this abstract pathname. If this
- * pathname denotes a directory, then the directory must be empty in order
- * to be deleted.
- *
- * @return true if and only if the file or directory is successfully deleted; false otherwise.
- */
- @Override
- public boolean delete() throws IOException {
- return fs.delete(path, false);
- }
-
- @Override
- public boolean delete(boolean recursive) throws IOException {
- return fs.delete(path, true);
- }
-
- @Override
- public Location renameTo(Location destination) throws IOException {
- // Destination will always be of the same type as this location.
- if (fs instanceof DistributedFileSystem) {
- ((DistributedFileSystem) fs).rename(path, ((HDFSLocation) destination).path, Options.Rename.OVERWRITE);
- return new HDFSLocation(fs, new Path(destination.toURI()));
- }
-
- if (fs.rename(path, ((HDFSLocation) destination).path)) {
- return new HDFSLocation(fs, new Path(destination.toURI()));
- } else {
- return null;
- }
- }
-
- /**
- * Creates the directory named by this abstract pathname, including any necessary
- * but nonexistent parent directories.
- *
- * @return true if and only if the renaming succeeded; false otherwise
- */
- @Override
- public boolean mkdirs() throws IOException {
- return fs.mkdirs(path);
- }
-
- /**
- * @return Length of file.
- */
- @Override
- public long length() throws IOException {
- return fs.getFileStatus(path).getLen();
- }
-
- @Override
- public long lastModified() throws IOException {
- return fs.getFileStatus(path).getModificationTime();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/yarn/src/main/java/org/apache/twill/filesystem/HDFSLocationFactory.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/filesystem/HDFSLocationFactory.java b/yarn/src/main/java/org/apache/twill/filesystem/HDFSLocationFactory.java
deleted file mode 100644
index fa79391..0000000
--- a/yarn/src/main/java/org/apache/twill/filesystem/HDFSLocationFactory.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.filesystem;
-
-import com.google.common.base.Throwables;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import java.io.IOException;
-import java.net.URI;
-
-/**
- * A {@link LocationFactory} that creates HDFS {@link Location}.
- */
-public final class HDFSLocationFactory implements LocationFactory {
-
- private final FileSystem fileSystem;
- private final String pathBase;
-
- public HDFSLocationFactory(Configuration configuration) {
- this(getFileSystem(configuration));
- }
-
- public HDFSLocationFactory(Configuration configuration, String pathBase) {
- this(getFileSystem(configuration), pathBase);
- }
-
- public HDFSLocationFactory(FileSystem fileSystem) {
- this(fileSystem, "/");
- }
-
- public HDFSLocationFactory(FileSystem fileSystem, String pathBase) {
- String base = pathBase.equals("/") ? "" : pathBase;
- base = base.endsWith("/") ? base.substring(0, base.length() - 1) : base;
-
- this.fileSystem = fileSystem;
- this.pathBase = base;
- }
-
- @Override
- public Location create(String path) {
- if (path.startsWith("/")) {
- path = path.substring(1);
- }
- return new HDFSLocation(fileSystem, new Path(fileSystem.getUri() + "/" + pathBase + "/" + path));
- }
-
- @Override
- public Location create(URI uri) {
- if (!uri.toString().startsWith(fileSystem.getUri().toString())) {
- // It's a full URI
- return new HDFSLocation(fileSystem, new Path(uri));
- }
- if (uri.isAbsolute()) {
- return new HDFSLocation(fileSystem, new Path(fileSystem.getUri() + uri.getPath()));
- }
- return new HDFSLocation(fileSystem, new Path(fileSystem.getUri() + "/" + pathBase + "/" + uri.getPath()));
- }
-
- @Override
- public Location getHomeLocation() {
- return new HDFSLocation(fileSystem, fileSystem.getHomeDirectory());
- }
-
- /**
- * Returns the underlying {@link FileSystem} object.
- */
- public FileSystem getFileSystem() {
- return fileSystem;
- }
-
- private static FileSystem getFileSystem(Configuration configuration) {
- try {
- return FileSystem.get(configuration);
- } catch (IOException e) {
- throw Throwables.propagate(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/yarn/src/main/java/org/apache/twill/filesystem/package-info.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/filesystem/package-info.java b/yarn/src/main/java/org/apache/twill/filesystem/package-info.java
deleted file mode 100644
index 2ca09fd..0000000
--- a/yarn/src/main/java/org/apache/twill/filesystem/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Contains HDFS location classes.
- */
-package org.apache.twill.filesystem;
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/yarn/src/main/java/org/apache/twill/internal/AbstractTwillService.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/AbstractTwillService.java b/yarn/src/main/java/org/apache/twill/internal/AbstractTwillService.java
deleted file mode 100644
index 47dd07c..0000000
--- a/yarn/src/main/java/org/apache/twill/internal/AbstractTwillService.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import org.apache.twill.filesystem.Location;
-import org.apache.twill.internal.state.Message;
-import org.apache.twill.internal.state.SystemMessages;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.Service;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.concurrent.Executor;
-
-/**
- * A base implementation of {@link Service} handle secure token update.
- */
-public abstract class AbstractTwillService implements Service {
-
- private static final Logger LOG = LoggerFactory.getLogger(AbstractTwillService.class);
-
- protected final Location applicationLocation;
-
- protected volatile Credentials credentials;
-
- protected AbstractTwillService(Location applicationLocation) {
- this.applicationLocation = applicationLocation;
- }
-
- protected abstract Service getServiceDelegate();
-
- /**
- * Returns the location of the secure store, or {@code null} if either not running in secure mode or an error
- * occur when trying to acquire the location.
- */
- protected final Location getSecureStoreLocation() {
- if (!UserGroupInformation.isSecurityEnabled()) {
- return null;
- }
- try {
- return applicationLocation.append(Constants.Files.CREDENTIALS);
- } catch (IOException e) {
- LOG.error("Failed to create secure store location.", e);
- return null;
- }
- }
-
- /**
- * Attempts to handle secure store update.
- *
- * @param message The message received
- * @return {@code true} if the message requests for secure store update, {@code false} otherwise.
- */
- protected final boolean handleSecureStoreUpdate(Message message) {
- if (!SystemMessages.SECURE_STORE_UPDATED.equals(message)) {
- return false;
- }
-
- // If not in secure mode, simply ignore the message.
- if (!UserGroupInformation.isSecurityEnabled()) {
- return true;
- }
-
- try {
- Credentials credentials = new Credentials();
- Location location = getSecureStoreLocation();
- DataInputStream input = new DataInputStream(new BufferedInputStream(location.getInputStream()));
- try {
- credentials.readTokenStorageStream(input);
- } finally {
- input.close();
- }
-
- UserGroupInformation.getCurrentUser().addCredentials(credentials);
- this.credentials = credentials;
-
- LOG.info("Secure store updated from {}.", location.toURI());
-
- } catch (Throwable t) {
- LOG.error("Failed to update secure store.", t);
- }
-
- return true;
- }
-
- @Override
- public final ListenableFuture<State> start() {
- return getServiceDelegate().start();
- }
-
- @Override
- public final State startAndWait() {
- return Futures.getUnchecked(start());
- }
-
- @Override
- public final boolean isRunning() {
- return getServiceDelegate().isRunning();
- }
-
- @Override
- public final State state() {
- return getServiceDelegate().state();
- }
-
- @Override
- public final ListenableFuture<State> stop() {
- return getServiceDelegate().stop();
- }
-
- @Override
- public final State stopAndWait() {
- return Futures.getUnchecked(stop());
- }
-
- @Override
- public final void addListener(Listener listener, Executor executor) {
- getServiceDelegate().addListener(listener, executor);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/ServiceMain.java b/yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
deleted file mode 100644
index 4ffb023..0000000
--- a/yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import org.apache.twill.common.Services;
-import org.apache.twill.filesystem.HDFSLocationFactory;
-import org.apache.twill.filesystem.LocalLocationFactory;
-import org.apache.twill.filesystem.Location;
-import org.apache.twill.internal.logging.KafkaAppender;
-import org.apache.twill.zookeeper.ZKClientService;
-import ch.qos.logback.classic.LoggerContext;
-import ch.qos.logback.classic.joran.JoranConfigurator;
-import ch.qos.logback.classic.util.ContextInitializer;
-import ch.qos.logback.core.joran.spi.JoranException;
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.Service;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.ILoggerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.xml.sax.InputSource;
-
-import java.io.File;
-import java.io.StringReader;
-import java.net.URI;
-import java.util.concurrent.ExecutionException;
-
-/**
- * Class for main method that starts a service.
- */
-public abstract class ServiceMain {
-
- private static final Logger LOG = LoggerFactory.getLogger(ServiceMain.class);
-
- static {
- // This is to work around detection of HADOOP_HOME (HADOOP-9422)
- if (!System.getenv().containsKey("HADOOP_HOME") && System.getProperty("hadoop.home.dir") == null) {
- System.setProperty("hadoop.home.dir", new File("").getAbsolutePath());
- }
- }
-
- protected final void doMain(final ZKClientService zkClientService,
- final Service service) throws ExecutionException, InterruptedException {
- configureLogger();
-
- final String serviceName = service.toString();
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- Services.chainStop(service, zkClientService);
- }
- });
-
- // Listener for state changes of the service
- ListenableFuture<Service.State> completion = Services.getCompletionFuture(service);
-
- // Starts the service
- LOG.info("Starting service {}.", serviceName);
- Futures.getUnchecked(Services.chainStart(zkClientService, service));
- LOG.info("Service {} started.", serviceName);
- try {
- completion.get();
- LOG.info("Service {} completed.", serviceName);
- } catch (Throwable t) {
- LOG.warn("Exception thrown from service {}.", serviceName, t);
- throw Throwables.propagate(t);
- } finally {
- ILoggerFactory loggerFactory = LoggerFactory.getILoggerFactory();
- if (loggerFactory instanceof LoggerContext) {
- ((LoggerContext) loggerFactory).stop();
- }
- }
- }
-
- protected abstract String getHostname();
-
- protected abstract String getKafkaZKConnect();
-
- /**
- * Returns the {@link Location} for the application based on the env {@link EnvKeys#TWILL_APP_DIR}.
- */
- protected static Location createAppLocation(Configuration conf) {
- // Note: It's a little bit hacky based on the uri schema to create the LocationFactory, refactor it later.
- URI appDir = URI.create(System.getenv(EnvKeys.TWILL_APP_DIR));
-
- try {
- if ("file".equals(appDir.getScheme())) {
- return new LocalLocationFactory().create(appDir);
- }
-
- if ("hdfs".equals(appDir.getScheme())) {
- if (UserGroupInformation.isSecurityEnabled()) {
- return new HDFSLocationFactory(FileSystem.get(conf)).create(appDir);
- }
-
- String fsUser = System.getenv(EnvKeys.TWILL_FS_USER);
- if (fsUser == null) {
- throw new IllegalStateException("Missing environment variable " + EnvKeys.TWILL_FS_USER);
- }
- return new HDFSLocationFactory(FileSystem.get(FileSystem.getDefaultUri(conf), conf, fsUser)).create(appDir);
- }
-
- LOG.warn("Unsupported location type {}.", appDir);
- throw new IllegalArgumentException("Unsupported location type " + appDir);
-
- } catch (Exception e) {
- LOG.error("Failed to create application location for {}.", appDir);
- throw Throwables.propagate(e);
- }
- }
-
- private void configureLogger() {
- // Check if SLF4J is bound to logback in the current environment
- ILoggerFactory loggerFactory = LoggerFactory.getILoggerFactory();
- if (!(loggerFactory instanceof LoggerContext)) {
- return;
- }
-
- LoggerContext context = (LoggerContext) loggerFactory;
- context.reset();
- JoranConfigurator configurator = new JoranConfigurator();
- configurator.setContext(context);
-
- try {
- File twillLogback = new File(Constants.Files.LOGBACK_TEMPLATE);
- if (twillLogback.exists()) {
- configurator.doConfigure(twillLogback);
- }
- new ContextInitializer(context).autoConfig();
- } catch (JoranException e) {
- throw Throwables.propagate(e);
- }
- doConfigure(configurator, getLogConfig(getLoggerLevel(context.getLogger(Logger.ROOT_LOGGER_NAME))));
- }
-
- private void doConfigure(JoranConfigurator configurator, String config) {
- try {
- configurator.doConfigure(new InputSource(new StringReader(config)));
- } catch (Exception e) {
- throw Throwables.propagate(e);
- }
- }
-
- private String getLogConfig(String rootLevel) {
- return
- "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +
- "<configuration>\n" +
- " <appender name=\"KAFKA\" class=\"" + KafkaAppender.class.getName() + "\">\n" +
- " <topic>" + Constants.LOG_TOPIC + "</topic>\n" +
- " <hostname>" + getHostname() + "</hostname>\n" +
- " <zookeeper>" + getKafkaZKConnect() + "</zookeeper>\n" +
- " </appender>\n" +
- " <logger name=\"org.apache.twill.internal.logging\" additivity=\"false\" />\n" +
- " <root level=\"" + rootLevel + "\">\n" +
- " <appender-ref ref=\"KAFKA\"/>\n" +
- " </root>\n" +
- "</configuration>";
- }
-
- private String getLoggerLevel(Logger logger) {
- if (logger instanceof ch.qos.logback.classic.Logger) {
- return ((ch.qos.logback.classic.Logger) logger).getLevel().toString();
- }
-
- if (logger.isTraceEnabled()) {
- return "TRACE";
- }
- if (logger.isDebugEnabled()) {
- return "DEBUG";
- }
- if (logger.isInfoEnabled()) {
- return "INFO";
- }
- if (logger.isWarnEnabled()) {
- return "WARN";
- }
- if (logger.isErrorEnabled()) {
- return "ERROR";
- }
- return "OFF";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterLiveNodeData.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterLiveNodeData.java b/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterLiveNodeData.java
deleted file mode 100644
index 028df7b..0000000
--- a/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterLiveNodeData.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.appmaster;
-
-/**
- * Represents data being stored in the live node of the application master.
- */
-public final class ApplicationMasterLiveNodeData {
-
- private final int appId;
- private final long appIdClusterTime;
- private final String containerId;
-
- public ApplicationMasterLiveNodeData(int appId, long appIdClusterTime, String containerId) {
- this.appId = appId;
- this.appIdClusterTime = appIdClusterTime;
- this.containerId = containerId;
- }
-
- public int getAppId() {
- return appId;
- }
-
- public long getAppIdClusterTime() {
- return appIdClusterTime;
- }
-
- public String getContainerId() {
- return containerId;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java b/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
deleted file mode 100644
index b34a7a2..0000000
--- a/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.appmaster;
-
-import org.apache.twill.api.RunId;
-import org.apache.twill.internal.Constants;
-import org.apache.twill.internal.EnvKeys;
-import org.apache.twill.internal.RunIds;
-import org.apache.twill.internal.ServiceMain;
-import org.apache.twill.internal.yarn.VersionDetectYarnAMClientFactory;
-import org.apache.twill.zookeeper.RetryStrategies;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.apache.twill.zookeeper.ZKClientServices;
-import org.apache.twill.zookeeper.ZKClients;
-import com.google.common.util.concurrent.Service;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-
-import java.io.File;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Main class for launching {@link ApplicationMasterService}.
- */
-public final class ApplicationMasterMain extends ServiceMain {
-
- private final String kafkaZKConnect;
-
- private ApplicationMasterMain(String kafkaZKConnect) {
- this.kafkaZKConnect = kafkaZKConnect;
- }
-
- /**
- * Starts the application master.
- */
- public static void main(String[] args) throws Exception {
- String zkConnect = System.getenv(EnvKeys.TWILL_ZK_CONNECT);
- File twillSpec = new File(Constants.Files.TWILL_SPEC);
- RunId runId = RunIds.fromString(System.getenv(EnvKeys.TWILL_RUN_ID));
-
- ZKClientService zkClientService =
- ZKClientServices.delegate(
- ZKClients.reWatchOnExpire(
- ZKClients.retryOnFailure(
- ZKClientService.Builder.of(zkConnect).build(),
- RetryStrategies.fixDelay(1, TimeUnit.SECONDS))));
-
- Configuration conf = new YarnConfiguration(new HdfsConfiguration(new Configuration()));
- Service service = new ApplicationMasterService(runId, zkClientService, twillSpec,
- new VersionDetectYarnAMClientFactory(conf), createAppLocation(conf));
- new ApplicationMasterMain(String.format("%s/%s/kafka", zkConnect, runId.getId())).doMain(zkClientService, service);
- }
-
- @Override
- protected String getHostname() {
- try {
- return InetAddress.getLocalHost().getCanonicalHostName();
- } catch (UnknownHostException e) {
- return "unknown";
- }
- }
-
- @Override
- protected String getKafkaZKConnect() {
- return kafkaZKConnect;
- }
-}