You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/06/18 06:02:48 UTC
svn commit: r1494017 [2/3] - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/
hadoop-yarn/hadoop-yarn-appl...
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,578 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.async.impl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.MultipleArcTransition;
+import org.apache.hadoop.yarn.state.SingleArcTransition;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Asynchronous wrapper around an {@link NMClient}.
+ * <p>
+ * {@code startContainerAsync}, {@code stopContainerAsync} and
+ * {@code getContainerStatusAsync} only enqueue a {@link ContainerEvent}. A
+ * dedicated dispatcher thread takes events off the queue and hands each one to
+ * a thread pool. The pool's core size grows with the number of distinct
+ * NodeManagers addressed so far, bounded by
+ * {@link YarnConfiguration#NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE}. Outcomes are
+ * reported through the {@code CallbackHandler}. Start/stop requests for each
+ * container are driven through a per-container state machine
+ * ({@link StatefulContainer}).
+ */
+@Private
+@Unstable
+public class NMClientAsyncImpl extends NMClientAsync {
+
+  private static final Log LOG = LogFactory.getLog(NMClientAsyncImpl.class);
+
+  protected static final int INITIAL_THREAD_POOL_SIZE = 10;
+
+  protected ThreadPoolExecutor threadPool;
+  protected int maxThreadPoolSize;
+  protected Thread eventDispatcherThread;
+  protected AtomicBoolean stopped = new AtomicBoolean(false);
+  // Pending requests, consumed by eventDispatcherThread.
+  protected BlockingQueue<ContainerEvent> events =
+      new LinkedBlockingQueue<ContainerEvent>();
+
+  // Containers that have been started (or scheduled to start) and are not yet
+  // DONE/FAILED.
+  protected ConcurrentMap<ContainerId, StatefulContainer> containers =
+      new ConcurrentHashMap<ContainerId, StatefulContainer>();
+
+  public NMClientAsyncImpl(CallbackHandler callbackHandler) {
+    this (NMClientAsyncImpl.class.getName(), callbackHandler);
+  }
+
+  public NMClientAsyncImpl(String name, CallbackHandler callbackHandler) {
+    this (name, new NMClientImpl(), callbackHandler);
+  }
+
+  @Private
+  @VisibleForTesting
+  protected NMClientAsyncImpl(String name, NMClient client,
+      CallbackHandler callbackHandler) {
+    super(name, client, callbackHandler);
+    this.client = client;
+    this.callbackHandler = callbackHandler;
+  }
+
+  /**
+   * Reads the upper bound of the thread-pool size from {@code conf} and
+   * initializes the wrapped client.
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    this.maxThreadPoolSize = conf.getInt(
+        YarnConfiguration.NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE,
+        YarnConfiguration.DEFAULT_NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE);
+    LOG.info("Upper bound of the thread pool size is " + maxThreadPoolSize);
+
+    client.init(conf);
+    super.serviceInit(conf);
+  }
+
+  /**
+   * Starts the wrapped client, creates the worker pool and launches the
+   * (non-daemon) event dispatcher thread.
+   */
+  @Override
+  protected void serviceStart() throws Exception {
+    client.start();
+
+    ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
+        this.getClass().getName() + " #%d").setDaemon(true).build();
+
+    // Start with a default core-pool size and change it dynamically. The
+    // work queue is unbounded, so only the core size matters; the maximum
+    // passed here is never reached.
+    int initSize = Math.min(INITIAL_THREAD_POOL_SIZE, maxThreadPoolSize);
+    threadPool = new ThreadPoolExecutor(initSize, Integer.MAX_VALUE, 1,
+        TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
+
+    eventDispatcherThread = new Thread() {
+      @Override
+      public void run() {
+        ContainerEvent event = null;
+        // Distinct NodeManagers addressed so far; drives the pool sizing.
+        Set<String> allNodes = new HashSet<String>();
+
+        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+          try {
+            event = events.take();
+          } catch (InterruptedException e) {
+            if (!stopped.get()) {
+              LOG.error("Returning, thread interrupted", e);
+            }
+            return;
+          }
+
+          allNodes.add(event.getNodeId().toString());
+
+          int threadPoolSize = threadPool.getCorePoolSize();
+
+          // We can increase the pool size only if we haven't reached the
+          // maximum limit yet.
+          if (threadPoolSize != maxThreadPoolSize) {
+
+            // Nodes where containers will run at *this* point of time. This
+            // is *not* the cluster size and doesn't need to be.
+            int nodeNum = allNodes.size();
+            int idealThreadPoolSize = Math.min(maxThreadPoolSize, nodeNum);
+
+            if (threadPoolSize < idealThreadPoolSize) {
+              // Bump up the pool size to idealThreadPoolSize +
+              // INITIAL_THREAD_POOL_SIZE; the latter is just a buffer so we
+              // are not always increasing the pool size.
+              int newThreadPoolSize = Math.min(maxThreadPoolSize,
+                  idealThreadPoolSize + INITIAL_THREAD_POOL_SIZE);
+              LOG.info("Set NMClientAsync thread pool size to " +
+                  newThreadPoolSize + " as the number of nodes to talk to is "
+                  + nodeNum);
+              threadPool.setCorePoolSize(newThreadPoolSize);
+            }
+          }
+
+          // The events from the queue are handled in parallel with a thread
+          // pool.
+          threadPool.execute(getContainerEventProcessor(event));
+
+          // TODO: Group launching of multiple containers to a single
+          // NodeManager into a single connection
+        }
+      }
+    };
+    eventDispatcherThread.setName("Container Event Dispatcher");
+    eventDispatcherThread.setDaemon(false);
+    eventDispatcherThread.start();
+
+    super.serviceStart();
+  }
+
+  /**
+   * Stops the dispatcher, the worker pool and the wrapped client. Only the
+   * first call has any effect. If this thread is interrupted while waiting
+   * for the dispatcher, the interrupt status is restored once the remaining
+   * cleanup has run.
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    if (stopped.getAndSet(true)) {
+      // return if already stopped
+      return;
+    }
+    boolean interrupted = false;
+    if (eventDispatcherThread != null) {
+      eventDispatcherThread.interrupt();
+      try {
+        eventDispatcherThread.join();
+      } catch (InterruptedException e) {
+        LOG.error("The thread of " + eventDispatcherThread.getName() +
+            " didn't finish normally.", e);
+        // Defer re-interrupting so the cleanup below is not cut short.
+        interrupted = true;
+      }
+    }
+    try {
+      if (threadPool != null) {
+        threadPool.shutdownNow();
+      }
+      if (client != null) {
+        // If NMClientImpl doesn't stop running containers, the states don't
+        // need to be cleared.
+        if (!(client instanceof NMClientImpl) ||
+            ((NMClientImpl) client).getCleanupRunningContainers().get()) {
+          if (containers != null) {
+            containers.clear();
+          }
+        }
+        client.stop();
+      }
+      super.serviceStop();
+    } finally {
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /**
+   * Schedules starting {@code container}. If the container is already
+   * tracked, {@code onStartContainerError} is invoked and nothing is queued.
+   */
+  public void startContainerAsync(
+      Container container, ContainerLaunchContext containerLaunchContext) {
+    ContainerId containerId = container.getId();
+    StatefulContainer statefulContainer =
+        new StatefulContainer(this, containerId);
+    if (containers.putIfAbsent(containerId, statefulContainer) != null) {
+      callbackHandler.onStartContainerError(containerId,
+          RPCUtil.getRemoteException("Container " + containerId +
+              " is already started or scheduled to start"));
+      // Do not queue a second START event for an already tracked container.
+      return;
+    }
+    try {
+      events.put(new StartContainerEvent(container, containerLaunchContext));
+    } catch (InterruptedException e) {
+      LOG.warn("Exception when scheduling the event of starting Container " +
+          containerId, e);
+      // The START event never reached the queue: drop the PREP entry so a
+      // retry is not rejected as "already started".
+      containers.remove(containerId, statefulContainer);
+      callbackHandler.onStartContainerError(containerId, e);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Schedules stopping a container. If the container is not tracked,
+   * {@code onStopContainerError} is invoked and nothing is queued.
+   */
+  public void stopContainerAsync(ContainerId containerId, NodeId nodeId,
+      Token containerToken) {
+    if (containers.get(containerId) == null) {
+      callbackHandler.onStopContainerError(containerId,
+          RPCUtil.getRemoteException("Container " + containerId +
+              " is neither started nor scheduled to start"));
+      // Already reported as an error; do not also queue a STOP event.
+      return;
+    }
+    try {
+      events.put(new ContainerEvent(containerId, nodeId, containerToken,
+          ContainerEventType.STOP_CONTAINER));
+    } catch (InterruptedException e) {
+      LOG.warn("Exception when scheduling the event of stopping Container " +
+          containerId, e);
+      callbackHandler.onStopContainerError(containerId, e);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Schedules a status query. Queries bypass the per-container state machine.
+   */
+  public void getContainerStatusAsync(ContainerId containerId, NodeId nodeId,
+      Token containerToken) {
+    try {
+      events.put(new ContainerEvent(containerId, nodeId, containerToken,
+          ContainerEventType.QUERY_CONTAINER));
+    } catch (InterruptedException e) {
+      LOG.warn("Exception when scheduling the event of querying the status" +
+          " of Container " + containerId, e);
+      callbackHandler.onGetContainerStatusError(containerId, e);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  protected static enum ContainerState {
+    PREP, FAILED, RUNNING, DONE,
+  }
+
+  /** Returns true once the container reached a terminal state. */
+  protected boolean isCompletelyDone(StatefulContainer container) {
+    return container.getState() == ContainerState.DONE ||
+        container.getState() == ContainerState.FAILED;
+  }
+
+  /** Factory for the Runnable that handles one event; overridable. */
+  protected ContainerEventProcessor getContainerEventProcessor(
+      ContainerEvent event) {
+    return new ContainerEventProcessor(event);
+  }
+
+  /**
+   * The type of the event of interacting with a container
+   */
+  protected static enum ContainerEventType {
+    START_CONTAINER,
+    STOP_CONTAINER,
+    QUERY_CONTAINER
+  }
+
+  /** A request against one container on one NodeManager. */
+  protected static class ContainerEvent
+      extends AbstractEvent<ContainerEventType>{
+    private ContainerId containerId;
+    private NodeId nodeId;
+    private Token containerToken;
+
+    public ContainerEvent(ContainerId containerId, NodeId nodeId,
+        Token containerToken, ContainerEventType type) {
+      super(type);
+      this.containerId = containerId;
+      this.nodeId = nodeId;
+      this.containerToken = containerToken;
+    }
+
+    public ContainerId getContainerId() {
+      return containerId;
+    }
+
+    public NodeId getNodeId() {
+      return nodeId;
+    }
+
+    public Token getContainerToken() {
+      return containerToken;
+    }
+  }
+
+  /** A START_CONTAINER event that also carries the launch context. */
+  protected static class StartContainerEvent extends ContainerEvent {
+    private Container container;
+    private ContainerLaunchContext containerLaunchContext;
+
+    public StartContainerEvent(Container container,
+        ContainerLaunchContext containerLaunchContext) {
+      super(container.getId(), container.getNodeId(),
+          container.getContainerToken(), ContainerEventType.START_CONTAINER);
+      this.container = container;
+      this.containerLaunchContext = containerLaunchContext;
+    }
+
+    public Container getContainer() {
+      return container;
+    }
+
+    public ContainerLaunchContext getContainerLaunchContext() {
+      return containerLaunchContext;
+    }
+  }
+
+  /**
+   * Per-container state machine:
+   * PREP --START--> RUNNING | FAILED,
+   * PREP --STOP--> DONE (reported as a start error),
+   * RUNNING --STOP--> DONE | FAILED;
+   * DONE and FAILED ignore further START/STOP events.
+   * Transitions (including their NodeManager calls) run under the write lock.
+   */
+  protected static class StatefulContainer implements
+      EventHandler<ContainerEvent> {
+
+    protected final static StateMachineFactory<StatefulContainer,
+        ContainerState, ContainerEventType, ContainerEvent> stateMachineFactory
+            = new StateMachineFactory<StatefulContainer, ContainerState,
+                ContainerEventType, ContainerEvent>(ContainerState.PREP)
+
+            // Transitions from PREP state
+            .addTransition(ContainerState.PREP,
+                EnumSet.of(ContainerState.RUNNING, ContainerState.FAILED),
+                ContainerEventType.START_CONTAINER,
+                new StartContainerTransition())
+            .addTransition(ContainerState.PREP, ContainerState.DONE,
+                ContainerEventType.STOP_CONTAINER, new OutOfOrderTransition())
+
+            // Transitions from RUNNING state
+            // RUNNING -> RUNNING should be the invalid transition
+            .addTransition(ContainerState.RUNNING,
+                EnumSet.of(ContainerState.DONE, ContainerState.FAILED),
+                ContainerEventType.STOP_CONTAINER,
+                new StopContainerTransition())
+
+            // Transition from DONE state
+            .addTransition(ContainerState.DONE, ContainerState.DONE,
+                EnumSet.of(ContainerEventType.START_CONTAINER,
+                    ContainerEventType.STOP_CONTAINER))
+
+            // Transition from FAILED state
+            .addTransition(ContainerState.FAILED, ContainerState.FAILED,
+                EnumSet.of(ContainerEventType.START_CONTAINER,
+                    ContainerEventType.STOP_CONTAINER));
+
+    /** Calls startContainer on the NodeManager and reports the outcome. */
+    protected static class StartContainerTransition implements
+        MultipleArcTransition<StatefulContainer, ContainerEvent,
+        ContainerState> {
+
+      @Override
+      public ContainerState transition(
+          StatefulContainer container, ContainerEvent event) {
+        ContainerId containerId = event.getContainerId();
+        try {
+          StartContainerEvent scEvent = null;
+          if (event instanceof StartContainerEvent) {
+            scEvent = (StartContainerEvent) event;
+          }
+          assert scEvent != null;
+          Map<String, ByteBuffer> allServiceResponse =
+              container.nmClientAsync.getClient().startContainer(
+                  scEvent.getContainer(), scEvent.getContainerLaunchContext());
+          try {
+            container.nmClientAsync.getCallbackHandler().onContainerStarted(
+                containerId, allServiceResponse);
+          } catch (Throwable thr) {
+            // Don't process user created unchecked exception
+            LOG.info("Unchecked exception is thrown from onContainerStarted for "
+                + "Container " + containerId, thr);
+          }
+          return ContainerState.RUNNING;
+        } catch (Throwable t) {
+          // YarnException, IOException and unchecked failures are all
+          // reported the same way.
+          return onExceptionRaised(container, event, t);
+        }
+      }
+
+      private ContainerState onExceptionRaised(StatefulContainer container,
+          ContainerEvent event, Throwable t) {
+        try {
+          container.nmClientAsync.getCallbackHandler().onStartContainerError(
+              event.getContainerId(), t);
+        } catch (Throwable thr) {
+          // Don't process user created unchecked exception
+          LOG.info(
+              "Unchecked exception is thrown from onStartContainerError for " +
+                  "Container " + event.getContainerId(), thr);
+        }
+        return ContainerState.FAILED;
+      }
+    }
+
+    /** Calls stopContainer on the NodeManager and reports the outcome. */
+    protected static class StopContainerTransition implements
+        MultipleArcTransition<StatefulContainer, ContainerEvent,
+        ContainerState> {
+
+      @Override
+      public ContainerState transition(
+          StatefulContainer container, ContainerEvent event) {
+        ContainerId containerId = event.getContainerId();
+        try {
+          container.nmClientAsync.getClient().stopContainer(
+              containerId, event.getNodeId(), event.getContainerToken());
+          try {
+            container.nmClientAsync.getCallbackHandler().onContainerStopped(
+                event.getContainerId());
+          } catch (Throwable thr) {
+            // Don't process user created unchecked exception
+            LOG.info("Unchecked exception is thrown from onContainerStopped for "
+                + "Container " + event.getContainerId(), thr);
+          }
+          return ContainerState.DONE;
+        } catch (Throwable t) {
+          // YarnException, IOException and unchecked failures are all
+          // reported the same way.
+          return onExceptionRaised(container, event, t);
+        }
+      }
+
+      private ContainerState onExceptionRaised(StatefulContainer container,
+          ContainerEvent event, Throwable t) {
+        try {
+          container.nmClientAsync.getCallbackHandler().onStopContainerError(
+              event.getContainerId(), t);
+        } catch (Throwable thr) {
+          // Don't process user created unchecked exception
+          LOG.info("Unchecked exception is thrown from onStopContainerError for "
+              + "Container " + event.getContainerId(), thr);
+        }
+        return ContainerState.FAILED;
+      }
+    }
+
+    /**
+     * A STOP processed before the START: the container is never launched and
+     * the start is reported as failed.
+     */
+    protected static class OutOfOrderTransition implements
+        SingleArcTransition<StatefulContainer, ContainerEvent> {
+
+      protected static final String STOP_BEFORE_START_ERROR_MSG =
+          "Container was killed before it was launched";
+
+      @Override
+      public void transition(StatefulContainer container, ContainerEvent event) {
+        try {
+          container.nmClientAsync.getCallbackHandler().onStartContainerError(
+              event.getContainerId(),
+              RPCUtil.getRemoteException(STOP_BEFORE_START_ERROR_MSG));
+        } catch (Throwable thr) {
+          // Don't process user created unchecked exception
+          LOG.info(
+              "Unchecked exception is thrown from onStartContainerError for " +
+                  "Container " + event.getContainerId(), thr);
+        }
+      }
+    }
+
+    private final NMClientAsync nmClientAsync;
+    private final ContainerId containerId;
+    private final StateMachine<ContainerState, ContainerEventType,
+        ContainerEvent> stateMachine;
+    private final ReadLock readLock;
+    private final WriteLock writeLock;
+
+    public StatefulContainer(NMClientAsync client, ContainerId containerId) {
+      this.nmClientAsync = client;
+      this.containerId = containerId;
+      stateMachine = stateMachineFactory.make(this);
+      ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+      readLock = lock.readLock();
+      writeLock = lock.writeLock();
+    }
+
+    /** Applies {@code event}; invalid transitions are logged and ignored. */
+    @Override
+    public void handle(ContainerEvent event) {
+      writeLock.lock();
+      try {
+        try {
+          this.stateMachine.doTransition(event.getType(), event);
+        } catch (InvalidStateTransitonException e) {
+          LOG.error("Can't handle this event at current state", e);
+        }
+      } finally {
+        writeLock.unlock();
+      }
+    }
+
+    public ContainerId getContainerId() {
+      return containerId;
+    }
+
+    public ContainerState getState() {
+      readLock.lock();
+      try {
+        return stateMachine.getCurrentState();
+      } finally {
+        readLock.unlock();
+      }
+    }
+  }
+
+  /**
+   * Handles one event on a pool thread. Queries call the client directly;
+   * start/stop go through the container's state machine, and the container
+   * is forgotten once it reaches DONE or FAILED.
+   */
+  protected class ContainerEventProcessor implements Runnable {
+    protected ContainerEvent event;
+
+    public ContainerEventProcessor(ContainerEvent event) {
+      this.event = event;
+    }
+
+    @Override
+    public void run() {
+      ContainerId containerId = event.getContainerId();
+      LOG.info("Processing Event " + event + " for Container " + containerId);
+      if (event.getType() == ContainerEventType.QUERY_CONTAINER) {
+        try {
+          ContainerStatus containerStatus = client.getContainerStatus(
+              containerId, event.getNodeId(), event.getContainerToken());
+          try {
+            callbackHandler.onContainerStatusReceived(
+                containerId, containerStatus);
+          } catch (Throwable thr) {
+            // Don't process user created unchecked exception
+            LOG.info(
+                "Unchecked exception is thrown from onContainerStatusReceived" +
+                    " for Container " + event.getContainerId(), thr);
+          }
+        } catch (Throwable t) {
+          // YarnException, IOException and unchecked failures are all
+          // reported the same way.
+          onExceptionRaised(containerId, t);
+        }
+      } else {
+        StatefulContainer container = containers.get(containerId);
+        if (container == null) {
+          LOG.info("Container " + containerId + " is already stopped or failed");
+        } else {
+          container.handle(event);
+          if (isCompletelyDone(container)) {
+            containers.remove(containerId);
+          }
+        }
+      }
+    }
+
+    private void onExceptionRaised(ContainerId containerId, Throwable t) {
+      try {
+        callbackHandler.onGetContainerStatusError(containerId, t);
+      } catch (Throwable thr) {
+        // Don't process user created unchecked exception
+        LOG.info("Unchecked exception is thrown from onGetContainerStatusError" +
+            " for Container " + containerId, thr);
+      }
+    }
+  }
+
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/package-info.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/package-info.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/package-info.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// Implementation classes of the async client API. They are annotated
+// @Private (e.g. NMClientAsyncImpl), so the package is Private as well.
+@InterfaceAudience.Private
+package org.apache.hadoop.yarn.client.api.async.impl;
+import org.apache.hadoop.classification.InterfaceAudience;
+
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/package-info.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/package-info.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/package-info.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// Public async client API package; it contains NMClientAsync, whose
+// implementation (NMClientAsyncImpl) lives in the "impl" sub-package.
+@InterfaceAudience.Public
+package org.apache.hadoop.yarn.client.api.async;
+import org.apache.hadoop.classification.InterfaceAudience;
+
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,585 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.RackResolver;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+
+// TODO check inputs for null etc. YARN-654
+
+@Private
+@Unstable
+public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
+
+ private static final Log LOG = LogFactory.getLog(AMRMClientImpl.class);
+
+ private final RecordFactory recordFactory =
+ RecordFactoryProvider.getRecordFactory(null);
+
+ // Response id from the last successful allocate(); passed back to the RM in
+ // the next AllocateRequest.
+ private int lastResponseId = 0;
+ // NM tokens from allocate responses, presumably updated by
+ // populateNMTokens(). NOTE(review): key looks like an NM address — confirm.
+ private ConcurrentHashMap<String, Token> nmTokens;
+
+ // Scheduler proxy; created in serviceStart(), stopped in serviceStop().
+ protected ApplicationMasterProtocol rmClient;
+ protected final ApplicationAttemptId appAttemptId;
+ // Available resources and node count reported by the last successful
+ // allocate() call.
+ protected Resource clusterAvailableResources;
+ protected int clusterNodeCount;
+
+  /**
+   * Book-keeping for one (priority, resource name, capability) combination:
+   * the ResourceRequest record (created here with a count of 0) and the
+   * container requests that map to it, kept in insertion order.
+   */
+  class ResourceRequestInfo {
+    ResourceRequest remoteRequest;
+    LinkedHashSet<T> containerRequests;
+
+    ResourceRequestInfo(Priority priority, String resourceName,
+        Resource capability) {
+      this.containerRequests = new LinkedHashSet<T>();
+      this.remoteRequest =
+          ResourceRequest.newInstance(priority, resourceName, capability, 0);
+    }
+  }
+
+
+  /**
+   * Orders Resources by memory and then by virtual cores, both descending,
+   * so that larger capabilities sort first.
+   */
+  class ResourceReverseMemoryThenCpuComparator implements Comparator<Resource> {
+    @Override
+    public int compare(Resource left, Resource right) {
+      int leftMemory = left.getMemory();
+      int rightMemory = right.getMemory();
+      int leftCores = left.getVirtualCores();
+      int rightCores = right.getVirtualCores();
+      if (leftMemory != rightMemory) {
+        return leftMemory > rightMemory ? -1 : 1;
+      }
+      if (leftCores != rightCores) {
+        return leftCores > rightCores ? -1 : 1;
+      }
+      return 0;
+    }
+  }
+
+  /**
+   * Returns true when {@code arg0} needs no more memory and no more virtual
+   * cores than {@code arg1} provides.
+   */
+  static boolean canFit(Resource arg0, Resource arg1) {
+    boolean memoryFits = arg0.getMemory() <= arg1.getMemory();
+    boolean coresFit = arg0.getVirtualCores() <= arg1.getVirtualCores();
+    return memoryFits && coresFit;
+  }
+
+ // Outstanding requests, indexed as
+ //   Priority -> resource name (e.g. node name, rack name, "*")
+ //            -> capability (Resource) -> ResourceRequestInfo
+ protected final
+ Map<Priority, Map<String, TreeMap<Resource, ResourceRequestInfo>>>
+ remoteRequestsTable =
+ new TreeMap<Priority, Map<String, TreeMap<Resource, ResourceRequestInfo>>>();
+
+ // Asks and container releases accumulated since the last allocate().
+ // allocate() copies both into its AllocateRequest and clears them; if the
+ // call fails, the releases are added back.
+ protected final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
+ new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator());
+ protected final Set<ContainerId> release = new TreeSet<ContainerId>();
+
+  /**
+   * Creates a client that talks to the RM on behalf of one application
+   * attempt.
+   *
+   * @param appAttemptId attempt sent with every registration and allocation
+   */
+  public AMRMClientImpl(ApplicationAttemptId appAttemptId) {
+    super(AMRMClientImpl.class.getName());
+    this.nmTokens = new ConcurrentHashMap<String, Token>();
+    this.appAttemptId = appAttemptId;
+  }
+
+  /** Initializes the rack resolver before the base service initialization. */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    RackResolver.init(conf);
+    super.serviceInit(conf);
+  }
+
+  /**
+   * Creates the ApplicationMasterProtocol proxy to the RM scheduler address,
+   * acting as the current user.
+   *
+   * @throws YarnRuntimeException if the current user cannot be determined
+   */
+  @Override
+  protected void serviceStart() throws Exception {
+    final YarnConfiguration yarnConf = new YarnConfiguration(getConfig());
+    final YarnRPC yarnRpc = YarnRPC.create(yarnConf);
+    final InetSocketAddress schedulerAddress = yarnConf.getSocketAddr(
+        YarnConfiguration.RM_SCHEDULER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
+
+    final UserGroupInformation ugi;
+    try {
+      ugi = UserGroupInformation.getCurrentUser();
+    } catch (IOException ioe) {
+      throw new YarnRuntimeException(ioe);
+    }
+
+    // The current user is expected to already have the AMToken loaded.
+    PrivilegedAction<ApplicationMasterProtocol> createProxy =
+        new PrivilegedAction<ApplicationMasterProtocol>() {
+          @Override
+          public ApplicationMasterProtocol run() {
+            return (ApplicationMasterProtocol) yarnRpc.getProxy(
+                ApplicationMasterProtocol.class, schedulerAddress, yarnConf);
+          }
+        };
+    rmClient = ugi.doAs(createProxy);
+    LOG.debug("Connecting to ResourceManager at " + schedulerAddress);
+    super.serviceStart();
+  }
+
+  /** Closes the RM proxy, if one was created, then stops the base service. */
+  @Override
+  protected void serviceStop() throws Exception {
+    ApplicationMasterProtocol proxy = this.rmClient;
+    if (proxy != null) {
+      RPC.stopProxy(proxy);
+    }
+    super.serviceStop();
+  }
+
+  /**
+   * Registers this application attempt with the RM.
+   *
+   * @param appHostName host the AM runs on
+   * @param appHostPort AM RPC port
+   * @param appTrackingUrl tracking URL; left unset when null
+   * @return the RM's registration response
+   */
+  @Override
+  public RegisterApplicationMasterResponse registerApplicationMaster(
+      String appHostName, int appHostPort, String appTrackingUrl)
+      throws YarnException, IOException {
+    // NOTE(review): nothing here enforces that registration happens only once.
+    RegisterApplicationMasterRequest registration =
+        recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class);
+    synchronized (this) {
+      registration.setApplicationAttemptId(appAttemptId);
+    }
+    registration.setHost(appHostName);
+    registration.setRpcPort(appHostPort);
+    if (appTrackingUrl != null) {
+      registration.setTrackingUrl(appTrackingUrl);
+    }
+    return rmClient.registerApplicationMaster(registration);
+  }
+
+  /**
+   * Sends every pending ask and release to the ResourceManager together with
+   * the given progress, and returns the RM's response.
+   *
+   * <p>Pending asks and releases are cleared optimistically before the RPC. If
+   * no response is obtained, they are restored so that they are resent on the
+   * next call. An ask that was re-added while the RPC was in flight is kept,
+   * because it already holds the newer value.</p>
+   *
+   * @param progressIndicator application progress reported to the RM
+   * @return the ResourceManager's allocate response
+   * @throws YarnException propagated from the ResourceManager call
+   * @throws IOException propagated from the RPC layer
+   */
+  @Override
+  public AllocateResponse allocate(float progressIndicator)
+      throws YarnException, IOException {
+    AllocateResponse allocateResponse = null;
+    ArrayList<ResourceRequest> askList = null;
+    ArrayList<ContainerId> releaseList = null;
+    AllocateRequest allocateRequest = null;
+
+    try {
+      synchronized (this) {
+        askList = new ArrayList<ResourceRequest>(ask);
+        releaseList = new ArrayList<ContainerId>(release);
+        // optimistically clear these collections assuming no RPC failure
+        ask.clear();
+        release.clear();
+        allocateRequest =
+            AllocateRequest.newInstance(appAttemptId, lastResponseId,
+                progressIndicator, askList, releaseList, null);
+      }
+
+      allocateResponse = rmClient.allocate(allocateRequest);
+
+      synchronized (this) {
+        // update these on successful RPC
+        clusterNodeCount = allocateResponse.getNumClusterNodes();
+        lastResponseId = allocateResponse.getResponseId();
+        clusterAvailableResources = allocateResponse.getAvailableResources();
+        if (!allocateResponse.getNMTokens().isEmpty()) {
+          populateNMTokens(allocateResponse);
+        }
+      }
+    } finally {
+      // TODO how to differentiate remote yarn exception vs error in rpc
+      if (allocateResponse == null) {
+        // We hit an exception in allocate(); preserve ask and release for the
+        // next call. Either list may still be null if the failure happened
+        // before it was captured. ask/release are only cleared after both
+        // lists exist, so in that case there is nothing to restore, and the
+        // null checks keep a NullPointerException from masking the real
+        // failure.
+        synchronized (this) {
+          if (releaseList != null) {
+            release.addAll(releaseList);
+          }
+          // Requests could have been added or removed during the call to
+          // allocate(). If so, there is nothing to do, because the
+          // ResourceRequest in ask already holds the new value. If ask does
+          // not contain a ResourceRequest, it was unchanged and can safely be
+          // added back. This assumes there are no concurrent calls to
+          // allocate(), so we don't have to worry about ask being changed in
+          // the synchronized block at the beginning of this method.
+          if (askList != null) {
+            for (ResourceRequest oldAsk : askList) {
+              if (!ask.contains(oldAsk)) {
+                ask.add(oldAsk);
+              }
+            }
+          }
+        }
+      }
+    }
+    return allocateResponse;
+  }
+
+  /**
+   * Copies every NM token in the response into {@code nmTokens}, keyed by the
+   * string form of the token's NodeId. An existing token for the same node is
+   * overwritten.
+   */
+  @Private
+  @VisibleForTesting
+  protected void populateNMTokens(AllocateResponse allocateResponse) {
+    for (NMToken nmToken : allocateResponse.getNMTokens()) {
+      String nodeKey = nmToken.getNodeId().toString();
+      if (LOG.isDebugEnabled()) {
+        String prefix = nmTokens.containsKey(nodeKey)
+            ? "Replacing token for : "
+            : "Received new token for : ";
+        LOG.debug(prefix + nodeKey);
+      }
+      nmTokens.put(nodeKey, nmToken.getToken());
+    }
+  }
+
+  /**
+   * Tells the ResourceManager that this application master has finished.
+   *
+   * @param appStatus final status of the application
+   * @param appMessage diagnostics; not set on the request when null
+   * @param appTrackingUrl tracking URL; not set on the request when null
+   */
+  @Override
+  public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
+      String appMessage, String appTrackingUrl) throws YarnException,
+      IOException {
+    FinishApplicationMasterRequest finishRequest = recordFactory
+        .newRecordInstance(FinishApplicationMasterRequest.class);
+    finishRequest.setAppAttemptId(appAttemptId);
+    finishRequest.setFinalApplicationStatus(appStatus);
+    if (appMessage != null) {
+      finishRequest.setDiagnostics(appMessage);
+    }
+    if (appTrackingUrl != null) {
+      finishRequest.setTrackingUrl(appTrackingUrl);
+    }
+    rmClient.finishApplicationMaster(finishRequest);
+  }
+
+  /**
+   * Records a container request in the request table and queues the affected
+   * ResourceRequests for the next allocate() call. One request is added per
+   * distinct node, one per rack, and one for {@link ResourceRequest#ANY}. The
+   * racks are the explicitly requested ones plus those resolved for the
+   * requested nodes. Duplicate racks or nodes are logged and collapsed.
+   */
+  @Override
+  public synchronized void addContainerRequest(T req) {
+    Set<String> racks = new HashSet<String>();
+    if (req.getRacks() != null) {
+      racks.addAll(req.getRacks());
+      if (racks.size() != req.getRacks().size()) {
+        LOG.warn("ContainerRequest has duplicate racks: "
+            + Joiner.on(',').join(req.getRacks()));
+      }
+    }
+    racks.addAll(resolveRacks(req.getNodes()));
+
+    // Node-local requests; their racks were already folded into 'racks'.
+    if (req.getNodes() != null) {
+      Set<String> uniqueNodes = new HashSet<String>(req.getNodes());
+      if (uniqueNodes.size() != req.getNodes().size()) {
+        LOG.warn("ContainerRequest has duplicate nodes: "
+            + Joiner.on(',').join(req.getNodes()));
+      }
+      for (String node : uniqueNodes) {
+        addResourceRequest(req.getPriority(), node, req.getCapability(),
+            req.getContainerCount(), req);
+      }
+    }
+
+    // Rack-local requests.
+    for (String rack : racks) {
+      addResourceRequest(req.getPriority(), rack, req.getCapability(),
+          req.getContainerCount(), req);
+    }
+
+    // Off-switch request.
+    addResourceRequest(req.getPriority(), ResourceRequest.ANY,
+        req.getCapability(), req.getContainerCount(), req);
+  }
+
+  /**
+   * Reverses addContainerRequest(). It decrements the node, rack and
+   * {@link ResourceRequest#ANY} entries that the request contributed to.
+   */
+  @Override
+  public synchronized void removeContainerRequest(T req) {
+    Set<String> racks = new HashSet<String>();
+    if (req.getRacks() != null) {
+      racks.addAll(req.getRacks());
+    }
+    racks.addAll(resolveRacks(req.getNodes()));
+
+    // Node-local entries, one per distinct node.
+    if (req.getNodes() != null) {
+      Set<String> uniqueNodes = new HashSet<String>(req.getNodes());
+      for (String node : uniqueNodes) {
+        decResourceRequest(req.getPriority(), node, req.getCapability(),
+            req.getContainerCount(), req);
+      }
+    }
+
+    // Rack-local entries.
+    for (String rack : racks) {
+      decResourceRequest(req.getPriority(), rack, req.getCapability(),
+          req.getContainerCount(), req);
+    }
+
+    // Off-switch entry.
+    decResourceRequest(req.getPriority(), ResourceRequest.ANY,
+        req.getCapability(), req.getContainerCount(), req);
+  }
+
+  /** Queues the container to be released on the next allocate() call. */
+  @Override
+  public synchronized void releaseAssignedContainer(ContainerId containerId) {
+    this.release.add(containerId);
+  }
+
+  /** Returns the available resources reported by the last successful allocate(). */
+  @Override
+  public synchronized Resource getClusterAvailableResources() {
+    return this.clusterAvailableResources;
+  }
+
+  /** Returns the cluster node count reported by the last successful allocate(). */
+  @Override
+  public synchronized int getClusterNodeCount() {
+    return this.clusterNodeCount;
+  }
+
+  /**
+   * Returns the stored container requests that match a priority, location and
+   * capability. An exact capability match is returned on its own. Otherwise
+   * every stored capability that fits within the given one contributes its
+   * requests. The returned sets are the client's internal ones, not copies.
+   * The result is empty when nothing matches.
+   */
+  @Override
+  public synchronized List<? extends Collection<T>> getMatchingRequests(
+      Priority priority,
+      String resourceName,
+      Resource capability) {
+    List<LinkedHashSet<T>> matches = new LinkedList<LinkedHashSet<T>>();
+
+    Map<String, TreeMap<Resource, ResourceRequestInfo>> byLocation =
+        this.remoteRequestsTable.get(priority);
+    TreeMap<Resource, ResourceRequestInfo> bySize =
+        (byLocation == null) ? null : byLocation.get(resourceName);
+    if (bySize == null) {
+      return matches;
+    }
+
+    ResourceRequestInfo exact = bySize.get(capability);
+    if (exact != null) {
+      matches.add(exact.containerRequests);
+      return matches;
+    }
+
+    // The map is reverse sorted (largest first), so the tail view starting at
+    // 'capability' holds the candidates no larger than it. Keep the ones that
+    // actually fit.
+    SortedMap<Resource, ResourceRequestInfo> smaller = bySize.tailMap(capability);
+    for (Map.Entry<Resource, ResourceRequestInfo> candidate : smaller.entrySet()) {
+      if (canFit(candidate.getKey(), capability)) {
+        matches.add(candidate.getValue().containerRequests);
+      }
+    }
+    return matches;
+  }
+
+  /**
+   * Resolves the rack (network location) of each node via RackResolver.
+   * Nodes whose rack resolves to null are logged and skipped.
+   *
+   * @param nodes host names; may be null
+   * @return the distinct racks found; empty when {@code nodes} is null
+   */
+  private Set<String> resolveRacks(List<String> nodes) {
+    Set<String> resolved = new HashSet<String>();
+    if (nodes == null) {
+      return resolved;
+    }
+    for (String host : nodes) {
+      String networkLocation = RackResolver.resolve(host).getNetworkLocation();
+      if (networkLocation != null) {
+        resolved.add(networkLocation);
+      } else {
+        LOG.warn("Failed to resolve rack for node " + host + ".");
+      }
+    }
+    return resolved;
+  }
+
+  /**
+   * Queues a ResourceRequest for the next allocate(), replacing any queued
+   * request that the ask comparator considers equal.
+   *
+   * <p>The comparator ignores the container count. Suppose a zero-count
+   * request (announcing that a request is no longer needed) is already queued
+   * and the user asks for the same location again before allocate() runs.
+   * A plain add() would then keep the stale zero-count entry. Removing first
+   * guarantees the newest request object is the one that gets sent. Set.remove
+   * is a no-op when no equal element is present.</p>
+   */
+  private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
+    ask.remove(remoteRequest);
+    ask.add(remoteRequest);
+  }
+
+  /**
+   * Adds {@code containerCount} containers to the ResourceRequest for the given
+   * priority, location and capability, creating table entries as needed. The
+   * updated request is queued in {@code ask}. The container request itself is
+   * remembered only when it is a StoredContainerRequest.
+   */
+  private void addResourceRequest(Priority priority, String resourceName,
+      Resource capability, int containerCount, T req) {
+    Map<String, TreeMap<Resource, ResourceRequestInfo>> byLocation =
+        this.remoteRequestsTable.get(priority);
+    if (byLocation == null) {
+      byLocation = new HashMap<String, TreeMap<Resource, ResourceRequestInfo>>();
+      this.remoteRequestsTable.put(priority, byLocation);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Added priority=" + priority);
+      }
+    }
+
+    TreeMap<Resource, ResourceRequestInfo> bySize = byLocation.get(resourceName);
+    if (bySize == null) {
+      // capabilities are kept in reverse order: largest first, smallest last
+      bySize = new TreeMap<Resource, ResourceRequestInfo>(
+          new ResourceReverseMemoryThenCpuComparator());
+      byLocation.put(resourceName, bySize);
+    }
+
+    ResourceRequestInfo info = bySize.get(capability);
+    if (info == null) {
+      info = new ResourceRequestInfo(priority, resourceName, capability);
+      bySize.put(capability, info);
+    }
+
+    ResourceRequest remoteRequest = info.remoteRequest;
+    remoteRequest.setNumContainers(
+        remoteRequest.getNumContainers() + containerCount);
+
+    if (req instanceof StoredContainerRequest) {
+      info.containerRequests.add(req);
+    }
+
+    // Remember this for the next interaction with the ResourceManager.
+    addResourceRequestToAsk(remoteRequest);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("addResourceRequest:" + " applicationId="
+          + appAttemptId + " priority=" + priority.getPriority()
+          + " resourceName=" + resourceName + " numContainers="
+          + remoteRequest.getNumContainers()
+          + " #asks=" + ask.size());
+    }
+  }
+
+  /**
+   * Removes {@code containerCount} containers from the ResourceRequest for the
+   * given priority, location and capability.
+   *
+   * <p>The count is clamped at zero. The updated request is queued in
+   * {@code ask} even when it reaches zero, so that it overrides a previously
+   * sent value. Table entries that become empty are pruned. The call is a
+   * no-op when no request exists for the priority, location or
+   * capability.</p>
+   */
+  private void decResourceRequest(Priority priority,
+                                  String resourceName,
+                                  Resource capability,
+                                  int containerCount,
+                                  T req) {
+    Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
+        this.remoteRequestsTable.get(priority);
+
+    if (remoteRequests == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not decrementing resource as priority " + priority
+            + " is not present in request table");
+      }
+      return;
+    }
+
+    Map<Resource, ResourceRequestInfo> reqMap = remoteRequests.get(resourceName);
+    if (reqMap == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not decrementing resource as " + resourceName
+            + " is not present in request table");
+      }
+      return;
+    }
+
+    ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
+    if (resourceRequestInfo == null) {
+      // A removal for a capability that was never added is ignored instead of
+      // failing with a NullPointerException.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not decrementing resource as capability " + capability
+            + " is not present in request table for " + resourceName);
+      }
+      return;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("BEFORE decResourceRequest:" + " applicationId="
+          + appAttemptId + " priority=" + priority.getPriority()
+          + " resourceName=" + resourceName + " numContainers="
+          + resourceRequestInfo.remoteRequest.getNumContainers()
+          + " #asks=" + ask.size());
+    }
+
+    resourceRequestInfo.remoteRequest.setNumContainers(
+        resourceRequestInfo.remoteRequest.getNumContainers() - containerCount);
+
+    if (req instanceof StoredContainerRequest) {
+      resourceRequestInfo.containerRequests.remove(req);
+    }
+
+    if (resourceRequestInfo.remoteRequest.getNumContainers() < 0) {
+      // guard against spurious removals
+      resourceRequestInfo.remoteRequest.setNumContainers(0);
+    }
+    // Send the ResourceRequest to the RM even if it is 0, because it needs to
+    // override a previously sent value. If the ResourceRequest was never sent,
+    // sending 0 ought to be a no-op on the RM.
+    addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
+
+    // delete entries from map if no longer needed
+    if (resourceRequestInfo.remoteRequest.getNumContainers() == 0) {
+      reqMap.remove(capability);
+      if (reqMap.size() == 0) {
+        remoteRequests.remove(resourceName);
+      }
+      if (remoteRequests.size() == 0) {
+        remoteRequestsTable.remove(priority);
+      }
+    }
+
+    if (LOG.isDebugEnabled()) {
+      // was LOG.info inside a debug guard; trace output belongs at debug level
+      LOG.debug("AFTER decResourceRequest:" + " applicationId="
+          + appAttemptId + " priority=" + priority.getPriority()
+          + " resourceName=" + resourceName + " numContainers="
+          + resourceRequestInfo.remoteRequest.getNumContainers()
+          + " #asks=" + ask.size());
+    }
+  }
+
+  /**
+   * Returns the live map of NM tokens received so far, keyed by the string
+   * form of the NodeId. The map is not copied.
+   */
+  @Override
+  public ConcurrentHashMap<String, Token> getNMTokens() {
+    return this.nmTokens;
+  }
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,409 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedAction;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.util.ProtoUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * <p>
+ * This class implements {@link NMClient}. All the APIs are blocking.
+ * </p>
+ *
+ * <p>
+ * By default, this client stops all the running containers that it started
+ * when the client itself stops. This can be disabled via
+ * {@link #cleanupRunningContainersOnStop}. In that case containers keep
+ * running after this client is stopped, for as long as the application runs;
+ * once the application finishes, the ResourceManager forcefully kills them.
+ * </p>
+ *
+ * <p>
+ * Note that the blocking APIs ensure the RPC calls to <code>NodeManager</code>
+ * are executed immediately, and the responses are received before these APIs
+ * return. However, when {@link #startContainer} or {@link #stopContainer}
+ * returns, <code>NodeManager</code> may still need some time to either start
+ * or stop the container because of its asynchronous implementation. Therefore,
+ * {@link #getContainerStatus} is likely to return a transitional container
+ * status if it is executed immediately after {@link #startContainer} or
+ * {@link #stopContainer}.
+ * </p>
+ */
+@Private
+@Unstable
+public class NMClientImpl extends NMClient {
+
+  private static final Log LOG = LogFactory.getLog(NMClientImpl.class);
+
+  // Logically coherent operations on startedContainers are synchronized to
+  // ensure they are atomic.
+  protected ConcurrentMap<ContainerId, StartedContainer> startedContainers =
+      new ConcurrentHashMap<ContainerId, StartedContainer>();
+
+  // enabled by default
+  private final AtomicBoolean cleanupRunningContainers = new AtomicBoolean(true);
+
+  public NMClientImpl() {
+    super(NMClientImpl.class.getName());
+  }
+
+  public NMClientImpl(String name) {
+    super(name);
+  }
+
+  /**
+   * Stops every container started by this client, unless that behaviour was
+   * disabled via {@link #cleanupRunningContainersOnStop}, then stops the
+   * service.
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    if (getCleanupRunningContainers().get()) {
+      cleanupRunningContainers();
+    }
+    super.serviceStop();
+  }
+
+  /**
+   * Best-effort stop of every container this client has started. A failure is
+   * logged together with its cause and does not prevent the remaining
+   * containers from being stopped.
+   */
+  protected synchronized void cleanupRunningContainers() {
+    // stopContainer() removes entries from startedContainers while this loop
+    // runs. The default ConcurrentHashMap has weakly consistent iterators that
+    // tolerate this.
+    for (StartedContainer startedContainer : startedContainers.values()) {
+      try {
+        stopContainer(startedContainer.getContainerId(),
+            startedContainer.getNodeId(),
+            startedContainer.getContainerToken());
+      } catch (YarnException e) {
+        LOG.error("Failed to stop Container " +
+            startedContainer.getContainerId() +
+            " when stopping NMClientImpl", e);
+      } catch (IOException e) {
+        LOG.error("Failed to stop Container " +
+            startedContainer.getContainerId() +
+            " when stopping NMClientImpl", e);
+      }
+    }
+  }
+
+  @Override
+  public void cleanupRunningContainersOnStop(boolean enabled) {
+    getCleanupRunningContainers().set(enabled);
+  }
+
+  /**
+   * Bookkeeping for a container started through this client. Instances also
+   * serve as the lock that serializes start and stop of the same container.
+   */
+  protected static class StartedContainer {
+    private final ContainerId containerId;
+    private final NodeId nodeId;
+    private final Token containerToken;
+    // only read/written while holding this object's monitor
+    private boolean stopped;
+
+    public StartedContainer(ContainerId containerId, NodeId nodeId,
+        Token containerToken) {
+      this.containerId = containerId;
+      this.nodeId = nodeId;
+      this.containerToken = containerToken;
+      stopped = false;
+    }
+
+    public ContainerId getContainerId() {
+      return containerId;
+    }
+
+    public NodeId getNodeId() {
+      return nodeId;
+    }
+
+    public Token getContainerToken() {
+      return containerToken;
+    }
+  }
+
+  /**
+   * Short-lived service wrapping a ContainerManagementProtocol proxy for a
+   * single container. The proxy is created in serviceStart(), as a remote user
+   * named after the container id and carrying the container token, and is
+   * released in serviceStop().
+   */
+  protected static final class NMCommunicator extends AbstractService {
+    private final ContainerId containerId;
+    private final NodeId nodeId;
+    private final Token containerToken;
+    private ContainerManagementProtocol containerManager;
+
+    public NMCommunicator(ContainerId containerId, NodeId nodeId,
+        Token containerToken) {
+      super(NMCommunicator.class.getName());
+      this.containerId = containerId;
+      this.nodeId = nodeId;
+      this.containerToken = containerToken;
+    }
+
+    @Override
+    protected synchronized void serviceStart() throws Exception {
+      final YarnRPC rpc = YarnRPC.create(getConfig());
+
+      final InetSocketAddress containerAddress =
+          NetUtils.createSocketAddr(nodeId.toString());
+
+      // the user in createRemoteUser in this context has to be ContainerId
+      UserGroupInformation currentUser =
+          UserGroupInformation.createRemoteUser(containerId.toString());
+
+      org.apache.hadoop.security.token.Token<ContainerTokenIdentifier> token =
+          ProtoUtils.convertFromProtoFormat(containerToken, containerAddress);
+      currentUser.addToken(token);
+
+      containerManager = currentUser
+          .doAs(new PrivilegedAction<ContainerManagementProtocol>() {
+            @Override
+            public ContainerManagementProtocol run() {
+              return (ContainerManagementProtocol) rpc.getProxy(
+                  ContainerManagementProtocol.class, containerAddress,
+                  getConfig());
+            }
+          });
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Connecting to ContainerManager at " + containerAddress);
+      }
+      super.serviceStart();
+    }
+
+    @Override
+    protected synchronized void serviceStop() throws Exception {
+      if (this.containerManager != null) {
+        RPC.stopProxy(this.containerManager);
+
+        if (LOG.isDebugEnabled()) {
+          InetSocketAddress containerAddress =
+              NetUtils.createSocketAddr(nodeId.toString());
+          LOG.debug("Disconnecting from ContainerManager at " +
+              containerAddress);
+        }
+      }
+      super.serviceStop();
+    }
+
+    /**
+     * Issues the startContainer RPC for {@code container}.
+     *
+     * @return all auxiliary service responses from the NodeManager
+     * @throws IllegalArgumentException if the container id differs from the
+     *     one this communicator was created for
+     */
+    public synchronized Map<String, ByteBuffer> startContainer(
+        Container container, ContainerLaunchContext containerLaunchContext)
+        throws YarnException, IOException {
+      if (!container.getId().equals(containerId)) {
+        throw new IllegalArgumentException(
+            "NMCommunicator's containerId mismatches the given Container's");
+      }
+      StartContainerResponse startResponse = null;
+      try {
+        StartContainerRequest startRequest =
+            Records.newRecord(StartContainerRequest.class);
+        startRequest.setContainerToken(container.getContainerToken());
+        startRequest.setContainerLaunchContext(containerLaunchContext);
+        startResponse = containerManager.startContainer(startRequest);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Started Container " + containerId);
+        }
+      } catch (YarnException e) {
+        LOG.warn("Container " + containerId + " failed to start", e);
+        throw e;
+      } catch (IOException e) {
+        LOG.warn("Container " + containerId + " failed to start", e);
+        throw e;
+      }
+      return startResponse.getAllServiceResponse();
+    }
+
+    /** Issues the stopContainer RPC for this communicator's container. */
+    public synchronized void stopContainer() throws YarnException,
+        IOException {
+      try {
+        StopContainerRequest stopRequest =
+            Records.newRecord(StopContainerRequest.class);
+        stopRequest.setContainerId(containerId);
+        containerManager.stopContainer(stopRequest);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Stopped Container " + containerId);
+        }
+      } catch (YarnException e) {
+        LOG.warn("Container " + containerId + " failed to stop", e);
+        throw e;
+      } catch (IOException e) {
+        LOG.warn("Container " + containerId + " failed to stop", e);
+        throw e;
+      }
+    }
+
+    /** Fetches the status of this communicator's container. */
+    public synchronized ContainerStatus getContainerStatus()
+        throws YarnException, IOException {
+      GetContainerStatusResponse statusResponse = null;
+      try {
+        GetContainerStatusRequest statusRequest =
+            Records.newRecord(GetContainerStatusRequest.class);
+        statusRequest.setContainerId(containerId);
+        statusResponse = containerManager.getContainerStatus(statusRequest);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Got the status of Container " + containerId);
+        }
+      } catch (YarnException e) {
+        LOG.warn(
+            "Unable to get the status of Container " + containerId, e);
+        throw e;
+      } catch (IOException e) {
+        LOG.warn(
+            "Unable to get the status of Container " + containerId, e);
+        throw e;
+      }
+      return statusResponse.getStatus();
+    }
+  }
+
+  /**
+   * Starts a container on its NodeManager. The container is registered as
+   * started before the RPC and unregistered again if starting it fails.
+   *
+   * @return all auxiliary service responses from the NodeManager
+   * @throws YarnException if the container was already started through this
+   *     client, or starting it failed; unexpected throwables are wrapped via
+   *     RPCUtil.getRemoteException
+   */
+  @Override
+  public Map<String, ByteBuffer> startContainer(
+      Container container, ContainerLaunchContext containerLaunchContext)
+      throws YarnException, IOException {
+    // Three choices for the proxy lifecycle:
+    // 1. starting and releasing the proxy before and after each interaction
+    // 2. starting the proxy when starting the container and releasing it when
+    //    stopping the container
+    // 3. starting the proxy when starting the container and releasing it when
+    //    stopping the client
+    // Option 1 is adopted currently.
+
+    // Do synchronization on StartedContainer to prevent race condition
+    // between startContainer and stopContainer
+    synchronized (addStartedContainer(container)) {
+      Map<String, ByteBuffer> allServiceResponse;
+      NMCommunicator nmCommunicator = null;
+      try {
+        nmCommunicator = new NMCommunicator(container.getId(),
+            container.getNodeId(), container.getContainerToken());
+        nmCommunicator.init(getConfig());
+        nmCommunicator.start();
+        allServiceResponse =
+            nmCommunicator.startContainer(container, containerLaunchContext);
+      } catch (YarnException e) {
+        // Remove the started container if it failed to start
+        removeStartedContainer(container.getId());
+        throw e;
+      } catch (IOException e) {
+        removeStartedContainer(container.getId());
+        throw e;
+      } catch (Throwable t) {
+        removeStartedContainer(container.getId());
+        throw RPCUtil.getRemoteException(t);
+      } finally {
+        if (nmCommunicator != null) {
+          nmCommunicator.stop();
+        }
+      }
+      return allServiceResponse;
+    }
+  }
+
+  /**
+   * Stops a container previously started through this client. Only the first
+   * of several concurrent stop requests performs the RPC. The container is
+   * marked stopped and forgotten even if the RPC fails.
+   *
+   * @throws YarnException if the container is unknown to this client
+   */
+  @Override
+  public void stopContainer(ContainerId containerId, NodeId nodeId,
+      Token containerToken) throws YarnException, IOException {
+    StartedContainer startedContainer = getStartedContainer(containerId);
+    if (startedContainer == null) {
+      throw RPCUtil.getRemoteException("Container " + containerId +
+          " is either not started yet or already stopped");
+    }
+    // Only allow one request of stopping the container to move forward.
+    // When entering the block, check whether the precursor has already stopped
+    // the container.
+    synchronized (startedContainer) {
+      if (startedContainer.stopped) {
+        return;
+      }
+      NMCommunicator nmCommunicator = null;
+      try {
+        nmCommunicator =
+            new NMCommunicator(containerId, nodeId, containerToken);
+        nmCommunicator.init(getConfig());
+        nmCommunicator.start();
+        nmCommunicator.stopContainer();
+      } finally {
+        if (nmCommunicator != null) {
+          nmCommunicator.stop();
+        }
+        startedContainer.stopped = true;
+        removeStartedContainer(containerId);
+      }
+    }
+  }
+
+  /** Fetches the current status of a container from its NodeManager. */
+  @Override
+  public ContainerStatus getContainerStatus(ContainerId containerId,
+      NodeId nodeId, Token containerToken)
+      throws YarnException, IOException {
+    NMCommunicator nmCommunicator = null;
+    try {
+      nmCommunicator = new NMCommunicator(containerId, nodeId, containerToken);
+      nmCommunicator.init(getConfig());
+      nmCommunicator.start();
+      return nmCommunicator.getContainerStatus();
+    } finally {
+      if (nmCommunicator != null) {
+        nmCommunicator.stop();
+      }
+    }
+  }
+
+  /**
+   * Registers {@code container} as started.
+   *
+   * @throws YarnException if it is already registered
+   */
+  protected synchronized StartedContainer addStartedContainer(
+      Container container) throws YarnException, IOException {
+    if (startedContainers.containsKey(container.getId())) {
+      throw RPCUtil.getRemoteException("Container " + container.getId() +
+          " is already started");
+    }
+    StartedContainer startedContainer = new StartedContainer(container.getId(),
+        container.getNodeId(), container.getContainerToken());
+    startedContainers.put(startedContainer.getContainerId(), startedContainer);
+    return startedContainer;
+  }
+
+  protected synchronized void removeStartedContainer(ContainerId containerId) {
+    startedContainers.remove(containerId);
+  }
+
+  protected synchronized StartedContainer getStartedContainer(
+      ContainerId containerId) {
+    return startedContainers.get(containerId);
+  }
+
+  public AtomicBoolean getCleanupRunningContainers() {
+    return cleanupRunningContainers;
+  }
+
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,316 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.Records;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Default {@link YarnClient} implementation. Each operation builds the
+ * matching request record and sends it to the ResourceManager through an
+ * {@link ApplicationClientProtocol} proxy that is created in
+ * {@link #serviceStart()} and released in {@link #serviceStop()}.
+ */
+@Private
+@Unstable
+public class YarnClientImpl extends YarnClient {
+
+  private static final Log LOG = LogFactory.getLog(YarnClientImpl.class);
+
+  protected ApplicationClientProtocol rmClient;
+  protected InetSocketAddress rmAddress;
+  protected long statePollIntervalMillis;
+
+  /** Queue name used as the starting point when walking the queue tree. */
+  private static final String ROOT = "root";
+
+  /** Creates a client whose RM address is read from the configuration. */
+  public YarnClientImpl() {
+    this(null);
+  }
+
+  /**
+   * @param rmAddress RM address to connect to, or {@code null} to read it
+   *     from the configuration during {@link #serviceInit(Configuration)}
+   */
+  public YarnClientImpl(InetSocketAddress rmAddress) {
+    this(YarnClientImpl.class.getName(), rmAddress);
+  }
+
+  /**
+   * @param name service name
+   * @param rmAddress RM address to connect to, or {@code null} to read it
+   *     from the configuration during {@link #serviceInit(Configuration)}
+   */
+  public YarnClientImpl(String name, InetSocketAddress rmAddress) {
+    super(name);
+    this.rmAddress = rmAddress;
+  }
+
+  private static InetSocketAddress getRmAddress(Configuration conf) {
+    return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT);
+  }
+
+  /**
+   * Resolves the RM address from {@link YarnConfiguration#RM_ADDRESS} unless
+   * one was passed to the constructor, and reads the poll interval used by
+   * {@link #submitApplication(ApplicationSubmissionContext)}.
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    if (this.rmAddress == null) {
+      this.rmAddress = getRmAddress(conf);
+    }
+    statePollIntervalMillis = conf.getLong(
+        YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS,
+        YarnConfiguration.DEFAULT_YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS);
+    super.serviceInit(conf);
+  }
+
+  /** Creates the RPC proxy to the ResourceManager at {@code rmAddress}. */
+  @Override
+  protected void serviceStart() throws Exception {
+    YarnRPC rpc = YarnRPC.create(getConfig());
+
+    this.rmClient = (ApplicationClientProtocol) rpc.getProxy(
+        ApplicationClientProtocol.class, rmAddress, getConfig());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Connecting to ResourceManager at " + rmAddress);
+    }
+    super.serviceStart();
+  }
+
+  /** Releases the RPC proxy, if one was created. */
+  @Override
+  protected void serviceStop() throws Exception {
+    if (this.rmClient != null) {
+      RPC.stopProxy(this.rmClient);
+    }
+    super.serviceStop();
+  }
+
+  @Override
+  public GetNewApplicationResponse getNewApplication()
+      throws YarnException, IOException {
+    GetNewApplicationRequest request =
+        Records.newRecord(GetNewApplicationRequest.class);
+    return rmClient.getNewApplication(request);
+  }
+
+  /**
+   * Submits the application and then blocks, polling its report every
+   * {@code statePollIntervalMillis}, until the RM reports a state other than
+   * {@code NEW} or {@code NEW_SAVING}.
+   *
+   * @param appContext submission context; its application id is used both
+   *     for the submission and for the status polling
+   * @return the id of the submitted application
+   * @throws YarnException if an RPC fails, or if the calling thread is
+   *     interrupted while waiting (the thread's interrupt flag is restored)
+   * @throws IOException on RPC transport failure
+   */
+  @Override
+  public ApplicationId
+      submitApplication(ApplicationSubmissionContext appContext)
+          throws YarnException, IOException {
+    ApplicationId applicationId = appContext.getApplicationId();
+    SubmitApplicationRequest request =
+        Records.newRecord(SubmitApplicationRequest.class);
+    request.setApplicationSubmissionContext(appContext);
+    rmClient.submitApplication(request);
+
+    int pollCount = 0;
+    while (true) {
+      YarnApplicationState state =
+          getApplicationReport(applicationId).getYarnApplicationState();
+      if (!state.equals(YarnApplicationState.NEW) &&
+          !state.equals(YarnApplicationState.NEW_SAVING)) {
+        break;
+      }
+      // Notify the client through the log every 10 polls, in case the
+      // client is blocked here too long.
+      if (++pollCount % 10 == 0) {
+        LOG.info("Application submission is not finished, " +
+            "submitted application " + applicationId +
+            " is still in " + state);
+      }
+      try {
+        Thread.sleep(statePollIntervalMillis);
+      } catch (InterruptedException ie) {
+        // Do not swallow the interrupt: otherwise this loop keeps polling
+        // and the caller's request to stop is silently lost.
+        Thread.currentThread().interrupt();
+        throw new YarnException("Interrupted while waiting for application "
+            + applicationId + " to be successfully submitted.", ie);
+      }
+    }
+
+    LOG.info("Submitted application " + applicationId + " to ResourceManager"
+        + " at " + rmAddress);
+    return applicationId;
+  }
+
+  /** Asks the RM to force-kill the given application. */
+  @Override
+  public void killApplication(ApplicationId applicationId)
+      throws YarnException, IOException {
+    LOG.info("Killing application " + applicationId);
+    KillApplicationRequest request =
+        Records.newRecord(KillApplicationRequest.class);
+    request.setApplicationId(applicationId);
+    rmClient.forceKillApplication(request);
+  }
+
+  @Override
+  public ApplicationReport getApplicationReport(ApplicationId appId)
+      throws YarnException, IOException {
+    GetApplicationReportRequest request =
+        Records.newRecord(GetApplicationReportRequest.class);
+    request.setApplicationId(appId);
+    GetApplicationReportResponse response =
+        rmClient.getApplicationReport(request);
+    return response.getApplicationReport();
+  }
+
+  @Override
+  public List<ApplicationReport> getApplicationList()
+      throws YarnException, IOException {
+    GetAllApplicationsRequest request =
+        Records.newRecord(GetAllApplicationsRequest.class);
+    GetAllApplicationsResponse response = rmClient.getAllApplications(request);
+    return response.getApplicationList();
+  }
+
+  @Override
+  public YarnClusterMetrics getYarnClusterMetrics() throws YarnException,
+      IOException {
+    GetClusterMetricsRequest request =
+        Records.newRecord(GetClusterMetricsRequest.class);
+    GetClusterMetricsResponse response = rmClient.getClusterMetrics(request);
+    return response.getClusterMetrics();
+  }
+
+  @Override
+  public List<NodeReport> getNodeReports() throws YarnException,
+      IOException {
+    GetClusterNodesRequest request =
+        Records.newRecord(GetClusterNodesRequest.class);
+    GetClusterNodesResponse response = rmClient.getClusterNodes(request);
+    return response.getNodeReports();
+  }
+
+  /**
+   * Fetches an RM delegation token for {@code renewer}.
+   *
+   * @param renewer the renewer principal; must not be {@code null} because
+   *     its string form is sent in the request
+   */
+  @Override
+  public Token getRMDelegationToken(Text renewer)
+      throws YarnException, IOException {
+    GetDelegationTokenRequest rmDTRequest =
+        Records.newRecord(GetDelegationTokenRequest.class);
+    rmDTRequest.setRenewer(renewer.toString());
+    GetDelegationTokenResponse response =
+        rmClient.getDelegationToken(rmDTRequest);
+    return response.getRMDelegationToken();
+  }
+
+  /** Builds a queue-info request with the given name and inclusion flags. */
+  private GetQueueInfoRequest
+      getQueueInfoRequest(String queueName, boolean includeApplications,
+          boolean includeChildQueues, boolean recursive) {
+    GetQueueInfoRequest request = Records.newRecord(GetQueueInfoRequest.class);
+    request.setQueueName(queueName);
+    request.setIncludeApplications(includeApplications);
+    request.setIncludeChildQueues(includeChildQueues);
+    request.setRecursive(recursive);
+    return request;
+  }
+
+  /** Returns info for one queue, including applications but not children. */
+  @Override
+  public QueueInfo getQueueInfo(String queueName) throws YarnException,
+      IOException {
+    GetQueueInfoRequest request =
+        getQueueInfoRequest(queueName, true, false, false);
+    return rmClient.getQueueInfo(request).getQueueInfo();
+  }
+
+  @Override
+  public List<QueueUserACLInfo> getQueueAclsInfo() throws YarnException,
+      IOException {
+    GetQueueUserAclsInfoRequest request =
+        Records.newRecord(GetQueueUserAclsInfoRequest.class);
+    return rmClient.getQueueUserAcls(request).getUserAclsInfoList();
+  }
+
+  /** Returns every queue below {@code root}, flattened in pre-order. */
+  @Override
+  public List<QueueInfo> getAllQueues() throws YarnException,
+      IOException {
+    List<QueueInfo> queues = new ArrayList<QueueInfo>();
+
+    QueueInfo rootQueue =
+        rmClient.getQueueInfo(getQueueInfoRequest(ROOT, false, true, true))
+            .getQueueInfo();
+    getChildQueues(rootQueue, queues, true);
+    return queues;
+  }
+
+  /** Returns only the direct children of {@code root}. */
+  @Override
+  public List<QueueInfo> getRootQueueInfos() throws YarnException,
+      IOException {
+    List<QueueInfo> queues = new ArrayList<QueueInfo>();
+
+    QueueInfo rootQueue =
+        rmClient.getQueueInfo(getQueueInfoRequest(ROOT, false, true, true))
+            .getQueueInfo();
+    getChildQueues(rootQueue, queues, false);
+    return queues;
+  }
+
+  /** Returns the child queues of {@code parent} as reported by the RM. */
+  @Override
+  public List<QueueInfo> getChildQueueInfos(String parent)
+      throws YarnException, IOException {
+    List<QueueInfo> queues = new ArrayList<QueueInfo>();
+
+    QueueInfo parentQueue =
+        rmClient.getQueueInfo(getQueueInfoRequest(parent, false, true, false))
+            .getQueueInfo();
+    getChildQueues(parentQueue, queues, true);
+    return queues;
+  }
+
+  /**
+   * Appends the children of {@code parent} to {@code queues}. When
+   * {@code recursive} is true, each child is followed by its own
+   * descendants (pre-order).
+   */
+  private void getChildQueues(QueueInfo parent, List<QueueInfo> queues,
+      boolean recursive) {
+    List<QueueInfo> childQueues = parent.getChildQueues();
+
+    for (QueueInfo child : childQueues) {
+      queues.add(child);
+      if (recursive) {
+        getChildQueues(child, queues, recursive);
+      }
+    }
+  }
+
+  /** Replaces the RM proxy; intended for tests. */
+  @Private
+  @VisibleForTesting
+  public void setRMClient(ApplicationClientProtocol rmClient) {
+    this.rmClient = rmClient;
+  }
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/package-info.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/package-info.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/package-info.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Implementations of the YARN client APIs. The implementation classes here
+ * (e.g. {@code YarnClientImpl}) are annotated {@code @Private}, so the
+ * package is audience-private too; applications should program against
+ * {@code org.apache.hadoop.yarn.client.api} instead.
+ */
+@InterfaceAudience.Private
+package org.apache.hadoop.yarn.client.api.impl;
+import org.apache.hadoop.classification.InterfaceAudience;
+
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/package-info.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/package-info.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/package-info.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Client-facing YARN APIs such as {@code YarnClient}. This package is
+ * audience-public.
+ */
+@InterfaceAudience.Public
+package org.apache.hadoop.yarn.client.api;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java?rev=1494017&r1=1494016&r2=1494017&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java Tue Jun 18 04:02:47 2013
@@ -27,12 +27,16 @@ import org.apache.commons.cli.CommandLin
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
+@Private
+@Unstable
public class ApplicationCLI extends YarnCLI {
private static final String APPLICATIONS_PATTERN =
"%30s\t%20s\t%20s\t%10s\t%10s\t%18s\t%18s\t%15s\t%35s" +
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java?rev=1494017&r1=1494016&r2=1494017&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java Tue Jun 18 04:02:47 2013
@@ -28,12 +28,16 @@ import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.lang.time.DateFormatUtils;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
+@Private
+@Unstable
public class NodeCLI extends YarnCLI {
private static final String NODES_PATTERN = "%16s\t%10s\t%17s\t%18s" +
System.getProperty("line.separator");
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java?rev=1494017&r1=1494016&r2=1494017&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java Tue Jun 18 04:02:47 2013
@@ -23,6 +23,8 @@ import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.util.Arrays;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.ipc.RemoteException;
@@ -42,6 +44,8 @@ import org.apache.hadoop.yarn.factories.
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
+@Private
+@Unstable
public class RMAdminCLI extends Configured implements Tool {
private final RecordFactory recordFactory =
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java?rev=1494017&r1=1494016&r2=1494017&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java Tue Jun 18 04:02:47 2013
@@ -19,12 +19,15 @@ package org.apache.hadoop.yarn.client.cl
import java.io.PrintStream;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.yarn.client.YarnClient;
-import org.apache.hadoop.yarn.client.YarnClientImpl;
+import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+@Private
+@Unstable
public abstract class YarnCLI extends Configured implements Tool {
public static final String STATUS_CMD = "status";
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/package-info.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/package-info.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/package-info.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Command-line tools built on the YARN client APIs. Every CLI class here
+ * ({@code YarnCLI}, {@code ApplicationCLI}, {@code NodeCLI},
+ * {@code RMAdminCLI}) is annotated {@code @Private @Unstable}, so the package
+ * is audience-private as well.
+ */
+@InterfaceAudience.Private
+package org.apache.hadoop.yarn.client.cli;
+import org.apache.hadoop.classification.InterfaceAudience;
+