You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2016/06/23 02:05:04 UTC

hadoop git commit: YARN-5171. Extend DistributedSchedulerProtocol to notify RM of containers allocated by the Node. (Inigo Goiri via asuresh)

Repository: hadoop
Updated Branches:
  refs/heads/trunk 79a728916 -> 99e5dd68d


YARN-5171. Extend DistributedSchedulerProtocol to notify RM of containers allocated by the Node. (Inigo Goiri via asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/99e5dd68
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/99e5dd68
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/99e5dd68

Branch: refs/heads/trunk
Commit: 99e5dd68d0f44109c169d74824fa45a7396a5990
Parents: 79a7289
Author: Arun Suresh <as...@apache.org>
Authored: Wed Jun 22 19:04:54 2016 -0700
Committer: Arun Suresh <as...@apache.org>
Committed: Wed Jun 22 19:04:54 2016 -0700

----------------------------------------------------------------------
 .../api/impl/TestDistributedScheduling.java     | 123 +++++++++++-
 .../yarn/api/records/impl/pb/ProtoUtils.java    |  14 ++
 .../api/DistributedSchedulerProtocol.java       |   4 +-
 ...istributedSchedulerProtocolPBClientImpl.java |  17 +-
 ...stributedSchedulerProtocolPBServiceImpl.java |   7 +-
 .../DistSchedAllocateRequest.java               |  69 +++++++
 .../impl/pb/DistSchedAllocateRequestPBImpl.java | 185 +++++++++++++++++++
 .../proto/distributed_scheduler_protocol.proto  |   2 +-
 .../yarn_server_common_service_protos.proto     |   5 +
 .../amrmproxy/AbstractRequestInterceptor.java   |  12 +-
 .../amrmproxy/DefaultRequestInterceptor.java    |  11 +-
 .../nodemanager/scheduler/LocalScheduler.java   |  34 ++--
 .../scheduler/TestLocalScheduler.java           |   3 +-
 .../DistributedSchedulingService.java           |  29 ++-
 .../rmcontainer/RMContainer.java                |  11 ++
 .../rmcontainer/RMContainerImpl.java            |  31 ++++
 .../scheduler/AbstractYarnScheduler.java        |  18 +-
 .../scheduler/SchedulerApplicationAttempt.java  |  20 ++
 .../TestDistributedSchedulingService.java       |  24 ++-
 19 files changed, 572 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5dd68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
index a556aa2..c649071 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
@@ -47,12 +47,15 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -64,10 +67,14 @@ import org.mockito.stubbing.Answer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -166,8 +173,18 @@ public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
     Assert.assertNotNull(responseRegister.getSchedulerResourceTypes());
     Assert.assertNotNull(responseRegister.getMaximumResourceCapability());
 
-    RMApp rmApp =
-        cluster.getResourceManager().getRMContext().getRMApps().get(appId);
+    // Wait until the RM has been updated and verify
+    Map<ApplicationId, RMApp> rmApps =
+        cluster.getResourceManager().getRMContext().getRMApps();
+    boolean rmUpdated = false;
+    for (int i=0; i<10 && !rmUpdated; i++) {
+      sleep(100);
+      RMApp rmApp = rmApps.get(appId);
+      if (rmApp.getState() == RMAppState.RUNNING) {
+        rmUpdated = true;
+      }
+    }
+    RMApp rmApp = rmApps.get(appId);
     Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
 
     LOG.info("testDistributedSchedulingE2E - Allocate");
@@ -207,6 +224,17 @@ public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
           containerTokenIdentifier.getExecutionType());
     }
 
+    // Check that the RM sees OPPORTUNISTIC containers
+    ResourceScheduler scheduler = cluster.getResourceManager()
+        .getResourceScheduler();
+    for (Container allocatedContainer : allocResponse
+        .getAllocatedContainers()) {
+      ContainerId containerId = allocatedContainer.getId();
+      RMContainer rmContainer = scheduler.getRMContainer(containerId);
+      Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
+          rmContainer.getExecutionType());
+    }
+
     LOG.info("testDistributedSchedulingE2E - Finish");
   }
 
@@ -512,6 +540,97 @@ public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
     }
   }
 
