You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/06/18 06:02:48 UTC

svn commit: r1494017 [2/3] - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ hadoop-yarn/hadoop-yarn-appl...

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,578 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.async.impl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.MultipleArcTransition;
+import org.apache.hadoop.yarn.state.SingleArcTransition;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+@Private
+@Unstable
+public class NMClientAsyncImpl extends NMClientAsync {
+
+  private static final Log LOG = LogFactory.getLog(NMClientAsyncImpl.class);
+
+  protected static final int INITIAL_THREAD_POOL_SIZE = 10;
+
+  protected ThreadPoolExecutor threadPool;
+  protected int maxThreadPoolSize;
+  protected Thread eventDispatcherThread;
+  protected AtomicBoolean stopped = new AtomicBoolean(false);
+  protected BlockingQueue<ContainerEvent> events =
+      new LinkedBlockingQueue<ContainerEvent>();
+
+  protected ConcurrentMap<ContainerId, StatefulContainer> containers =
+      new ConcurrentHashMap<ContainerId, StatefulContainer>();
+
+  /**
+   * Creates an asynchronous NodeManager client whose name is the
+   * fully-qualified name of this class.
+   *
+   * @param callbackHandler handler forwarded to the
+   *        {@code (name, callbackHandler)} constructor
+   */
+  public NMClientAsyncImpl(CallbackHandler callbackHandler) {
+    this(NMClientAsyncImpl.class.getName(), callbackHandler);
+  }
+
+  /**
+   * Creates an asynchronous NodeManager client backed by a newly constructed
+   * {@link NMClientImpl}.
+   *
+   * @param name name forwarded to the three-argument constructor
+   * @param callbackHandler handler forwarded to the three-argument
+   *        constructor
+   */
+  public NMClientAsyncImpl(String name, CallbackHandler callbackHandler) {
+    this(name, new NMClientImpl(), callbackHandler);
+  }
+
+  /**
+   * Creates an asynchronous NodeManager client around the supplied
+   * {@link NMClient}. Exposed (as protected) so that tests can inject their
+   * own client.
+   *
+   * @param name name passed to the superclass constructor
+   * @param client synchronous client that performs the NodeManager calls
+   * @param callbackHandler handler notified about results and errors
+   */
+  @Private
+  @VisibleForTesting
+  protected NMClientAsyncImpl(String name, NMClient client,
+      CallbackHandler callbackHandler) {
+    super(name, client, callbackHandler);
+    this.callbackHandler = callbackHandler;
+    this.client = client;
+  }
+
+  /**
+   * Reads the upper bound of the worker thread pool from the configuration,
+   * then initializes the wrapped client and the superclass.
+   *
+   * @param conf configuration to read the pool limit from and to hand to the
+   *        wrapped client
+   * @throws Exception if the wrapped client or the superclass fails to
+   *         initialize
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    int configuredMax = conf.getInt(
+        YarnConfiguration.NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE,
+        YarnConfiguration.DEFAULT_NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE);
+    this.maxThreadPoolSize = configuredMax;
+    LOG.info("Upper bound of the thread pool size is " + configuredMax);
+
+    client.init(conf);
+    super.serviceInit(conf);
+  }
+
+  /**
+   * Starts the wrapped client, creates the worker thread pool and launches
+   * the dispatcher thread that drains {@link #events} and hands every event
+   * to the pool as a {@link ContainerEventProcessor}.
+   *
+   * @throws Exception if the wrapped client or the superclass fails to start
+   */
+  @Override
+  protected void serviceStart() throws Exception {
+    client.start();
+
+    // Worker threads are daemon threads named "<class name> #<n>".
+    ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
+        this.getClass().getName() + " #%d").setDaemon(true).build();
+
+    // Start with a default core-pool size and change it dynamically.
+    // The work queue is unbounded, so the core size (not the maximum passed
+    // below) is the knob that is adjusted by the dispatcher loop.
+    int initSize = Math.min(INITIAL_THREAD_POOL_SIZE, maxThreadPoolSize);
+    threadPool = new ThreadPoolExecutor(initSize, Integer.MAX_VALUE, 1,
+        TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
+
+    eventDispatcherThread = new Thread() {
+      @Override
+      public void run() {
+        ContainerEvent event = null;
+        // Distinct node ids seen in events since the dispatcher started;
+        // entries are only ever added.
+        Set<String> allNodes = new HashSet<String>();
+
+        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+          try {
+            event = events.take();
+          } catch (InterruptedException e) {
+            // An interrupt after stop was requested is the normal shutdown
+            // path and is not logged as an error.
+            if (!stopped.get()) {
+              LOG.error("Returning, thread interrupted", e);
+            }
+            return;
+          }
+
+          allNodes.add(event.getNodeId().toString());
+
+          int threadPoolSize = threadPool.getCorePoolSize();
+
+          // We can increase the pool size only if we haven't reached the
+          // maximum limit yet.
+          if (threadPoolSize != maxThreadPoolSize) {
+
+            // nodeNum is the number of nodes this client has been asked to
+            // talk to so far. This is *not* the cluster size and doesn't
+            // need to be.
+            int nodeNum = allNodes.size();
+            int idealThreadPoolSize = Math.min(maxThreadPoolSize, nodeNum);
+
+            if (threadPoolSize < idealThreadPoolSize) {
+              // Bump up the pool size to idealThreadPoolSize +
+              // INITIAL_THREAD_POOL_SIZE (capped at maxThreadPoolSize); the
+              // latter is just a buffer so we are not always increasing the
+              // pool size.
+              int newThreadPoolSize = Math.min(maxThreadPoolSize,
+                  idealThreadPoolSize + INITIAL_THREAD_POOL_SIZE);
+              LOG.info("Set NMClientAsync thread pool size to " +
+                  newThreadPoolSize + " as the number of nodes to talk to is "
+                  + nodeNum);
+              threadPool.setCorePoolSize(newThreadPoolSize);
+            }
+          }
+
+          // The events from the queue are handled in parallel by the thread
+          // pool.
+          threadPool.execute(getContainerEventProcessor(event));
+
+          // TODO: Group launching of multiple containers to a single
+          // NodeManager into a single connection
+        }
+      }
+    };
+    eventDispatcherThread.setName("Container  Event Dispatcher");
+    // Non-daemon: the dispatcher is stopped explicitly in serviceStop().
+    eventDispatcherThread.setDaemon(false);
+    eventDispatcherThread.start();
+
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    if (stopped.getAndSet(true)) {
+      // return if already stopped
+      return;
+    }
+    if (eventDispatcherThread != null) {
+      eventDispatcherThread.interrupt();
+      try {
+        eventDispatcherThread.join();
+      } catch (InterruptedException e) {
+        LOG.error("The thread of " + eventDispatcherThread.getName() +
+                  " didn't finish normally.", e);
+      }
+    }
+    if (threadPool != null) {
+      threadPool.shutdownNow();
+    }
+    if (client != null) {
+      // If NMClientImpl doesn't stop running containers, the states doesn't
+      // need to be cleared.
+      if (!(client instanceof NMClientImpl) ||
+          ((NMClientImpl) client).getCleanupRunningContainers().get()) {
+        if (containers != null) {
+          containers.clear();
+        }
+      }
+      client.stop();
+    }
+    super.serviceStop();
+  }
+
+  public void startContainerAsync(
+      Container container, ContainerLaunchContext containerLaunchContext) {
+    if (containers.putIfAbsent(container.getId(),
+        new StatefulContainer(this, container.getId())) != null) {
+      callbackHandler.onStartContainerError(container.getId(),
+          RPCUtil.getRemoteException("Container " + container.getId() +
+              " is already started or scheduled to start"));
+    }
+    try {
+      events.put(new StartContainerEvent(container, containerLaunchContext));
+    } catch (InterruptedException e) {
+      LOG.warn("Exception when scheduling the event of starting Container " +
+          container.getId());
+      callbackHandler.onStartContainerError(container.getId(), e);
+    }
+  }
+
+  public void stopContainerAsync(ContainerId containerId, NodeId nodeId,
+      Token containerToken) {
+    if (containers.get(containerId) == null) {
+      callbackHandler.onStopContainerError(containerId,
+          RPCUtil.getRemoteException("Container " + containerId +
+              " is neither started nor scheduled to start"));
+    }
+    try {
+      events.put(new ContainerEvent(containerId, nodeId, containerToken,
+          ContainerEventType.STOP_CONTAINER));
+    } catch (InterruptedException e) {
+      LOG.warn("Exception when scheduling the event of stopping Container " +
+          containerId);
+      callbackHandler.onStopContainerError(containerId, e);
+    }
+  }
+
+  /**
+   * Schedules a status query for a container. The request is queued and
+   * carried out later by the worker pool; the result is reported through the
+   * callback handler.
+   *
+   * <p>If the calling thread is interrupted while queueing, the error is
+   * reported through {@code onGetContainerStatusError} and the thread's
+   * interrupt status is restored.
+   *
+   * @param containerId id of the container to query
+   * @param nodeId node the container was allocated on
+   * @param containerToken token passed along with the query
+   */
+  public void getContainerStatusAsync(ContainerId containerId, NodeId nodeId,
+      Token containerToken) {
+    try {
+      events.put(new ContainerEvent(containerId, nodeId, containerToken,
+          ContainerEventType.QUERY_CONTAINER));
+    } catch (InterruptedException e) {
+      LOG.warn("Exception when scheduling the event of querying the status" +
+          " of Container " + containerId);
+      try {
+        callbackHandler.onGetContainerStatusError(containerId, e);
+      } finally {
+        // Preserve the interrupt for the caller.
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  protected static enum ContainerState {
+    PREP, FAILED, RUNNING, DONE,
+  }
+
+  /**
+   * Tells whether a container has reached a final state.
+   *
+   * @param container the tracked container to inspect
+   * @return {@code true} if the container's state is {@code DONE} or
+   *         {@code FAILED}
+   */
+  protected boolean isCompletelyDone(StatefulContainer container) {
+    if (container.getState() == ContainerState.DONE) {
+      return true;
+    }
+    return container.getState() == ContainerState.FAILED;
+  }
+
+  /**
+   * Factory for the task that processes one event on the worker pool.
+   * Protected so that subclasses can supply a different processor.
+   *
+   * @param event the event to process
+   * @return a new processor bound to {@code event}
+   */
+  protected ContainerEventProcessor getContainerEventProcessor(
+      ContainerEvent event) {
+    ContainerEventProcessor processor = new ContainerEventProcessor(event);
+    return processor;
+  }
+
+  /**
+   * The type of the event of interacting with a container. Each value
+   * corresponds to one of the asynchronous requests offered by this client.
+   */
+  protected static enum ContainerEventType {
+    // Queued by startContainerAsync (via StartContainerEvent).
+    START_CONTAINER,
+    // Queued by stopContainerAsync.
+    STOP_CONTAINER,
+    // Queued by getContainerStatusAsync.
+    QUERY_CONTAINER
+  }
+
+  /**
+   * A request concerning one container, queued by the public async methods
+   * and consumed by the dispatcher thread and the worker pool. Instances are
+   * immutable: all fields are assigned once in the constructor.
+   */
+  protected static class ContainerEvent
+      extends AbstractEvent<ContainerEventType>{
+    private final ContainerId containerId;
+    private final NodeId nodeId;
+    private final Token containerToken;
+
+    /**
+     * @param containerId id of the container the event refers to
+     * @param nodeId node the container belongs to
+     * @param containerToken token accompanying the request
+     * @param type kind of request this event represents
+     */
+    public ContainerEvent(ContainerId containerId, NodeId nodeId,
+        Token containerToken, ContainerEventType type) {
+      super(type);
+      this.containerId = containerId;
+      this.nodeId = nodeId;
+      this.containerToken = containerToken;
+    }
+
+    /** @return id of the container the event refers to */
+    public ContainerId getContainerId() {
+      return containerId;
+    }
+
+    /** @return node the container belongs to */
+    public NodeId getNodeId() {
+      return nodeId;
+    }
+
+    /** @return token accompanying the request */
+    public Token getContainerToken() {
+      return containerToken;
+    }
+  }
+
+  /**
+   * A {@code START_CONTAINER} event that additionally carries the container
+   * and the launch context needed to start it. The container's id, node id
+   * and token are passed on to {@link ContainerEvent}. Instances are
+   * immutable: all fields are assigned once in the constructor.
+   */
+  protected static class StartContainerEvent extends ContainerEvent {
+    private final Container container;
+    private final ContainerLaunchContext containerLaunchContext;
+
+    /**
+     * @param container the container to start
+     * @param containerLaunchContext launch context for the container
+     */
+    public StartContainerEvent(Container container,
+        ContainerLaunchContext containerLaunchContext) {
+      super(container.getId(), container.getNodeId(),
+          container.getContainerToken(), ContainerEventType.START_CONTAINER);
+      this.container = container;
+      this.containerLaunchContext = containerLaunchContext;
+    }
+
+    /** @return the container to start */
+    public Container getContainer() {
+      return container;
+    }
+
+    /** @return launch context for the container */
+    public ContainerLaunchContext getContainerLaunchContext() {
+      return containerLaunchContext;
+    }
+  }
+
+  /**
+   * Client-side record of one container: the container id together with a
+   * state machine over {@link ContainerState} that is driven by
+   * {@link ContainerEvent}s.
+   *
+   * <p>{@link #handle(ContainerEvent)} runs the state machine under a write
+   * lock and {@link #getState()} reads the state under the matching read
+   * lock. The transition hooks below call the wrapped NodeManager client and
+   * the user's callback handler.
+   */
+  protected static class StatefulContainer implements
+      EventHandler<ContainerEvent> {
+
+    // Transition table shared by all instances; every StatefulContainer
+    // obtains its own machine from it in the constructor. PREP is the
+    // initial state.
+    protected final static StateMachineFactory<StatefulContainer,
+        ContainerState, ContainerEventType, ContainerEvent> stateMachineFactory
+            = new StateMachineFactory<StatefulContainer, ContainerState,
+                ContainerEventType, ContainerEvent>(ContainerState.PREP)
+
+            // Transitions from PREP state
+            // START: the hook decides between RUNNING and FAILED.
+            .addTransition(ContainerState.PREP,
+                EnumSet.of(ContainerState.RUNNING, ContainerState.FAILED),
+                ContainerEventType.START_CONTAINER,
+                new StartContainerTransition())
+            // STOP before START: reported as a start error, then DONE.
+            .addTransition(ContainerState.PREP, ContainerState.DONE,
+                ContainerEventType.STOP_CONTAINER, new OutOfOrderTransition())
+
+            // Transitions from RUNNING state
+            // RUNNING -> RUNNING should be the invalid transition
+            // (no START_CONTAINER transition is registered for RUNNING).
+            .addTransition(ContainerState.RUNNING,
+                EnumSet.of(ContainerState.DONE, ContainerState.FAILED),
+                ContainerEventType.STOP_CONTAINER,
+                new StopContainerTransition())
+
+            // Transition from DONE state
+            // START/STOP are registered without a hook and stay in DONE.
+            .addTransition(ContainerState.DONE, ContainerState.DONE,
+                EnumSet.of(ContainerEventType.START_CONTAINER,
+                    ContainerEventType.STOP_CONTAINER))
+
+            // Transition from FAILED state
+            // START/STOP are registered without a hook and stay in FAILED.
+            .addTransition(ContainerState.FAILED, ContainerState.FAILED,
+                EnumSet.of(ContainerEventType.START_CONTAINER,
+                    ContainerEventType.STOP_CONTAINER));
+
+    /**
+     * Hook for START_CONTAINER in PREP: asks the wrapped client to start the
+     * container and notifies the callback handler. Returns RUNNING when the
+     * client call succeeds and FAILED when it throws.
+     */
+    protected static class StartContainerTransition implements
+        MultipleArcTransition<StatefulContainer, ContainerEvent,
+        ContainerState> {
+
+      @Override
+      public ContainerState transition(
+          StatefulContainer container, ContainerEvent event) {
+        ContainerId containerId = event.getContainerId();
+        try {
+          StartContainerEvent scEvent = null;
+          if (event instanceof StartContainerEvent) {
+            scEvent = (StartContainerEvent) event;
+          }
+          // With assertions disabled, an event of another class leaves
+          // scEvent null; the resulting NullPointerException below is caught
+          // by the catch (Throwable) clause and turns into FAILED.
+          assert scEvent != null;
+          Map<String, ByteBuffer> allServiceResponse =
+              container.nmClientAsync.getClient().startContainer(
+                  scEvent.getContainer(), scEvent.getContainerLaunchContext());
+          try {
+            container.nmClientAsync.getCallbackHandler().onContainerStarted(
+                containerId, allServiceResponse);
+          } catch (Throwable thr) {
+            // Don't process user created unchecked exception
+            // (a failing callback does not change the resulting state).
+            LOG.info("Unchecked exception is thrown from onContainerStarted for "
+                + "Container " + containerId, thr);
+          }
+          return ContainerState.RUNNING;
+        } catch (YarnException e) {
+          return onExceptionRaised(container, event, e);
+        } catch (IOException e) {
+          return onExceptionRaised(container, event, e);
+        } catch (Throwable t) {
+          return onExceptionRaised(container, event, t);
+        }
+      }
+
+      // Reports the failure through onStartContainerError and yields FAILED;
+      // anything thrown by the callback itself is only logged.
+      private ContainerState onExceptionRaised(StatefulContainer container,
+          ContainerEvent event, Throwable t) {
+        try {
+          container.nmClientAsync.getCallbackHandler().onStartContainerError(
+              event.getContainerId(), t);
+        } catch (Throwable thr) {
+          // Don't process user created unchecked exception
+          LOG.info(
+              "Unchecked exception is thrown from onStartContainerError for " +
+                  "Container " + event.getContainerId(), thr);
+        }
+        return ContainerState.FAILED;
+      }
+    }
+
+    /**
+     * Hook for STOP_CONTAINER in RUNNING: asks the wrapped client to stop
+     * the container and notifies the callback handler. Returns DONE when the
+     * client call succeeds and FAILED when it throws.
+     */
+    protected static class StopContainerTransition implements
+        MultipleArcTransition<StatefulContainer, ContainerEvent,
+        ContainerState> {
+
+      @Override
+      public ContainerState transition(
+          StatefulContainer container, ContainerEvent event) {
+        ContainerId containerId = event.getContainerId();
+        try {
+          container.nmClientAsync.getClient().stopContainer(
+              containerId, event.getNodeId(), event.getContainerToken());
+          try {
+            container.nmClientAsync.getCallbackHandler().onContainerStopped(
+                event.getContainerId());
+          } catch (Throwable thr) {
+            // Don't process user created unchecked exception
+            // (a failing callback does not change the resulting state).
+            LOG.info("Unchecked exception is thrown from onContainerStopped for "
+                + "Container " + event.getContainerId(), thr);
+          }
+          return ContainerState.DONE;
+        } catch (YarnException e) {
+          return onExceptionRaised(container, event, e);
+        } catch (IOException e) {
+          return onExceptionRaised(container, event, e);
+        } catch (Throwable t) {
+          return onExceptionRaised(container, event, t);
+        }
+      }
+
+      // Reports the failure through onStopContainerError and yields FAILED;
+      // anything thrown by the callback itself is only logged.
+      private ContainerState onExceptionRaised(StatefulContainer container,
+          ContainerEvent event, Throwable t) {
+        try {
+          container.nmClientAsync.getCallbackHandler().onStopContainerError(
+              event.getContainerId(), t);
+        } catch (Throwable thr) {
+          // Don't process user created unchecked exception
+          LOG.info("Unchecked exception is thrown from onStopContainerError for "
+              + "Container " + event.getContainerId(), thr);
+        }
+        return ContainerState.FAILED;
+      }
+    }
+
+    /**
+     * Hook for STOP_CONTAINER in PREP, i.e. a stop that is processed before
+     * the container was started. No call is made to the wrapped client; the
+     * situation is reported through onStartContainerError.
+     */
+    protected static class OutOfOrderTransition implements
+        SingleArcTransition<StatefulContainer, ContainerEvent> {
+
+      protected static final String STOP_BEFORE_START_ERROR_MSG =
+          "Container was killed before it was launched";
+
+      @Override
+      public void transition(StatefulContainer container, ContainerEvent event) {
+        try {
+          container.nmClientAsync.getCallbackHandler().onStartContainerError(
+              event.getContainerId(),
+              RPCUtil.getRemoteException(STOP_BEFORE_START_ERROR_MSG));
+        } catch (Throwable thr) {
+          // Don't process user created unchecked exception
+          LOG.info(
+              "Unchecked exception is thrown from onStartContainerError for " +
+                  "Container " + event.getContainerId(), thr);
+        }
+      }
+    }
+
+    // Owning async client; gives the hooks access to the wrapped NMClient
+    // and to the callback handler.
+    private final NMClientAsync nmClientAsync;
+    private final ContainerId containerId;
+    private final StateMachine<ContainerState, ContainerEventType,
+        ContainerEvent> stateMachine;
+    // Read and write halves of one ReentrantReadWriteLock.
+    private final ReadLock readLock;
+    private final WriteLock writeLock;
+
+    /**
+     * @param client the async client this container belongs to
+     * @param containerId id of the tracked container
+     */
+    public StatefulContainer(NMClientAsync client, ContainerId containerId) {
+      this.nmClientAsync = client;
+      this.containerId = containerId;
+      stateMachine = stateMachineFactory.make(this);
+      ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+      readLock = lock.readLock();
+      writeLock = lock.writeLock();
+    }
+
+    /**
+     * Feeds the event to the state machine while holding the write lock. An
+     * event that is not valid in the current state is logged and otherwise
+     * ignored.
+     *
+     * @param event the event to apply
+     */
+    @Override
+    public void handle(ContainerEvent event) {
+      writeLock.lock();
+      try {
+        try {
+          this.stateMachine.doTransition(event.getType(), event);
+        } catch (InvalidStateTransitonException e) {
+          LOG.error("Can't handle this event at current state", e);
+        }
+      } finally {
+        writeLock.unlock();
+      }
+    }
+
+    /** @return id of the tracked container */
+    public ContainerId getContainerId() {
+      return containerId;
+    }
+
+    /** @return the current state, read while holding the read lock */
+    public ContainerState getState() {
+      readLock.lock();
+      try {
+        return stateMachine.getCurrentState();
+      } finally {
+        readLock.unlock();
+      }
+    }
+  }
+
+  /**
+   * Worker-pool task that carries out a single {@link ContainerEvent}.
+   *
+   * <p>{@code QUERY_CONTAINER} events are served directly through the
+   * wrapped client and reported to the callback handler. All other events
+   * are passed to the {@link StatefulContainer} registered for the
+   * container id; once that container is completely done its entry is
+   * removed from {@code containers}.
+   */
+  protected class ContainerEventProcessor implements Runnable {
+    protected ContainerEvent event;
+
+    /**
+     * @param event the event this task will process
+     */
+    public ContainerEventProcessor(ContainerEvent event) {
+      this.event = event;
+    }
+
+    @Override
+    public void run() {
+      ContainerId containerId = event.getContainerId();
+      LOG.info("Processing Event " + event + " for Container " + containerId);
+      if (event.getType() == ContainerEventType.QUERY_CONTAINER) {
+        try {
+          ContainerStatus containerStatus = client.getContainerStatus(
+              containerId, event.getNodeId(), event.getContainerToken());
+          try {
+            callbackHandler.onContainerStatusReceived(
+                containerId, containerStatus);
+          } catch (Throwable thr) {
+            // Don't process user created unchecked exception
+            LOG.info(
+                "Unchecked exception is thrown from onContainerStatusReceived" +
+                    " for Container " + event.getContainerId(), thr);
+          }
+        } catch (Throwable t) {
+          // Every failure of the status query, checked or unchecked, is
+          // reported to the callback handler in the same way.
+          onExceptionRaised(containerId, t);
+        }
+      } else {
+        StatefulContainer container = containers.get(containerId);
+        if (container == null) {
+          LOG.info("Container " + containerId + " is already stopped or failed");
+        } else {
+          container.handle(event);
+          if (isCompletelyDone(container)) {
+            // Remove only the instance that was just handled. Another
+            // processor may already have removed it and a new entry may have
+            // been registered under the same id in the meantime; that newer
+            // entry must not be evicted.
+            containers.remove(containerId, container);
+          }
+        }
+      }
+    }
+
+    // Reports a failed status query through onGetContainerStatusError;
+    // anything thrown by the callback itself is only logged.
+    private void onExceptionRaised(ContainerId containerId, Throwable t) {
+      try {
+        callbackHandler.onGetContainerStatusError(containerId, t);
+      } catch (Throwable thr) {
+        // Don't process user created unchecked exception
+        LOG.info("Unchecked exception is thrown from onGetContainerStatusError" +
+            " for Container " + containerId, thr);
+      }
+    }
+  }
+
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/package-info.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/package-info.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/package-info.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Public
+package org.apache.hadoop.yarn.client.api.async.impl;
+import org.apache.hadoop.classification.InterfaceAudience;
+

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/package-info.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/package-info.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/package-info.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Public
+package org.apache.hadoop.yarn.client.api.async;
+import org.apache.hadoop.classification.InterfaceAudience;
+

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,585 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.RackResolver;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+
+// TODO check inputs for null etc. YARN-654
+
+@Private
+@Unstable
+public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
+
+  private static final Log LOG = LogFactory.getLog(AMRMClientImpl.class);
+  
+  private final RecordFactory recordFactory =
+      RecordFactoryProvider.getRecordFactory(null);
+  
+  private int lastResponseId = 0;
+  private ConcurrentHashMap<String, Token> nmTokens;
+
+  protected ApplicationMasterProtocol rmClient;
+  protected final ApplicationAttemptId appAttemptId;  
+  protected Resource clusterAvailableResources;
+  protected int clusterNodeCount;
+  
+  class ResourceRequestInfo {
+    ResourceRequest remoteRequest;
+    LinkedHashSet<T> containerRequests;
+    
+    ResourceRequestInfo(Priority priority, String resourceName,
+        Resource capability) {
+      remoteRequest = ResourceRequest.newInstance(priority, resourceName,
+          capability, 0);
+      containerRequests = new LinkedHashSet<T>();
+    }
+  }
+  
+  
+  /**
+   * Class compares Resource by memory then cpu in reverse order
+   */
+  class ResourceReverseMemoryThenCpuComparator implements Comparator<Resource> {
+    @Override
+    public int compare(Resource arg0, Resource arg1) {
+      int mem0 = arg0.getMemory();
+      int mem1 = arg1.getMemory();
+      int cpu0 = arg0.getVirtualCores();
+      int cpu1 = arg1.getVirtualCores();
+      if(mem0 == mem1) {
+        if(cpu0 == cpu1) {
+          return 0;
+        }
+        if(cpu0 < cpu1) {
+          return 1;
+        }
+        return -1;
+      }
+      if(mem0 < mem1) { 
+        return 1;
+      }
+      return -1;
+    }    
+  }
+  
+  static boolean canFit(Resource arg0, Resource arg1) {
+    int mem0 = arg0.getMemory();
+    int mem1 = arg1.getMemory();
+    int cpu0 = arg0.getVirtualCores();
+    int cpu1 = arg1.getVirtualCores();
+    
+    if(mem0 <= mem1 && cpu0 <= cpu1) { 
+      return true;
+    }
+    return false; 
+  }
+  
+  //Key -> Priority
+  //Value -> Map
+  //Key->ResourceName (e.g., nodename, rackname, *)
+  //Value->Map
+  //Key->Resource Capability
+  //Value->ResourceRequest
+  protected final 
+  Map<Priority, Map<String, TreeMap<Resource, ResourceRequestInfo>>>
+    remoteRequestsTable =
+    new TreeMap<Priority, Map<String, TreeMap<Resource, ResourceRequestInfo>>>();
+
+  protected final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
+      new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator());
+  protected final Set<ContainerId> release = new TreeSet<ContainerId>();
+  
+  public AMRMClientImpl(ApplicationAttemptId appAttemptId) {
+    super(AMRMClientImpl.class.getName());
+    this.appAttemptId = appAttemptId;
+    this.nmTokens = new ConcurrentHashMap<String, Token>();
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    RackResolver.init(conf);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    final YarnConfiguration conf = new YarnConfiguration(getConfig());
+    final YarnRPC rpc = YarnRPC.create(conf);
+    final InetSocketAddress rmAddress = conf.getSocketAddr(
+        YarnConfiguration.RM_SCHEDULER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
+
+    UserGroupInformation currentUser;
+    try {
+      currentUser = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {
+      throw new YarnRuntimeException(e);
+    }
+
+    // CurrentUser should already have AMToken loaded.
+    rmClient = currentUser.doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
+      @Override
+      public ApplicationMasterProtocol run() {
+        return (ApplicationMasterProtocol) rpc.getProxy(ApplicationMasterProtocol.class, rmAddress,
+            conf);
+      }
+    });
+    LOG.debug("Connecting to ResourceManager at " + rmAddress);
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    if (this.rmClient != null) {
+      RPC.stopProxy(this.rmClient);
+    }
+    super.serviceStop();
+  }
+  
+  @Override
+  public RegisterApplicationMasterResponse registerApplicationMaster(
+      String appHostName, int appHostPort, String appTrackingUrl)
+      throws YarnException, IOException {
+    // do this only once ???
+    RegisterApplicationMasterRequest request = recordFactory
+        .newRecordInstance(RegisterApplicationMasterRequest.class);
+    synchronized (this) {
+      request.setApplicationAttemptId(appAttemptId);      
+    }
+    request.setHost(appHostName);
+    request.setRpcPort(appHostPort);
+    if(appTrackingUrl != null) {
+      request.setTrackingUrl(appTrackingUrl);
+    }
+    RegisterApplicationMasterResponse response = rmClient
+        .registerApplicationMaster(request);
+    return response;
+  }
+
+  @Override
+  public AllocateResponse allocate(float progressIndicator) 
+      throws YarnException, IOException {
+    AllocateResponse allocateResponse = null;
+    ArrayList<ResourceRequest> askList = null;
+    ArrayList<ContainerId> releaseList = null;
+    AllocateRequest allocateRequest = null;
+    
+    try {
+      synchronized (this) {
+        askList = new ArrayList<ResourceRequest>(ask);
+        releaseList = new ArrayList<ContainerId>(release);
+        // optimistically clear this collection assuming no RPC failure
+        ask.clear();
+        release.clear();
+        allocateRequest =
+            AllocateRequest.newInstance(appAttemptId, lastResponseId,
+              progressIndicator, askList, releaseList, null);
+      }
+
+      allocateResponse = rmClient.allocate(allocateRequest);
+
+      synchronized (this) {
+        // update these on successful RPC
+        clusterNodeCount = allocateResponse.getNumClusterNodes();
+        lastResponseId = allocateResponse.getResponseId();
+        clusterAvailableResources = allocateResponse.getAvailableResources();
+        if (!allocateResponse.getNMTokens().isEmpty()) {
+          populateNMTokens(allocateResponse);
+        }
+      }
+    } finally {
+      // TODO how to differentiate remote yarn exception vs error in rpc
+      if(allocateResponse == null) {
+        // we hit an exception in allocate()
+        // preserve ask and release for next call to allocate()
+        synchronized (this) {
+          release.addAll(releaseList);
+          // requests could have been added or deleted during call to allocate
+          // If requests were added/removed then there is nothing to do since
+          // the ResourceRequest object in ask would have the actual new value.
+          // If ask does not have this ResourceRequest then it was unchanged and
+          // so we can add the value back safely.
+          // This assumes that there will be no concurrent calls to allocate()
+          // and so we don't have to worry about ask being changed in the
+          // synchronized block at the beginning of this method.
+          for(ResourceRequest oldAsk : askList) {
+            if(!ask.contains(oldAsk)) {
+              ask.add(oldAsk);
+            }
+          }
+        }
+      }
+    }
+    return allocateResponse;
+  }
+
+  @Private
+  @VisibleForTesting
+  protected void populateNMTokens(AllocateResponse allocateResponse) {
+    for (NMToken token : allocateResponse.getNMTokens()) {
+      String nodeId = token.getNodeId().toString();
+      if (nmTokens.containsKey(nodeId)) {
+        LOG.debug("Replacing token for : " + nodeId);
+      } else {
+        LOG.debug("Received new token for : " + nodeId);
+      }
+      nmTokens.put(nodeId, token.getToken());
+    }
+  }
+
+  @Override
+  public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
+      String appMessage, String appTrackingUrl) throws YarnException,
+      IOException {
+    FinishApplicationMasterRequest request = recordFactory
+                  .newRecordInstance(FinishApplicationMasterRequest.class);
+    request.setAppAttemptId(appAttemptId);
+    request.setFinalApplicationStatus(appStatus);
+    if(appMessage != null) {
+      request.setDiagnostics(appMessage);
+    }
+    if(appTrackingUrl != null) {
+      request.setTrackingUrl(appTrackingUrl);
+    }
+    rmClient.finishApplicationMaster(request);
+  }
+  
+  @Override
+  public synchronized void addContainerRequest(T req) {
+    Set<String> allRacks = new HashSet<String>();
+    if (req.getRacks() != null) {
+      allRacks.addAll(req.getRacks());
+      if(req.getRacks().size() != allRacks.size()) {
+        Joiner joiner = Joiner.on(',');
+        LOG.warn("ContainerRequest has duplicate racks: "
+            + joiner.join(req.getRacks()));
+      }
+    }
+    allRacks.addAll(resolveRacks(req.getNodes()));
+    
+    if (req.getNodes() != null) {
+      HashSet<String> dedupedNodes = new HashSet<String>(req.getNodes());
+      if(dedupedNodes.size() != req.getNodes().size()) {
+        Joiner joiner = Joiner.on(',');
+        LOG.warn("ContainerRequest has duplicate nodes: "
+            + joiner.join(req.getNodes()));        
+      }
+      for (String node : dedupedNodes) {
+        // Ensure node requests are accompanied by requests for
+        // corresponding rack
+        addResourceRequest(req.getPriority(), node, req.getCapability(),
+            req.getContainerCount(), req);
+      }
+    }
+
+    for (String rack : allRacks) {
+      addResourceRequest(req.getPriority(), rack, req.getCapability(),
+          req.getContainerCount(), req);
+    }
+
+    // Off-switch
+    addResourceRequest(req.getPriority(), ResourceRequest.ANY, req.getCapability(),
+        req.getContainerCount(), req);
+  }
+
+  @Override
+  public synchronized void removeContainerRequest(T req) {
+    Set<String> allRacks = new HashSet<String>();
+    if (req.getRacks() != null) {
+      allRacks.addAll(req.getRacks());
+    }
+    allRacks.addAll(resolveRacks(req.getNodes()));
+
+    // Update resource requests
+    if (req.getNodes() != null) {
+      for (String node : new HashSet<String>(req.getNodes())) {
+        decResourceRequest(req.getPriority(), node, req.getCapability(),
+            req.getContainerCount(), req);
+      }
+    }
+
+    for (String rack : allRacks) {
+      decResourceRequest(req.getPriority(), rack, req.getCapability(),
+          req.getContainerCount(), req);
+    }
+
+    decResourceRequest(req.getPriority(), ResourceRequest.ANY, req.getCapability(),
+        req.getContainerCount(), req);
+  }
+
+  @Override
+  public synchronized void releaseAssignedContainer(ContainerId containerId) {
+    release.add(containerId);
+  }
+  
+  @Override
+  public synchronized Resource getClusterAvailableResources() {
+    return clusterAvailableResources;
+  }
+  
+  @Override
+  public synchronized int getClusterNodeCount() {
+    return clusterNodeCount;
+  }
+  
+  @Override
+  public synchronized List<? extends Collection<T>> getMatchingRequests(
+                                          Priority priority, 
+                                          String resourceName, 
+                                          Resource capability) {
+    List<LinkedHashSet<T>> list = new LinkedList<LinkedHashSet<T>>();
+    Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests = 
+        this.remoteRequestsTable.get(priority);
+    if (remoteRequests == null) {
+      return list;
+    }
+    TreeMap<Resource, ResourceRequestInfo> reqMap = remoteRequests
+        .get(resourceName);
+    if (reqMap == null) {
+      return list;
+    }
+
+    ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
+    if (resourceRequestInfo != null) {
+      list.add(resourceRequestInfo.containerRequests);
+      return list;
+    }
+    
+    // no exact match. Container may be larger than what was requested.
+    // get all resources <= capability. map is reverse sorted. 
+    SortedMap<Resource, ResourceRequestInfo> tailMap = 
+                                                  reqMap.tailMap(capability);
+    for(Map.Entry<Resource, ResourceRequestInfo> entry : tailMap.entrySet()) {
+      if(canFit(entry.getKey(), capability)) {
+        // match found that fits in the larger resource
+        list.add(entry.getValue().containerRequests);
+      }
+    }
+    
+    // return whatever fit; the list is empty when nothing matched
+    return list;          
+  }
+  
+  private Set<String> resolveRacks(List<String> nodes) {
+    Set<String> racks = new HashSet<String>();    
+    if (nodes != null) {
+      for (String node : nodes) {
+        // Ensure node requests are accompanied by requests for
+        // corresponding rack
+        String rack = RackResolver.resolve(node).getNetworkLocation();
+        if (rack == null) {
+          LOG.warn("Failed to resolve rack for node " + node + ".");
+        } else {
+          racks.add(rack);
+        }
+      }
+    }
+    
+    return racks;
+  }
+  
+  private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
+    // This code looks weird but is needed because of the following scenario.
+    // A ResourceRequest is removed from the remoteRequestTable. A 0 container 
+    // request is added to 'ask' to notify the RM about not needing it any more.
+    // Before the call to allocate, the user now requests more containers. If 
+    // the locations of the 0 size request and the new request are the same
+    // (with the difference being only container count), then the set comparator
+    // will consider both to be the same and not add the new request to ask. So 
+    // we need to check for the "same" request being present and remove it and 
+    // then add it back. The comparator is container count agnostic.
+    // This should happen only rarely but we do need to guard against it.
+    if(ask.contains(remoteRequest)) {
+      ask.remove(remoteRequest);
+    }
+    ask.add(remoteRequest);
+  }
+
+  private void addResourceRequest(Priority priority, String resourceName,
+      Resource capability, int containerCount, T req) {
+    Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
+      this.remoteRequestsTable.get(priority);
+    if (remoteRequests == null) {
+      remoteRequests = 
+          new HashMap<String, TreeMap<Resource, ResourceRequestInfo>>();
+      this.remoteRequestsTable.put(priority, remoteRequests);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Added priority=" + priority);
+      }
+    }
+    TreeMap<Resource, ResourceRequestInfo> reqMap = 
+                                          remoteRequests.get(resourceName);
+    if (reqMap == null) {
+      // capabilities are stored in reverse sorted order. smallest last.
+      reqMap = new TreeMap<Resource, ResourceRequestInfo>(
+          new ResourceReverseMemoryThenCpuComparator());
+      remoteRequests.put(resourceName, reqMap);
+    }
+    ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
+    if (resourceRequestInfo == null) {
+      resourceRequestInfo =
+          new ResourceRequestInfo(priority, resourceName, capability);
+      reqMap.put(capability, resourceRequestInfo);
+    }
+    
+    resourceRequestInfo.remoteRequest.setNumContainers(
+         resourceRequestInfo.remoteRequest.getNumContainers() + containerCount);
+
+    if(req instanceof StoredContainerRequest) {
+      resourceRequestInfo.containerRequests.add(req);
+    }
+
+    // Note this down for next interaction with ResourceManager
+    addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("addResourceRequest:" + " applicationId="
+          + appAttemptId + " priority=" + priority.getPriority()
+          + " resourceName=" + resourceName + " numContainers="
+          + resourceRequestInfo.remoteRequest.getNumContainers() 
+          + " #asks=" + ask.size());
+    }
+  }
+
+  /**
+   * Decrements the container count of one (priority, resourceName, capability)
+   * entry and queues the updated request in 'ask'. Unknown entries are ignored.
+   */
+  private void decResourceRequest(Priority priority, String resourceName,
+      Resource capability, int containerCount, T req) {
+    Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
+      this.remoteRequestsTable.get(priority);
+    if(remoteRequests == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not decrementing resource as priority " + priority
+            + " is not present in request table");
+      }
+      return;
+    }
+    Map<Resource, ResourceRequestInfo> reqMap = remoteRequests.get(resourceName);
+    if (reqMap == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not decrementing resource as " + resourceName
+            + " is not present in request table");
+      }
+      return;
+    }
+    ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
+    if (resourceRequestInfo == null) {
+      // no entry for this capability: bail out instead of dereferencing null
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not decrementing resource as capability " + capability
+            + " is not present in request table");
+      }
+      return;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("BEFORE decResourceRequest:" + " applicationId="
+          + appAttemptId + " priority=" + priority.getPriority()
+          + " resourceName=" + resourceName + " numContainers="
+          + resourceRequestInfo.remoteRequest.getNumContainers()
+          + " #asks=" + ask.size());
+    }
+    // clamp at 0 to guard against spurious removals
+    resourceRequestInfo.remoteRequest.setNumContainers(Math.max(0,
+        resourceRequestInfo.remoteRequest.getNumContainers() - containerCount));
+    if(req instanceof StoredContainerRequest) {
+      resourceRequestInfo.containerRequests.remove(req);
+    }
+    // send the ResourceRequest to RM even if is 0 because it needs to override
+    // a previously sent value. If ResourceRequest was not sent previously then
+    // sending 0 ought to be a no-op on RM
+    addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
+
+    // delete entries from map if no longer needed
+    if (resourceRequestInfo.remoteRequest.getNumContainers() == 0) {
+      reqMap.remove(capability);
+      if (reqMap.size() == 0) {
+        remoteRequests.remove(resourceName);
+      }
+      if (remoteRequests.size() == 0) {
+        remoteRequestsTable.remove(priority);
+      }
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("AFTER decResourceRequest:" + " applicationId="
+          + appAttemptId + " priority=" + priority.getPriority()
+          + " resourceName=" + resourceName + " numContainers="
+          + resourceRequestInfo.remoteRequest.getNumContainers()
+          + " #asks=" + ask.size());
+    }
+  }
+
+  @Override
+  public ConcurrentHashMap<String, Token> getNMTokens() {
+    return nmTokens;
+  }
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,409 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedAction;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.util.ProtoUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * <p>
+ * This class implements {@link NMClient}. All the APIs are blocking.
+ * </p>
+ *
+ * <p>
+ * By default, this client stops all the running containers that are started by
+ * it when it stops. It can be disabled via
+ * {@link #cleanupRunningContainersOnStop}, in which case containers will
+ * continue to run even after this client is stopped, until the application
+ * finishes, at which point ResourceManager will forcefully kill them.
+ * </p>
+ *
+ * <p>
+ * Note that the blocking APIs ensure the RPC calls to <code>NodeManager</code>
+ * are executed immediately, and the responses are received before these APIs
+ * return. However, when {@link #startContainer} or {@link #stopContainer}
+ * returns, <code>NodeManager</code> may still need some time to either start
+ * or stop the container because of its asynchronous implementation. Therefore,
+ * {@link #getContainerStatus} is likely to return a transit container status
+ * if it is executed immediately after {@link #startContainer} or
+ * {@link #stopContainer}.
+ * </p>
+ */
+@Private
+@Unstable
+public class NMClientImpl extends NMClient {
+
+  private static final Log LOG = LogFactory.getLog(NMClientImpl.class);
+
+  // Logically related operations on startedContainers are synchronized to
+  // ensure they are atomic
+  protected ConcurrentMap<ContainerId, StartedContainer> startedContainers =
+      new ConcurrentHashMap<ContainerId, StartedContainer>();
+
+  //enabled by default
+  private final AtomicBoolean cleanupRunningContainers = new AtomicBoolean(true);
+
+  public NMClientImpl() {
+    super(NMClientImpl.class.getName());
+  }
+
+  public NMClientImpl(String name) {
+    super(name);
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    // Usually, started-containers are stopped when this client stops. Unless
+    // the flag cleanupRunningContainers is set to false.
+    if (getCleanupRunningContainers().get()) {
+      cleanupRunningContainers();
+    }
+    super.serviceStop();
+  }
+
+  /** Best-effort stop of all started containers; failures are only logged. */
+  protected synchronized void cleanupRunningContainers() {
+    for (StartedContainer startedContainer : startedContainers.values()) {
+      try {
+        stopContainer(startedContainer.getContainerId(),
+            startedContainer.getNodeId(), startedContainer.getContainerToken());
+      } catch (YarnException e) {
+        LOG.error("Failed to stop Container " +
+            startedContainer.getContainerId() +
+            " when stopping NMClientImpl", e);
+      } catch (IOException e) {
+        LOG.error("Failed to stop Container " +
+            startedContainer.getContainerId() +
+            " when stopping NMClientImpl", e);
+      }
+    }
+  }
+
+  @Override
+  public void cleanupRunningContainersOnStop(boolean enabled) {
+    getCleanupRunningContainers().set(enabled);
+  }
+
+  protected static class StartedContainer {
+    private ContainerId containerId;
+    private NodeId nodeId;
+    private Token containerToken;
+    private boolean stopped;
+
+    public StartedContainer(ContainerId containerId, NodeId nodeId,
+        Token containerToken) {
+      this.containerId = containerId;
+      this.nodeId = nodeId;
+      this.containerToken = containerToken;
+      stopped = false;
+    }
+
+    public ContainerId getContainerId() {
+      return containerId;
+    }
+
+    public NodeId getNodeId() {
+      return nodeId;
+    }
+
+    public Token getContainerToken() {
+      return containerToken;
+    }
+  }
+
+  protected static final class NMCommunicator extends AbstractService {
+    private ContainerId containerId;
+    private NodeId nodeId;
+    private Token containerToken;
+    private ContainerManagementProtocol containerManager;
+
+    public NMCommunicator(ContainerId containerId, NodeId nodeId,
+        Token containerToken) {
+      super(NMCommunicator.class.getName());
+      this.containerId = containerId;
+      this.nodeId = nodeId;
+      this.containerToken = containerToken;
+    }
+
+    @Override
+    protected synchronized void serviceStart() throws Exception {
+      final YarnRPC rpc = YarnRPC.create(getConfig());
+
+      final InetSocketAddress containerAddress =
+          NetUtils.createSocketAddr(nodeId.toString());
+
+      // the user in createRemoteUser in this context has to be ContainerId
+      UserGroupInformation currentUser =
+          UserGroupInformation.createRemoteUser(containerId.toString());
+
+      org.apache.hadoop.security.token.Token<ContainerTokenIdentifier> token =
+          ProtoUtils.convertFromProtoFormat(containerToken, containerAddress);
+      currentUser.addToken(token);
+
+      containerManager = currentUser
+          .doAs(new PrivilegedAction<ContainerManagementProtocol>() {
+            @Override
+            public ContainerManagementProtocol run() {
+              return (ContainerManagementProtocol) rpc.getProxy(ContainerManagementProtocol.class,
+                  containerAddress, getConfig());
+            }
+          });
+
+      LOG.debug("Connecting to ContainerManager at " + containerAddress);
+      super.serviceStart();
+    }
+
+    @Override
+    protected synchronized void serviceStop() throws Exception {
+      if (this.containerManager != null) {
+        RPC.stopProxy(this.containerManager);
+
+        if (LOG.isDebugEnabled()) {
+          InetSocketAddress containerAddress =
+              NetUtils.createSocketAddr(nodeId.toString());
+          LOG.debug("Disconnecting from ContainerManager at " +
+              containerAddress);
+        }
+      }
+      super.serviceStop();
+    }
+
+    public synchronized Map<String, ByteBuffer> startContainer(
+        Container container, ContainerLaunchContext containerLaunchContext)
+            throws YarnException, IOException {
+      if (!container.getId().equals(containerId)) {
+        throw new IllegalArgumentException(
+            "NMCommunicator's containerId  mismatches the given Container's");
+      }
+      StartContainerResponse startResponse = null;
+      try {
+        StartContainerRequest startRequest =
+            Records.newRecord(StartContainerRequest.class);
+        startRequest.setContainerToken(container.getContainerToken());
+        startRequest.setContainerLaunchContext(containerLaunchContext);
+        startResponse = containerManager.startContainer(startRequest);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Started Container " + containerId);
+        }
+      } catch (YarnException e) {
+        LOG.warn("Container " + containerId + " failed to start", e);
+        throw e;
+      } catch (IOException e) {
+        LOG.warn("Container " + containerId + " failed to start", e);
+        throw e;
+      }
+      return startResponse.getAllServiceResponse();
+    }
+
+    /**
+     * Sends a stop request for this communicator's container through the
+     * ContainerManager proxy.
+     *
+     * @throws YarnException if the stop call fails; logged, then rethrown
+     * @throws IOException if the stop call fails; logged, then rethrown
+     */
+    public synchronized void stopContainer() throws YarnException,
+        IOException {
+      try {
+        StopContainerRequest request =
+            Records.newRecord(StopContainerRequest.class);
+        request.setContainerId(containerId);
+        containerManager.stopContainer(request);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Stopped Container " + containerId);
+        }
+      } catch (YarnException yarnFailure) {
+        LOG.warn("Container " + containerId + " failed to stop", yarnFailure);
+        throw yarnFailure;
+      } catch (IOException ioFailure) {
+        LOG.warn("Container " + containerId + " failed to stop", ioFailure);
+        throw ioFailure;
+      }
+    }
+
+    /**
+     * Queries the ContainerManager proxy for the status of this
+     * communicator's container.
+     *
+     * @return the status carried by the response
+     * @throws YarnException if the status call fails; logged, then rethrown
+     * @throws IOException if the status call fails; logged, then rethrown
+     */
+    public synchronized ContainerStatus getContainerStatus()
+        throws YarnException, IOException {
+      GetContainerStatusResponse response;
+      try {
+        GetContainerStatusRequest request =
+            Records.newRecord(GetContainerStatusRequest.class);
+        request.setContainerId(containerId);
+        response = containerManager.getContainerStatus(request);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Got the status of Container " + containerId);
+        }
+      } catch (YarnException yarnFailure) {
+        LOG.warn(
+            "Unable to get the status of Container " + containerId,
+            yarnFailure);
+        throw yarnFailure;
+      } catch (IOException ioFailure) {
+        LOG.warn(
+            "Unable to get the status of Container " + containerId,
+            ioFailure);
+        throw ioFailure;
+      }
+      return response.getStatus();
+    }
+  }
+
+  /**
+   * Starts the given container through a short-lived {@code NMCommunicator}.
+   *
+   * <p>The container is first registered as started. The rest of the work
+   * runs while holding the monitor of the registered {@code StartedContainer}
+   * to prevent a race between startContainer and stopContainer. If anything
+   * is thrown while starting, the registration is removed again.
+   *
+   * <p>Proxy lifecycle, three choices: (1) start and release the proxy before
+   * and after each interaction; (2) start the proxy when starting the
+   * container and release it when stopping the container; (3) start the proxy
+   * when starting the container and release it when stopping the client.
+   * Choice 1 is adopted currently.
+   *
+   * @param container the container to start
+   * @param containerLaunchContext the launch context passed on with the
+   *          start request
+   * @return the service responses returned by the communicator
+   * @throws YarnException if the container is already registered or the start
+   *           fails; any other {@code Throwable} is converted through
+   *           {@code RPCUtil.getRemoteException}
+   * @throws IOException if the start fails with an I/O error
+   */
+  @Override
+  public Map<String, ByteBuffer> startContainer(
+      Container container, ContainerLaunchContext containerLaunchContext)
+          throws YarnException, IOException {
+    StartedContainer registered = addStartedContainer(container);
+    synchronized (registered) {
+      NMCommunicator communicator = null;
+      try {
+        communicator = new NMCommunicator(container.getId(),
+            container.getNodeId(), container.getContainerToken());
+        communicator.init(getConfig());
+        communicator.start();
+        return communicator.startContainer(container, containerLaunchContext);
+      } catch (YarnException yarnFailure) {
+        // Roll back the registration because the container did not start
+        removeStartedContainer(container.getId());
+        throw yarnFailure;
+      } catch (IOException ioFailure) {
+        removeStartedContainer(container.getId());
+        throw ioFailure;
+      } catch (Throwable unexpected) {
+        removeStartedContainer(container.getId());
+        throw RPCUtil.getRemoteException(unexpected);
+      } finally {
+        // The communicator is released after every interaction
+        if (communicator != null) {
+          communicator.stop();
+        }
+      }
+    }
+  }
+
+  /**
+   * Stops a container that was previously started through this client.
+   *
+   * <p>Only one stop request per container proceeds at a time: callers
+   * serialize on the registered {@code StartedContainer}, and a caller that
+   * enters after a successful stop returns immediately.
+   *
+   * <p>The container is marked stopped and removed from the registry only
+   * after the stop request has succeeded. If the request fails, the
+   * registration is kept so that a waiting or later caller attempts the stop
+   * again instead of being told, wrongly, that the container is already
+   * stopped.
+   *
+   * @param containerId id of the container to stop
+   * @param nodeId id of the node the container runs on
+   * @param containerToken token used to build the communicator
+   * @throws YarnException if the container is not registered as started, or
+   *           the stop request fails
+   * @throws IOException if the stop request fails with an I/O error
+   */
+  @Override
+  public void stopContainer(ContainerId containerId, NodeId nodeId,
+      Token containerToken) throws YarnException, IOException {
+    StartedContainer startedContainer = getStartedContainer(containerId);
+    if (startedContainer == null) {
+      throw RPCUtil.getRemoteException("Container " + containerId +
+          " is either not started yet or already stopped");
+    }
+    // Only allow one request of stopping the container to move forward
+    // When entering the block, check whether the precursor has already stopped
+    // the container
+    synchronized (startedContainer) {
+      if (startedContainer.stopped) {
+        return;
+      }
+      NMCommunicator nmCommunicator = null;
+      try {
+        nmCommunicator =
+            new NMCommunicator(containerId, nodeId, containerToken);
+        nmCommunicator.init(getConfig());
+        nmCommunicator.start();
+        nmCommunicator.stopContainer();
+        // Reached only when the stop request did not throw: now it is safe to
+        // record the container as stopped and forget it.
+        startedContainer.stopped = true;
+        removeStartedContainer(containerId);
+      } finally {
+        // The communicator is released whether or not the stop succeeded
+        if (nmCommunicator != null) {
+          nmCommunicator.stop();
+        }
+      }
+    }
+  }
+
+  /**
+   * Fetches the status of a container through a short-lived
+   * {@code NMCommunicator}, which is stopped again before returning.
+   *
+   * @param containerId id of the container to query
+   * @param nodeId id of the node the container runs on
+   * @param containerToken token used to build the communicator
+   * @return the status reported by the communicator
+   * @throws YarnException if the status request fails
+   * @throws IOException if the status request fails with an I/O error
+   */
+  @Override
+  public ContainerStatus getContainerStatus(ContainerId containerId,
+      NodeId nodeId, Token containerToken)
+          throws YarnException, IOException {
+    NMCommunicator communicator =
+        new NMCommunicator(containerId, nodeId, containerToken);
+    try {
+      communicator.init(getConfig());
+      communicator.start();
+      return communicator.getContainerStatus();
+    } finally {
+      communicator.stop();
+    }
+  }
+
+  /**
+   * Registers the given container in the started-container map.
+   *
+   * @param container the container being started
+   * @return the newly created registry entry
+   * @throws YarnException if an entry with the same container id already
+   *           exists
+   * @throws IOException declared by the signature
+   */
+  protected synchronized StartedContainer addStartedContainer(
+      Container container) throws YarnException, IOException {
+    ContainerId id = container.getId();
+    if (startedContainers.containsKey(id)) {
+      throw RPCUtil.getRemoteException("Container " + id +
+          " is already started");
+    }
+    StartedContainer entry = new StartedContainer(id,
+        container.getNodeId(), container.getContainerToken());
+    startedContainers.put(entry.getContainerId(), entry);
+    return entry;
+  }
+
+  /**
+   * Removes the entry for the given container id from the started-container
+   * map. Synchronized on this client instance.
+   *
+   * @param containerId id of the container whose entry is removed
+   */
+  protected synchronized void removeStartedContainer(ContainerId containerId) {
+    startedContainers.remove(containerId);
+  }
+
+  /**
+   * Looks up the started-container entry for the given container id.
+   * Synchronized on this client instance.
+   *
+   * @param containerId id of the container to look up
+   * @return whatever the started-container map holds for that id; callers in
+   *         this file treat {@code null} as "no such entry"
+   */
+  protected synchronized StartedContainer getStartedContainer(
+      ContainerId containerId) {
+    return startedContainers.get(containerId);
+  }
+
+  /**
+   * Returns the {@code cleanupRunningContainers} flag object itself, not a
+   * copy, so a caller can both read and change it.
+   * NOTE(review): presumably the flag decides whether running containers are
+   * stopped when this client stops; confirm against the service stop logic.
+   *
+   * @return the shared flag instance
+   */
+  public AtomicBoolean getCleanupRunningContainers() {
+    return cleanupRunningContainers;
+  }
+
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,316 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.Records;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * {@link YarnClient} implementation that talks to the ResourceManager through
+ * an {@link ApplicationClientProtocol} RPC proxy. The proxy is created when
+ * the service starts and released when it stops.
+ */
+@Private
+@Unstable
+public class YarnClientImpl extends YarnClient {
+
+  private static final Log LOG = LogFactory.getLog(YarnClientImpl.class);
+
+  /** RPC proxy to the ResourceManager; set in {@link #serviceStart()}. */
+  protected ApplicationClientProtocol rmClient;
+  /** ResourceManager address; taken from the configuration if not supplied. */
+  protected InetSocketAddress rmAddress;
+  /** Sleep between two state polls in {@code submitApplication}, in ms. */
+  protected long statePollIntervalMillis;
+
+  /** Queue name used when querying the whole queue hierarchy. */
+  private static final String ROOT = "root";
+
+  /**
+   * Creates a client whose ResourceManager address is read from the
+   * configuration during initialization.
+   */
+  public YarnClientImpl() {
+    this(null);
+  }
+
+  /**
+   * Creates a client named after this class.
+   *
+   * @param rmAddress ResourceManager address, or {@code null} to read it from
+   *          the configuration during initialization
+   */
+  public YarnClientImpl(InetSocketAddress rmAddress) {
+    this(YarnClientImpl.class.getName(), rmAddress);
+  }
+
+  /**
+   * Creates a client with an explicit service name.
+   *
+   * @param name service name passed to the parent class
+   * @param rmAddress ResourceManager address, or {@code null} to read it from
+   *          the configuration during initialization
+   */
+  public YarnClientImpl(String name, InetSocketAddress rmAddress) {
+    super(name);
+    this.rmAddress = rmAddress;
+  }
+
+  /**
+   * Reads the ResourceManager address from the configuration, using the
+   * default address and port from {@link YarnConfiguration} as fallback.
+   */
+  private static InetSocketAddress getRmAddress(Configuration conf) {
+    return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
+      YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT);
+  }
+
+  /**
+   * Resolves the ResourceManager address (unless one was given to the
+   * constructor) and the submission poll interval from the configuration.
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    if (this.rmAddress == null) {
+      this.rmAddress = getRmAddress(conf);
+    }
+    statePollIntervalMillis = conf.getLong(
+        YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS,
+        YarnConfiguration.DEFAULT_YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS);
+    super.serviceInit(conf);
+  }
+
+  /** Creates the RPC proxy to the ResourceManager. */
+  @Override
+  protected void serviceStart() throws Exception {
+    YarnRPC rpc = YarnRPC.create(getConfig());
+
+    this.rmClient = (ApplicationClientProtocol) rpc.getProxy(
+        ApplicationClientProtocol.class, rmAddress, getConfig());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Connecting to ResourceManager at " + rmAddress);
+    }
+    super.serviceStart();
+  }
+
+  /** Releases the RPC proxy, if one was created. */
+  @Override
+  protected void serviceStop() throws Exception {
+    if (this.rmClient != null) {
+      RPC.stopProxy(this.rmClient);
+    }
+    super.serviceStop();
+  }
+
+  /**
+   * Asks the ResourceManager for a new application.
+   *
+   * @return the ResourceManager's response
+   */
+  @Override
+  public GetNewApplicationResponse getNewApplication()
+      throws YarnException, IOException {
+    GetNewApplicationRequest request =
+        Records.newRecord(GetNewApplicationRequest.class);
+    return rmClient.getNewApplication(request);
+  }
+
+  /**
+   * Submits the application and then polls its report until the state is
+   * neither {@code NEW} nor {@code NEW_SAVING}.
+   *
+   * @param appContext submission context; its application id identifies the
+   *          application that is polled
+   * @return the application id taken from {@code appContext}
+   * @throws YarnException if the ResourceManager rejects a request
+   * @throws IOException if a request fails with an I/O error; in particular a
+   *           {@link java.io.InterruptedIOException} if the calling thread is
+   *           interrupted while waiting between polls. In that case the
+   *           submit request has already been sent and the thread's interrupt
+   *           status is set again.
+   */
+  @Override
+  public ApplicationId
+      submitApplication(ApplicationSubmissionContext appContext)
+          throws YarnException, IOException {
+    ApplicationId applicationId = appContext.getApplicationId();
+    // NOTE(review): this writes back the id that was just read and looks
+    // redundant; kept unchanged until confirmed.
+    appContext.setApplicationId(applicationId);
+    SubmitApplicationRequest request =
+        Records.newRecord(SubmitApplicationRequest.class);
+    request.setApplicationSubmissionContext(appContext);
+    rmClient.submitApplication(request);
+
+    int pollCount = 0;
+    while (true) {
+      YarnApplicationState state =
+          getApplicationReport(applicationId).getYarnApplicationState();
+      if (!state.equals(YarnApplicationState.NEW) &&
+          !state.equals(YarnApplicationState.NEW_SAVING)) {
+        break;
+      }
+      // Notify the client through the log every 10 polls, in case the client
+      // is blocked here too long.
+      if (++pollCount % 10 == 0) {
+        LOG.info("Application submission is not finished, " +
+            "submitted application " + applicationId +
+            " is still in " + state);
+      }
+      try {
+        Thread.sleep(statePollIntervalMillis);
+      } catch (InterruptedException ie) {
+        // Swallowing the interrupt would leave the caller stuck in this
+        // unbounded loop. Restore the interrupt status and stop waiting.
+        Thread.currentThread().interrupt();
+        java.io.InterruptedIOException interrupted =
+            new java.io.InterruptedIOException(
+                "Interrupted while waiting for application " + applicationId +
+                " to leave state " + state);
+        interrupted.initCause(ie);
+        throw interrupted;
+      }
+    }
+
+
+    LOG.info("Submitted application " + applicationId + " to ResourceManager"
+        + " at " + rmAddress);
+    return applicationId;
+  }
+
+  /**
+   * Sends a force-kill request for the given application.
+   *
+   * @param applicationId id of the application to kill
+   */
+  @Override
+  public void killApplication(ApplicationId applicationId)
+      throws YarnException, IOException {
+    LOG.info("Killing application " + applicationId);
+    KillApplicationRequest request =
+        Records.newRecord(KillApplicationRequest.class);
+    request.setApplicationId(applicationId);
+    rmClient.forceKillApplication(request);
+  }
+
+  /**
+   * Fetches the report of one application.
+   *
+   * @param appId id of the application
+   * @return the report carried by the ResourceManager's response
+   */
+  @Override
+  public ApplicationReport getApplicationReport(ApplicationId appId)
+      throws YarnException, IOException {
+    GetApplicationReportRequest request =
+        Records.newRecord(GetApplicationReportRequest.class);
+    request.setApplicationId(appId);
+    GetApplicationReportResponse response =
+        rmClient.getApplicationReport(request);
+    return response.getApplicationReport();
+  }
+
+  /**
+   * Fetches the application list from the ResourceManager.
+   *
+   * @return the list carried by the ResourceManager's response
+   */
+  @Override
+  public List<ApplicationReport> getApplicationList()
+      throws YarnException, IOException {
+    GetAllApplicationsRequest request =
+        Records.newRecord(GetAllApplicationsRequest.class);
+    GetAllApplicationsResponse response = rmClient.getAllApplications(request);
+    return response.getApplicationList();
+  }
+
+  /**
+   * Fetches the cluster metrics from the ResourceManager.
+   *
+   * @return the metrics carried by the ResourceManager's response
+   */
+  @Override
+  public YarnClusterMetrics getYarnClusterMetrics() throws YarnException,
+      IOException {
+    GetClusterMetricsRequest request =
+        Records.newRecord(GetClusterMetricsRequest.class);
+    GetClusterMetricsResponse response = rmClient.getClusterMetrics(request);
+    return response.getClusterMetrics();
+  }
+
+  /**
+   * Fetches the node reports from the ResourceManager.
+   *
+   * @return the node reports carried by the ResourceManager's response
+   */
+  @Override
+  public List<NodeReport> getNodeReports() throws YarnException,
+      IOException {
+    GetClusterNodesRequest request =
+        Records.newRecord(GetClusterNodesRequest.class);
+    GetClusterNodesResponse response = rmClient.getClusterNodes(request);
+    return response.getNodeReports();
+  }
+
+  /**
+   * Requests a delegation token from the ResourceManager.
+   *
+   * @param renewer renewer set on the request (its string form is sent)
+   * @return the token carried by the ResourceManager's response
+   */
+  @Override
+  public Token getRMDelegationToken(Text renewer)
+      throws YarnException, IOException {
+    /* get the token from RM */
+    GetDelegationTokenRequest rmDTRequest =
+        Records.newRecord(GetDelegationTokenRequest.class);
+    rmDTRequest.setRenewer(renewer.toString());
+    GetDelegationTokenResponse response =
+        rmClient.getDelegationToken(rmDTRequest);
+    return response.getRMDelegationToken();
+  }
+
+
+  /** Builds a queue-info request with the given name and flags. */
+  private GetQueueInfoRequest
+      getQueueInfoRequest(String queueName, boolean includeApplications,
+          boolean includeChildQueues, boolean recursive) {
+    GetQueueInfoRequest request = Records.newRecord(GetQueueInfoRequest.class);
+    request.setQueueName(queueName);
+    request.setIncludeApplications(includeApplications);
+    request.setIncludeChildQueues(includeChildQueues);
+    request.setRecursive(recursive);
+    return request;
+  }
+
+  /**
+   * Fetches information about one queue, asking for its applications but not
+   * for its child queues.
+   *
+   * @param queueName name of the queue
+   * @return the queue information carried by the response
+   */
+  @Override
+  public QueueInfo getQueueInfo(String queueName) throws YarnException,
+      IOException {
+    GetQueueInfoRequest request =
+        getQueueInfoRequest(queueName, true, false, false);
+    return rmClient.getQueueInfo(request).getQueueInfo();
+  }
+
+  /**
+   * Fetches the queue ACL information from the ResourceManager.
+   *
+   * @return the ACL list carried by the ResourceManager's response
+   */
+  @Override
+  public List<QueueUserACLInfo> getQueueAclsInfo() throws YarnException,
+      IOException {
+    GetQueueUserAclsInfoRequest request =
+        Records.newRecord(GetQueueUserAclsInfoRequest.class);
+    return rmClient.getQueueUserAcls(request).getUserAclsInfoList();
+  }
+
+  /**
+   * Collects every queue below the root queue, at any depth. The root queue
+   * itself is not part of the result.
+   *
+   * @return the descendants of the root queue, each parent before its children
+   */
+  @Override
+  public List<QueueInfo> getAllQueues() throws YarnException,
+      IOException {
+    List<QueueInfo> queues = new ArrayList<QueueInfo>();
+
+    QueueInfo rootQueue =
+        rmClient.getQueueInfo(getQueueInfoRequest(ROOT, false, true, true))
+          .getQueueInfo();
+    getChildQueues(rootQueue, queues, true);
+    return queues;
+  }
+
+  /**
+   * Collects the direct children of the root queue.
+   *
+   * @return the direct children of the root queue
+   */
+  @Override
+  public List<QueueInfo> getRootQueueInfos() throws YarnException,
+      IOException {
+    List<QueueInfo> queues = new ArrayList<QueueInfo>();
+
+    QueueInfo rootQueue =
+        rmClient.getQueueInfo(getQueueInfoRequest(ROOT, false, true, true))
+          .getQueueInfo();
+    getChildQueues(rootQueue, queues, false);
+    return queues;
+  }
+
+  /**
+   * Collects the child queues found in the response for the given parent
+   * queue, descending into whatever children that response carries.
+   *
+   * @param parent name of the parent queue
+   * @return the collected child queues
+   */
+  @Override
+  public List<QueueInfo> getChildQueueInfos(String parent)
+      throws YarnException, IOException {
+    List<QueueInfo> queues = new ArrayList<QueueInfo>();
+
+    QueueInfo parentQueue =
+        rmClient.getQueueInfo(getQueueInfoRequest(parent, false, true, false))
+          .getQueueInfo();
+    getChildQueues(parentQueue, queues, true);
+    return queues;
+  }
+
+  /**
+   * Appends the children of {@code parent} to {@code queues}, each child
+   * followed by its own descendants when {@code recursive} is set.
+   */
+  private void getChildQueues(QueueInfo parent, List<QueueInfo> queues,
+      boolean recursive) {
+    List<QueueInfo> childQueues = parent.getChildQueues();
+
+    for (QueueInfo child : childQueues) {
+      queues.add(child);
+      if (recursive) {
+        getChildQueues(child, queues, recursive);
+      }
+    }
+  }
+
+  /**
+   * Replaces the ResourceManager proxy; intended for tests.
+   *
+   * @param rmClient the proxy to use from now on
+   */
+  @Private
+  @VisibleForTesting
+  public void setRMClient(ApplicationClientProtocol rmClient) {
+    this.rmClient = rmClient;
+  }
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/package-info.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/package-info.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/package-info.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Public
+package org.apache.hadoop.yarn.client.api.impl;
+import org.apache.hadoop.classification.InterfaceAudience;
+

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/package-info.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/package-info.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/package-info.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Public
+package org.apache.hadoop.yarn.client.api;
+import org.apache.hadoop.classification.InterfaceAudience;
+

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java?rev=1494017&r1=1494016&r2=1494017&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java Tue Jun 18 04:02:47 2013
@@ -27,12 +27,16 @@ import org.apache.commons.cli.CommandLin
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
+@Private
+@Unstable
 public class ApplicationCLI extends YarnCLI {
   private static final String APPLICATIONS_PATTERN =
     "%30s\t%20s\t%20s\t%10s\t%10s\t%18s\t%18s\t%15s\t%35s" +

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java?rev=1494017&r1=1494016&r2=1494017&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java Tue Jun 18 04:02:47 2013
@@ -28,12 +28,16 @@ import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.commons.lang.time.DateFormatUtils;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
+@Private
+@Unstable
 public class NodeCLI extends YarnCLI {
   private static final String NODES_PATTERN = "%16s\t%10s\t%17s\t%18s" +
     System.getProperty("line.separator");

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java?rev=1494017&r1=1494016&r2=1494017&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java Tue Jun 18 04:02:47 2013
@@ -23,6 +23,8 @@ import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
 import java.util.Arrays;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.ipc.RemoteException;
@@ -42,6 +44,8 @@ import org.apache.hadoop.yarn.factories.
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 
+@Private
+@Unstable
 public class RMAdminCLI extends Configured implements Tool {
 
   private final RecordFactory recordFactory = 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java?rev=1494017&r1=1494016&r2=1494017&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java Tue Jun 18 04:02:47 2013
@@ -19,12 +19,15 @@ package org.apache.hadoop.yarn.client.cl
 
 import java.io.PrintStream;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.yarn.client.YarnClient;
-import org.apache.hadoop.yarn.client.YarnClientImpl;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
+@Private
+@Unstable
 public abstract class YarnCLI extends Configured implements Tool {
 
   public static final String STATUS_CMD = "status";

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/package-info.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/package-info.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/package-info.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Public
+package org.apache.hadoop.yarn.client.cli;
+import org.apache.hadoop.classification.InterfaceAudience;
+