+  /**
+   * Check if an AM can ask for opportunistic containers and get them.
+   * @throws Exception
+   */
+  @Test
+  public void testAMOpportunistic() throws Exception {
+    // Basic container to request
+    Resource capability = Resource.newInstance(1024, 1);
+    Priority priority = Priority.newInstance(1);
+
+    // Get the cluster topology
+    List<NodeReport> nodeReports = rmClient.getNodeReports(NodeState.RUNNING);
+    String node = nodeReports.get(0).getNodeId().getHost();
+    String rack = nodeReports.get(0).getRackName();
+    String[] nodes = new String[]{node};
+    String[] racks = new String[]{rack};
+
+    // Create an AM to request resources
+    AMRMClient<AMRMClient.ContainerRequest> amClient = null;
+    try {
+      amClient = new AMRMClientImpl<AMRMClient.ContainerRequest>(client);
+      amClient.init(yarnConf);
+      amClient.start();
+      amClient.registerApplicationMaster(NetUtils.getHostname(), 1024, "");
+
+      // AM requests an opportunistic container
+      ExecutionTypeRequest execTypeRequest =
+          ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true);
+      ContainerRequest containerRequest = new AMRMClient.ContainerRequest(
+          capability, nodes, racks, priority, true, null, execTypeRequest);
+      amClient.addContainerRequest(containerRequest);
+
+      // Wait until the container is allocated
+      ContainerId opportunisticContainerId = null;
+      for (int i=0; i<10 && opportunisticContainerId == null; i++) {
+        AllocateResponse allocResponse = amClient.allocate(0.1f);
+        List<Container> allocatedContainers =
+            allocResponse.getAllocatedContainers();
+        for (Container allocatedContainer : allocatedContainers) {
+          // Check that this is the container we required
+          assertEquals(ExecutionType.OPPORTUNISTIC,
+              allocatedContainer.getExecutionType());
+          opportunisticContainerId = allocatedContainer.getId();
+        }
+        sleep(100);
+      }
+      assertNotNull(opportunisticContainerId);
+
+      // The RM sees the container as OPPORTUNISTIC
+      ResourceScheduler scheduler = cluster.getResourceManager()
+          .getResourceScheduler();
+      RMContainer rmContainer = scheduler.getRMContainer(
+          opportunisticContainerId);
+      assertEquals(ExecutionType.OPPORTUNISTIC,
+          rmContainer.getExecutionType());
+
+      // Release the opportunistic container
+      amClient.releaseAssignedContainer(opportunisticContainerId);
+      // Wait for the release container to appear
+      boolean released = false;
+      for (int i=0; i<10 && !released; i++) {
+        AllocateResponse allocResponse = amClient.allocate(0.1f);
+        List<ContainerStatus> completedContainers =
+            allocResponse.getCompletedContainersStatuses();
+        for (ContainerStatus completedContainer : completedContainers) {
+          ContainerId completedContainerId =
+              completedContainer.getContainerId();
+          assertEquals(completedContainerId, opportunisticContainerId);
+          released = true;
+        }
+        if (!released) {
+          sleep(100);
+        }
+      }
+      assertTrue(released);
+
+      // The RM shouldn't see the container anymore
+      rmContainer = scheduler.getRMContainer(opportunisticContainerId);
+      assertNull(rmContainer);
+
+      // Clean the AM
+      amClient.unregisterApplicationMaster(
+          FinalApplicationStatus.SUCCEEDED, null, null);
+    } finally {
+      if (amClient != null &&
+          amClient.getServiceState() == Service.STATE.STARTED) {
+        amClient.close();
+      }
+    }
+  }
+
   private void sleep(int sleepTime) {
     try {
       Thread.sleep(sleepTime);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5dd68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
index 1a0f30a..4b62358 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
 import org.apache.hadoop.yarn.api.records.AMCommand;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
@@ -339,4 +340,17 @@ public class ProtoUtils {
       ExecutionTypeRequestProto e) {
     return new ExecutionTypeRequestPBImpl(e);
   }
+
+  /*
+   * Container
+   */
+  public static YarnProtos.ContainerProto convertToProtoFormat(
+      Container t) {
+    return ((ContainerPBImpl)t).getProto();
+  }
+
+  public static ContainerStatusPBImpl convertFromProtoFormat(
+      YarnProtos.ContainerStatusProto p) {
+    return new ContainerStatusPBImpl(p);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5dd68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java
index 490c25b..26faa8f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.io.retry.Idempotent;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
@@ -74,5 +74,5 @@ public interface DistributedSchedulerProtocol
   @Unstable
   @Idempotent
   DistSchedAllocateResponse allocateForDistributedScheduling(
-      AllocateRequest request) throws YarnException, IOException;
+      DistSchedAllocateRequest request) throws YarnException, IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5dd68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java
index c1dd9e5..0ca61df 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java
@@ -22,9 +22,11 @@ import com.google.protobuf.ServiceException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
 import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@@ -35,6 +37,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
 
 
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateRequestPBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateResponsePBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedRegisterResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
@@ -77,9 +80,9 @@ public class DistributedSchedulerProtocolPBClientImpl implements
 
   @Override
   public DistSchedRegisterResponse
-  registerApplicationMasterForDistributedScheduling
-      (RegisterApplicationMasterRequest request) throws YarnException,
-      IOException {
+      registerApplicationMasterForDistributedScheduling(
+          RegisterApplicationMasterRequest request)
+          throws YarnException, IOException {
     YarnServiceProtos.RegisterApplicationMasterRequestProto requestProto =
         ((RegisterApplicationMasterRequestPBImpl) request).getProto();
     try {
@@ -93,10 +96,10 @@ public class DistributedSchedulerProtocolPBClientImpl implements
   }
 
   @Override
-  public DistSchedAllocateResponse allocateForDistributedScheduling
-      (AllocateRequest request) throws YarnException, IOException {
-    YarnServiceProtos.AllocateRequestProto requestProto =
-        ((AllocateRequestPBImpl) request).getProto();
+  public DistSchedAllocateResponse allocateForDistributedScheduling(
+      DistSchedAllocateRequest request) throws YarnException, IOException {
+    YarnServerCommonServiceProtos.DistSchedAllocateRequestProto requestProto =
+        ((DistSchedAllocateRequestPBImpl) request).getProto();
     try {
       return new DistSchedAllocateResponsePBImpl(
           proxy.allocateForDistributedScheduling(null, requestProto));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5dd68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java
index 8be2893..2763259 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.protocolrecords
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateRequestPBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateResponsePBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedRegisterResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
@@ -77,8 +78,10 @@ public class DistributedSchedulerProtocolPBServiceImpl implements
   @Override
   public YarnServerCommonServiceProtos.DistSchedAllocateResponseProto
   allocateForDistributedScheduling(RpcController controller,
-      AllocateRequestProto proto) throws ServiceException {
-    AllocateRequestPBImpl request = new AllocateRequestPBImpl(proto);
+      YarnServerCommonServiceProtos.DistSchedAllocateRequestProto proto)
+      throws ServiceException {
+    DistSchedAllocateRequestPBImpl request =
+        new DistSchedAllocateRequestPBImpl(proto);
     try {
       DistSchedAllocateResponse response = real
           .allocateForDistributedScheduling(request);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5dd68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateRequest.java
new file mode 100644
index 0000000..10ff95b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateRequest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.records.Container;
+
+import java.util.List;
+
+/**
+ * Request for a distributed scheduler to notify allocation of containers to
+ * the Resource Manager.
+ */
+@Public
+@Evolving
+public abstract class DistSchedAllocateRequest {
+
+  /**
+   * Get the underlying <code>AllocateRequest</code> object.
+   * @return Allocate request
+   */
+  @Public
+  @Evolving
+  public abstract AllocateRequest getAllocateRequest();
+
+  /**
+   * Set the underlying <code>AllocateRequest</code> object.
+   * @param allocateRequest  Allocate request
+   */
+  @Public
+  @Evolving
+  public abstract void  setAllocateRequest(AllocateRequest allocateRequest);
+
+  /**
+   * Get the list of <em>newly allocated</em> <code>Container</code> by the
+   * Distributed Scheduling component on the NodeManager.
+   * @return list of <em>newly allocated</em> <code>Container</code>
+   */
+  @Public
+  @Evolving
+  public abstract List<Container> getAllocatedContainers();
+
+  /**
+   * Set the list of <em>newly allocated</em> <code>Container</code> by the
+   * Distributed Scheduling component on the NodeManager.
+   * @param containers list of <em>newly allocated</em> <code>Container</code>
+   */
+  @Public
+  @Evolving
+  public abstract void setAllocatedContainers(List<Container> containers);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5dd68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateRequestPBImpl.java
new file mode 100644
index 0000000..be386b6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateRequestPBImpl.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.DistSchedAllocateRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.DistSchedAllocateRequestProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProto;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Implementation of {@link DistSchedAllocateRequest} for a distributed
+ * scheduler to notify about the allocation of containers to the Resource
+ * Manager.
+ */
+public class DistSchedAllocateRequestPBImpl extends DistSchedAllocateRequest {
+  private DistSchedAllocateRequestProto.Builder builder = null;
+  private boolean viaProto = false;
+
+  private DistSchedAllocateRequestProto proto;
+  private AllocateRequest allocateRequest;
+  private List<Container> containers;
+
+  public DistSchedAllocateRequestPBImpl() {
+    builder = DistSchedAllocateRequestProto.newBuilder();
+  }
+
+  public DistSchedAllocateRequestPBImpl(DistSchedAllocateRequestProto proto) {
+    this.proto = proto;
+    this.viaProto = true;
+  }
+
+  @Override
+  public AllocateRequest getAllocateRequest() {
+    DistSchedAllocateRequestProtoOrBuilder p = viaProto ? proto : builder;
+    if (this.allocateRequest != null) {
+      return this.allocateRequest;
+    }
+    if (!p.hasAllocateRequest()) {
+      return null;
+    }
+    this.allocateRequest = convertFromProtoFormat(p.getAllocateRequest());
+    return this.allocateRequest;
+  }
+
+  @Override
+  public void setAllocateRequest(AllocateRequest pAllocateRequest) {
+    maybeInitBuilder();
+    if (allocateRequest == null) {
+      builder.clearAllocateRequest();
+    }
+    this.allocateRequest = pAllocateRequest;
+  }
+
+  @Override
+  public List<Container> getAllocatedContainers() {
+    if (this.containers != null) {
+      return this.containers;
+    }
+    initAllocatedContainers();
+    return containers;
+  }
+
+  private void initAllocatedContainers() {
+    DistSchedAllocateRequestProtoOrBuilder p = viaProto ? proto : builder;
+    List<ContainerProto> list = p.getAllocatedContainersList();
+    this.containers = new ArrayList<Container>();
+    for (ContainerProto c : list) {
+      this.containers.add(convertFromProtoFormat(c));
+    }
+  }
+
+  @Override
+  public void setAllocatedContainers(List<Container> pContainers) {
+    maybeInitBuilder();
+    if (pContainers == null || pContainers.isEmpty()) {
+      if (this.containers != null) {
+        this.containers.clear();
+      }
+      builder.clearAllocatedContainers();
+      return;
+    }
+    this.containers = new ArrayList<>();
+    this.containers.addAll(pContainers);
+  }
+
+  public DistSchedAllocateRequestProto getProto() {
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = DistSchedAllocateRequestProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  private void mergeLocalToProto() {
+    if (viaProto) {
+      maybeInitBuilder();
+    }
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  private void mergeLocalToBuilder() {
+    if (this.containers != null) {
+      builder.clearAllocatedContainers();
+      Iterable<ContainerProto> iterable =
+          getContainerProtoIterable(this.containers);
+      builder.addAllAllocatedContainers(iterable);
+    }
+    if (this.allocateRequest != null) {
+      builder.setAllocateRequest(
+          ((AllocateRequestPBImpl)this.allocateRequest).getProto());
+    }
+  }
+
+  private Iterable<ContainerProto> getContainerProtoIterable(
+      final List<Container> newContainersList) {
+    maybeInitBuilder();
+    return new Iterable<ContainerProto>() {
+      @Override
+      public synchronized Iterator<ContainerProto> iterator() {
+        return new Iterator<ContainerProto>() {
+          Iterator<Container> iter = newContainersList.iterator();
+
+          @Override
+          public synchronized boolean hasNext() {
+            return iter.hasNext();
+          }
+
+          @Override
+          public synchronized ContainerProto next() {
+            return ProtoUtils.convertToProtoFormat(iter.next());
+          }
+
+          @Override
+          public synchronized void remove() {
+            throw new UnsupportedOperationException();
+
+          }
+        };
+      }
+    };
+  }
+
+  private ContainerPBImpl convertFromProtoFormat(ContainerProto p) {
+    return new ContainerPBImpl(p);
+  }
+
+  private AllocateRequestPBImpl convertFromProtoFormat(AllocateRequestProto p) {
+    return new AllocateRequestPBImpl(p);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5dd68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto
index b94656c..818eb4a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto
@@ -35,5 +35,5 @@ import "yarn_server_common_service_protos.proto";
 service DistributedSchedulerProtocolService {
   rpc registerApplicationMasterForDistributedScheduling (RegisterApplicationMasterRequestProto) returns (DistSchedRegisterResponseProto);
   rpc finishApplicationMaster (FinishApplicationMasterRequestProto) returns (FinishApplicationMasterResponseProto);
-  rpc allocateForDistributedScheduling (AllocateRequestProto) returns (DistSchedAllocateResponseProto);
+  rpc allocateForDistributedScheduling (DistSchedAllocateRequestProto) returns (DistSchedAllocateResponseProto);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5dd68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index a7e5a86..3e3cb82 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -41,6 +41,11 @@ message DistSchedAllocateResponseProto {
   repeated NodeIdProto nodes_for_scheduling = 2;
 }
 
+message DistSchedAllocateRequestProto {
+  optional AllocateRequestProto allocate_request = 1;
+  repeated ContainerProto allocated_containers = 2;
+}
+
 message NodeLabelsProto {
   repeated NodeLabelProto nodeLabels = 1;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5dd68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
index 2b2a2f6..55c65f4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
@@ -21,10 +21,10 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
 import org.apache.hadoop.conf.Configuration;
 
 import com.google.common.base.Preconditions;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords
     .RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
 
@@ -118,8 +118,8 @@ public abstract class AbstractRequestInterceptor implements
    * @throws IOException
    */
   @Override
-  public DistSchedAllocateResponse allocateForDistributedScheduling
-      (AllocateRequest request) throws YarnException, IOException {
+  public DistSchedAllocateResponse allocateForDistributedScheduling(
+      DistSchedAllocateRequest request) throws YarnException, IOException {
     return (this.nextInterceptor != null) ?
         this.nextInterceptor.allocateForDistributedScheduling(request) : null;
   }
@@ -135,9 +135,9 @@ public abstract class AbstractRequestInterceptor implements
    */
   @Override
   public DistSchedRegisterResponse
-  registerApplicationMasterForDistributedScheduling
-      (RegisterApplicationMasterRequest request) throws YarnException,
-      IOException {
+      registerApplicationMasterForDistributedScheduling(
+          RegisterApplicationMasterRequest request)
+          throws YarnException, IOException {
     return (this.nextInterceptor != null) ? this.nextInterceptor
         .registerApplicationMasterForDistributedScheduling(request) : null;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5dd68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
index 1637682..debff76 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
 import org.apache.hadoop.yarn.server.api.ServerRMProxy;
 import org.apache.hadoop.yarn.server.api.protocolrecords
     .DistSchedAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -133,8 +134,8 @@ public final class DefaultRequestInterceptor extends
   }
 
   @Override
-  public DistSchedAllocateResponse allocateForDistributedScheduling
-      (AllocateRequest request) throws YarnException, IOException {
+  public DistSchedAllocateResponse allocateForDistributedScheduling(
+      DistSchedAllocateRequest request) throws YarnException, IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Forwarding allocateForDistributedScheduling request" +
           "to the real YARN RM");
@@ -212,9 +213,9 @@ public final class DefaultRequestInterceptor extends
         }
 
         @Override
-        public DistSchedAllocateResponse
-        allocateForDistributedScheduling(AllocateRequest request) throws
-            YarnException, IOException {
+        public DistSchedAllocateResponse allocateForDistributedScheduling(
+            DistSchedAllocateRequest request)
+                throws YarnException, IOException {
           throw new IOException("Not Supported !!");
         }
       };

http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5dd68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
index 8e2ceb0..10c1361 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
@@ -21,6 +21,9 @@ package org.apache.hadoop.yarn.server.nodemanager.scheduler;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords
@@ -99,6 +102,9 @@ public final class LocalScheduler extends AbstractRequestInterceptor {
   private static final Logger LOG = LoggerFactory
       .getLogger(LocalScheduler.class);
 
+  private final static RecordFactory RECORD_FACTORY =
+      RecordFactoryProvider.getRecordFactory(null);
+
   // Currently just used to keep track of allocated Containers
   // Can be used for reporting stats later
   private Set<ContainerId> containersAllocated = new HashSet<>();
@@ -176,7 +182,10 @@ public final class LocalScheduler extends AbstractRequestInterceptor {
   @Override
   public AllocateResponse allocate(AllocateRequest request) throws
       YarnException, IOException {
-    return allocateForDistributedScheduling(request).getAllocateResponse();
+    DistSchedAllocateRequest distRequest =
+        RECORD_FACTORY.newRecordInstance(DistSchedAllocateRequest.class);
+    distRequest.setAllocateRequest(request);
+    return allocateForDistributedScheduling(distRequest).getAllocateResponse();
   }
 
   @Override
@@ -324,9 +333,9 @@ public final class LocalScheduler extends AbstractRequestInterceptor {
 
   @Override
   public DistSchedRegisterResponse
-  registerApplicationMasterForDistributedScheduling
-      (RegisterApplicationMasterRequest request) throws YarnException,
-      IOException {
+      registerApplicationMasterForDistributedScheduling(
+          RegisterApplicationMasterRequest request)
+              throws YarnException, IOException {
     LOG.info("Forwarding registration request to the" +
         "Distributed Scheduler Service on YARN RM");
     DistSchedRegisterResponse dsResp = getNextInterceptor()
@@ -336,17 +345,18 @@ public final class LocalScheduler extends AbstractRequestInterceptor {
   }
 
   @Override
-  public DistSchedAllocateResponse allocateForDistributedScheduling
-      (AllocateRequest request) throws YarnException, IOException {
+  public DistSchedAllocateResponse allocateForDistributedScheduling(
+      DistSchedAllocateRequest request) throws YarnException, IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Forwarding allocate request to the" +
           "Distributed Scheduler Service on YARN RM");
     }
     // Partition requests into GUARANTEED and OPPORTUNISTIC reqs
-    PartitionedResourceRequests partitionedAsks = partitionAskList(request
-        .getAskList());
+    PartitionedResourceRequests partitionedAsks = partitionAskList(
+        request.getAllocateRequest().getAskList());
 
-    List<ContainerId> releasedContainers = request.getReleaseList();
+    List<ContainerId> releasedContainers =
+        request.getAllocateRequest().getReleaseList();
     int numReleasedContainers = releasedContainers.size();
     if (numReleasedContainers > 0) {
       LOG.info("AttemptID: " + applicationAttemptId + " released: "
@@ -355,7 +365,8 @@ public final class LocalScheduler extends AbstractRequestInterceptor {
     }
 
     // Also, update black list
-    ResourceBlacklistRequest rbr = request.getResourceBlacklistRequest();
+    ResourceBlacklistRequest rbr =
+        request.getAllocateRequest().getResourceBlacklistRequest();
     if (rbr != null) {
       blacklist.removeAll(rbr.getBlacklistRemovals());
       blacklist.addAll(rbr.getBlacklistAdditions());
@@ -381,9 +392,10 @@ public final class LocalScheduler extends AbstractRequestInterceptor {
         allocatedContainers.addAll(e.getValue());
       }
     }
+    request.setAllocatedContainers(allocatedContainers);
 
     // Send all the GUARANTEED Reqs to RM
-    request.setAskList(partitionedAsks.getGuaranteed());
+    request.getAllocateRequest().setAskList(partitionedAsks.getGuaranteed());
     DistSchedAllocateResponse dsResp =
         getNextInterceptor().allocateForDistributedScheduling(request);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5dd68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
index a1d39f7..31f8085 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords
@@ -126,7 +127,7 @@ public class TestLocalScheduler {
 
     Mockito.when(
         finalReqIntcptr.allocateForDistributedScheduling(
-            Mockito.any(AllocateRequest.class)))
+            Mockito.any(DistSchedAllocateRequest.class)))
         .thenAnswer(new Answer<DistSchedAllocateResponse>() {
           @Override
           public DistSchedAllocateResponse answer(InvocationOnMock

http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5dd68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
index a93f683..5aabddc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
@@ -32,6 +33,7 @@ import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBSer
 
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@@ -45,6 +47,12 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
 
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor;
 
@@ -60,6 +68,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretMan
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -229,9 +238,23 @@ public class DistributedSchedulingService extends ApplicationMasterService
   }
 
   @Override
-  public DistSchedAllocateResponse allocateForDistributedScheduling
-      (AllocateRequest request) throws YarnException, IOException {
-    AllocateResponse response = allocate(request);
+  public DistSchedAllocateResponse allocateForDistributedScheduling(
+      DistSchedAllocateRequest request) throws YarnException, IOException {
+    List<Container> distAllocContainers = request.getAllocatedContainers();
+    for (Container container : distAllocContainers) {
+      // Create RMContainer
+      SchedulerApplicationAttempt appAttempt =
+          ((AbstractYarnScheduler) rmContext.getScheduler())
+              .getCurrentAttemptForContainer(container.getId());
+      RMContainer rmContainer = new RMContainerImpl(container,
+          appAttempt.getApplicationAttemptId(), container.getNodeId(),
+          appAttempt.getUser(), rmContext, true);
+      appAttempt.addRMContainer(container.getId(), rmContainer);
+      rmContainer.handle(
+          new RMContainerEvent(container.getId(),
+              RMContainerEventType.LAUNCHED));
+    }
+    AllocateResponse response = allocate(request.getAllocateRequest());
     DistSchedAllocateResponse dsResp = recordFactory.newRecordInstance
         (DistSchedAllocateResponse.class);
     dsResp.setAllocateResponse(response);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5dd68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
index f37923f..504c973 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerReport;
 import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -91,4 +92,14 @@ public interface RMContainer extends EventHandler<RMContainerEvent> {
   void cancelIncreaseReservation();
 
   String getQueueName();
+
+  ExecutionType getExecutionType();
+
+  /**
+   * If the container was allocated by a container other than the Resource
+   * Manager (e.g., the distributed scheduler in the NM
+   * <code>LocalScheduler</code>).
+   * @return If the container was allocated remotely.
+   */
+  boolean isRemotelyAllocated();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5dd68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index 95f81d4..ed819a0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerReport;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -79,6 +80,8 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
         RMContainerEventType.KILL)
     .addTransition(RMContainerState.NEW, RMContainerState.RESERVED,
         RMContainerEventType.RESERVED, new ContainerReservedTransition())
+    .addTransition(RMContainerState.NEW, RMContainerState.RUNNING,
+        RMContainerEventType.LAUNCHED)
     .addTransition(RMContainerState.NEW,
         EnumSet.of(RMContainerState.RUNNING, RMContainerState.COMPLETED),
         RMContainerEventType.RECOVER, new ContainerRecoveredTransition())
@@ -183,6 +186,8 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
   private Resource lastConfirmedResource;
   private volatile String queueName;
 
+  private boolean isExternallyAllocated;
+
   public RMContainerImpl(Container container,
       ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
       RMContext rmContext) {
@@ -190,6 +195,13 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
         .currentTimeMillis(), "");
   }
 
+  public RMContainerImpl(Container container,
+      ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
+      RMContext rmContext, boolean isExternallyAllocated) {
+    this(container, appAttemptId, nodeId, user, rmContext, System
+        .currentTimeMillis(), "", isExternallyAllocated);
+  }
+
   private boolean saveNonAMContainerMetaInfo;
 
   public RMContainerImpl(Container container,
@@ -202,6 +214,14 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
   public RMContainerImpl(Container container,
       ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
       RMContext rmContext, long creationTime, String nodeLabelExpression) {
+    this(container, appAttemptId, nodeId, user, rmContext, creationTime,
+        nodeLabelExpression, false);
+  }
+
+  public RMContainerImpl(Container container,
+      ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
+      RMContext rmContext, long creationTime, String nodeLabelExpression,
+      boolean isExternallyAllocated) {
     this.stateMachine = stateMachineFactory.make(this);
     this.containerId = container.getId();
     this.nodeId = nodeId;
@@ -216,6 +236,7 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
     this.resourceRequests = null;
     this.nodeLabelExpression = nodeLabelExpression;
     this.lastConfirmedResource = container.getResource();
+    this.isExternallyAllocated = isExternallyAllocated;
 
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     this.readLock = lock.readLock();
@@ -827,4 +848,14 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
   public String getQueueName() {
     return queueName;
   }
+
+  @Override
+  public ExecutionType getExecutionType() {
+    return container.getExecutionType();
+  }
+
+  @Override
+  public boolean isRemotelyAllocated() {
+    return isExternallyAllocated;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5dd68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 3066339..64eb777 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -516,7 +516,23 @@ public abstract class AbstractYarnScheduler
       return;
     }
 
-    completedContainerInternal(rmContainer, containerStatus, event);
+    if (!rmContainer.isRemotelyAllocated()) {
+      completedContainerInternal(rmContainer, containerStatus, event);
+    } else {
+      ContainerId containerId = rmContainer.getContainerId();
+      // Inform the container
+      rmContainer.handle(
+          new RMContainerFinishedEvent(containerId, containerStatus, event));
+      SchedulerApplicationAttempt schedulerAttempt =
+          getCurrentAttemptForContainer(containerId);
+      if (schedulerAttempt != null) {
+        schedulerAttempt.removeRMContainer(containerId);
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Completed container: " + rmContainer.getContainerId() +
+            " in state: " + rmContainer.getState() + " event:" + event);
+      }
+    }
 
     // If the container is getting killed in ACQUIRED state, the requester (AM
     // for regular containers and RM itself for AM container) will not know what

http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5dd68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index b48b272..aae0770 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -112,6 +112,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
   private boolean isAttemptRecovering;
 
   protected ResourceUsage attemptResourceUsage = new ResourceUsage();
+  /** Scheduled by a remote scheduler. */
+  protected ResourceUsage attemptResourceUsageAllocatedRemotely =
+      new ResourceUsage();
   private AtomicLong firstAllocationRequestSentTime = new AtomicLong(0);
   private AtomicLong firstContainerAllocatedTime = new AtomicLong(0);
 
@@ -288,6 +291,23 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
     return liveContainers.get(id);
   }
 
+  public synchronized void addRMContainer(
+      ContainerId id, RMContainer rmContainer) {
+    liveContainers.put(id, rmContainer);
+    if (rmContainer.isRemotelyAllocated()) {
+      this.attemptResourceUsageAllocatedRemotely.incUsed(
+          rmContainer.getAllocatedResource());
+    }
+  }
+
+  public synchronized void removeRMContainer(ContainerId containerId) {
+    RMContainer rmContainer = liveContainers.remove(containerId);
+    if (rmContainer != null && rmContainer.isRemotelyAllocated()) {
+      this.attemptResourceUsageAllocatedRemotely.decUsed(
+          rmContainer.getAllocatedResource());
+    }
+  }
+
   protected synchronized void resetReReservations(Priority priority) {
     reReservations.setCount(priority, 0);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5dd68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
index 7d2ed33..4716bab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
@@ -44,15 +44,17 @@ import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateRequestPBImpl;
 
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -175,10 +177,15 @@ public class TestDistributedSchedulingService {
     Assert.assertEquals(2,
         dsRegResp.getIncrAllocatableCapabilty().getVirtualCores());
 
+    DistSchedAllocateRequestPBImpl distAllReq =
+        (DistSchedAllocateRequestPBImpl)factory.newRecordInstance(
+            DistSchedAllocateRequest.class);
+    distAllReq.setAllocateRequest(allReq);
+    distAllReq.setAllocatedContainers(Arrays.asList(c));
     DistSchedAllocateResponse dsAllocResp =
         new DistSchedAllocateResponsePBImpl(
             dsProxy.allocateForDistributedScheduling(null,
-                ((AllocateRequestPBImpl)allReq).getProto()));
+                distAllReq.getProto()));
     Assert.assertEquals(
         "h1", dsAllocResp.getNodesForScheduling().get(0).getHost());
 
@@ -243,8 +250,13 @@ public class TestDistributedSchedulingService {
 
       @Override
       public DistSchedAllocateResponse allocateForDistributedScheduling(
-          AllocateRequest request) throws YarnException, IOException {
-        List<ResourceRequest> askList = request.getAskList();
+          DistSchedAllocateRequest request) throws YarnException, IOException {
+        List<ResourceRequest> askList =
+            request.getAllocateRequest().getAskList();
+        List<Container> allocatedContainers = request.getAllocatedContainers();
+        Assert.assertEquals(1, allocatedContainers.size());
+        Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
+            allocatedContainers.get(0).getExecutionType());
         Assert.assertEquals(1, askList.size());
         Assert.assertTrue(askList.get(0)
             .getExecutionTypeRequest().getEnforceExecutionType());


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org