You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2016/06/23 02:05:04 UTC
hadoop git commit: YARN-5171. Extend DistributedSchedulerProtocol to
notify RM of containers allocated by the Node. (Inigo Goiri via asuresh)
Repository: hadoop
Updated Branches:
refs/heads/trunk 79a728916 -> 99e5dd68d
YARN-5171. Extend DistributedSchedulerProtocol to notify RM of containers allocated by the Node. (Inigo Goiri via asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/99e5dd68
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/99e5dd68
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/99e5dd68
Branch: refs/heads/trunk
Commit: 99e5dd68d0f44109c169d74824fa45a7396a5990
Parents: 79a7289
Author: Arun Suresh <as...@apache.org>
Authored: Wed Jun 22 19:04:54 2016 -0700
Committer: Arun Suresh <as...@apache.org>
Committed: Wed Jun 22 19:04:54 2016 -0700
----------------------------------------------------------------------
.../api/impl/TestDistributedScheduling.java | 123 +++++++++++-
.../yarn/api/records/impl/pb/ProtoUtils.java | 14 ++
.../api/DistributedSchedulerProtocol.java | 4 +-
...istributedSchedulerProtocolPBClientImpl.java | 17 +-
...stributedSchedulerProtocolPBServiceImpl.java | 7 +-
.../DistSchedAllocateRequest.java | 69 +++++++
.../impl/pb/DistSchedAllocateRequestPBImpl.java | 185 +++++++++++++++++++
.../proto/distributed_scheduler_protocol.proto | 2 +-
.../yarn_server_common_service_protos.proto | 5 +
.../amrmproxy/AbstractRequestInterceptor.java | 12 +-
.../amrmproxy/DefaultRequestInterceptor.java | 11 +-
.../nodemanager/scheduler/LocalScheduler.java | 34 ++--
.../scheduler/TestLocalScheduler.java | 3 +-
.../DistributedSchedulingService.java | 29 ++-
.../rmcontainer/RMContainer.java | 11 ++
.../rmcontainer/RMContainerImpl.java | 31 ++++
.../scheduler/AbstractYarnScheduler.java | 18 +-
.../scheduler/SchedulerApplicationAttempt.java | 20 ++
.../TestDistributedSchedulingService.java | 24 ++-
19 files changed, 572 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5dd68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
index a556aa2..c649071 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
@@ -47,12 +47,15 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.After;
import org.junit.Assert;
@@ -64,10 +67,14 @@ import org.mockito.stubbing.Answer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -166,8 +173,18 @@ public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
Assert.assertNotNull(responseRegister.getSchedulerResourceTypes());
Assert.assertNotNull(responseRegister.getMaximumResourceCapability());
- RMApp rmApp =
- cluster.getResourceManager().getRMContext().getRMApps().get(appId);
+ // Wait until the RM has been updated and verify
+ Map<ApplicationId, RMApp> rmApps =
+ cluster.getResourceManager().getRMContext().getRMApps();
+ boolean rmUpdated = false;
+ for (int i=0; i<10 && !rmUpdated; i++) {
+ sleep(100);
+ RMApp rmApp = rmApps.get(appId);
+ if (rmApp.getState() == RMAppState.RUNNING) {
+ rmUpdated = true;
+ }
+ }
+ RMApp rmApp = rmApps.get(appId);
Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
LOG.info("testDistributedSchedulingE2E - Allocate");
@@ -207,6 +224,17 @@ public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
containerTokenIdentifier.getExecutionType());
}
+ // Check that the RM sees OPPORTUNISTIC containers
+ ResourceScheduler scheduler = cluster.getResourceManager()
+ .getResourceScheduler();
+ for (Container allocatedContainer : allocResponse
+ .getAllocatedContainers()) {
+ ContainerId containerId = allocatedContainer.getId();
+ RMContainer rmContainer = scheduler.getRMContainer(containerId);
+ Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
+ rmContainer.getExecutionType());
+ }
+
LOG.info("testDistributedSchedulingE2E - Finish");
}
@@ -512,6 +540,97 @@ public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
}
}
+ /**
+ * Check if an AM can ask for opportunistic containers and get them.
+ * @throws Exception
+ */
+ @Test
+ public void testAMOpportunistic() throws Exception {
+ // Basic container to request
+ Resource capability = Resource.newInstance(1024, 1);
+ Priority priority = Priority.newInstance(1);
+
+ // Get the cluster topology
+ List<NodeReport> nodeReports = rmClient.getNodeReports(NodeState.RUNNING);
+ String node = nodeReports.get(0).getNodeId().getHost();
+ String rack = nodeReports.get(0).getRackName();
+ String[] nodes = new String[]{node};
+ String[] racks = new String[]{rack};
+
+ // Create an AM to request resources
+ AMRMClient<AMRMClient.ContainerRequest> amClient = null;
+ try {
+ amClient = new AMRMClientImpl<AMRMClient.ContainerRequest>(client);
+ amClient.init(yarnConf);
+ amClient.start();
+ amClient.registerApplicationMaster(NetUtils.getHostname(), 1024, "");
+
+ // AM requests an opportunistic container
+ ExecutionTypeRequest execTypeRequest =
+ ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true);
+ ContainerRequest containerRequest = new AMRMClient.ContainerRequest(
+ capability, nodes, racks, priority, true, null, execTypeRequest);
+ amClient.addContainerRequest(containerRequest);
+
+ // Wait until the container is allocated
+ ContainerId opportunisticContainerId = null;
+ for (int i=0; i<10 && opportunisticContainerId == null; i++) {
+ AllocateResponse allocResponse = amClient.allocate(0.1f);
+ List<Container> allocatedContainers =
+ allocResponse.getAllocatedContainers();
+ for (Container allocatedContainer : allocatedContainers) {
+ // Check that this is the container we required
+ assertEquals(ExecutionType.OPPORTUNISTIC,
+ allocatedContainer.getExecutionType());
+ opportunisticContainerId = allocatedContainer.getId();
+ }
+ sleep(100);
+ }
+ assertNotNull(opportunisticContainerId);
+
+ // The RM sees the container as OPPORTUNISTIC
+ ResourceScheduler scheduler = cluster.getResourceManager()
+ .getResourceScheduler();
+ RMContainer rmContainer = scheduler.getRMContainer(
+ opportunisticContainerId);
+ assertEquals(ExecutionType.OPPORTUNISTIC,
+ rmContainer.getExecutionType());
+
+ // Release the opportunistic container
+ amClient.releaseAssignedContainer(opportunisticContainerId);
+ // Wait for the release container to appear
+ boolean released = false;
+ for (int i=0; i<10 && !released; i++) {
+ AllocateResponse allocResponse = amClient.allocate(0.1f);
+ List<ContainerStatus> completedContainers =
+ allocResponse.getCompletedContainersStatuses();
+ for (ContainerStatus completedContainer : completedContainers) {
+ ContainerId completedContainerId =
+ completedContainer.getContainerId();
+ assertEquals(completedContainerId, opportunisticContainerId);
+ released = true;
+ }
+ if (!released) {
+ sleep(100);
+ }
+ }
+ assertTrue(released);
+
+ // The RM shouldn't see the container anymore
+ rmContainer = scheduler.getRMContainer(opportunisticContainerId);
+ assertNull(rmContainer);
+
+ // Clean the AM
+ amClient.unregisterApplicationMaster(
+ FinalApplicationStatus.SUCCEEDED, null, null);
+ } finally {
+ if (amClient != null &&
+ amClient.getServiceState() == Service.STATE.STARTED) {
+ amClient.close();
+ }
+ }
+ }
+
private void sleep(int sleepTime) {
try {
Thread.sleep(sleepTime);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5dd68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
index 1a0f30a..4b62358 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
@@ -339,4 +340,17 @@ public class ProtoUtils {
ExecutionTypeRequestProto e) {
return new ExecutionTypeRequestPBImpl(e);
}
+
+ /*
+ * Container
+ */
+ public static YarnProtos.ContainerProto convertToProtoFormat(
+ Container t) {
+ return ((ContainerPBImpl)t).getProto();
+ }
+
+ public static ContainerStatusPBImpl convertFromProtoFormat(
+ YarnProtos.ContainerStatusProto p) {
+ return new ContainerStatusPBImpl(p);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5dd68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java
index 490c25b..26faa8f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.io.retry.Idempotent;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
@@ -74,5 +74,5 @@ public interface DistributedSchedulerProtocol
@Unstable
@Idempotent
DistSchedAllocateResponse allocateForDistributedScheduling(
- AllocateRequest request) throws YarnException, IOException;
+ DistSchedAllocateRequest request) throws YarnException, IOException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5dd68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java
index c1dd9e5..0ca61df 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java
@@ -22,9 +22,11 @@ import com.google.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@@ -35,6 +37,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedRegisterResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
@@ -77,9 +80,9 @@ public class DistributedSchedulerProtocolPBClientImpl implements
@Override
public DistSchedRegisterResponse
- registerApplicationMasterForDistributedScheduling
- (RegisterApplicationMasterRequest request) throws YarnException,
- IOException {
+ registerApplicationMasterForDistributedScheduling(
+ RegisterApplicationMasterRequest request)
+ throws YarnException, IOException {
YarnServiceProtos.RegisterApplicationMasterRequestProto requestProto =
((RegisterApplicationMasterRequestPBImpl) request).getProto();
try {
@@ -93,10 +96,10 @@ public class DistributedSchedulerProtocolPBClientImpl implements
}
@Override
- public DistSchedAllocateResponse allocateForDistributedScheduling
- (AllocateRequest request) throws YarnException, IOException {
- YarnServiceProtos.AllocateRequestProto requestProto =
- ((AllocateRequestPBImpl) request).getProto();
+ public DistSchedAllocateResponse allocateForDistributedScheduling(
+ DistSchedAllocateRequest request) throws YarnException, IOException {
+ YarnServerCommonServiceProtos.DistSchedAllocateRequestProto requestProto =
+ ((DistSchedAllocateRequestPBImpl) request).getProto();
try {
return new DistSchedAllocateResponsePBImpl(
proxy.allocateForDistributedScheduling(null, requestProto));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5dd68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java
index 8be2893..2763259 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.protocolrecords
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedRegisterResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
@@ -77,8 +78,10 @@ public class DistributedSchedulerProtocolPBServiceImpl implements
@Override
public YarnServerCommonServiceProtos.DistSchedAllocateResponseProto
allocateForDistributedScheduling(RpcController controller,
- AllocateRequestProto proto) throws ServiceException {
- AllocateRequestPBImpl request = new AllocateRequestPBImpl(proto);
+ YarnServerCommonServiceProtos.DistSchedAllocateRequestProto proto)
+ throws ServiceException {
+ DistSchedAllocateRequestPBImpl request =
+ new DistSchedAllocateRequestPBImpl(proto);
try {
DistSchedAllocateResponse response = real
.allocateForDistributedScheduling(request);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5dd68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateRequest.java
new file mode 100644
index 0000000..10ff95b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateRequest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.records.Container;
+
+import java.util.List;
+
+/**
+ * Request for a distributed scheduler to notify allocation of containers to
+ * the Resource Manager.
+ */
+@Public
+@Evolving
+public abstract class DistSchedAllocateRequest {
+
+ /**
+ * Get the underlying <code>AllocateRequest</code> object.
+ * @return Allocate request
+ */
+ @Public
+ @Evolving
+ public abstract AllocateRequest getAllocateRequest();
+
+ /**
+ * Set the underlying <code>AllocateRequest</code> object.
+ * @param allocateRequest Allocate request
+ */
+ @Public
+ @Evolving
+ public abstract void setAllocateRequest(AllocateRequest allocateRequest);
+
+ /**
+ * Get the list of <em>newly allocated</em> <code>Container</code> by the
+ * Distributed Scheduling component on the NodeManager.
+ * @return list of <em>newly allocated</em> <code>Container</code>
+ */
+ @Public
+ @Evolving
+ public abstract List<Container> getAllocatedContainers();
+
+ /**
+ * Set the list of <em>newly allocated</em> <code>Container</code> by the
+ * Distributed Scheduling component on the NodeManager.
+ * @param containers list of <em>newly allocated</em> <code>Container</code>
+ */
+ @Public
+ @Evolving
+ public abstract void setAllocatedContainers(List<Container> containers);
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5dd68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateRequestPBImpl.java
new file mode 100644
index 0000000..be386b6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateRequestPBImpl.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.DistSchedAllocateRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.DistSchedAllocateRequestProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProto;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Implementation of {@link DistSchedAllocateRequest} for a distributed
+ * scheduler to notify about the allocation of containers to the Resource
+ * Manager.
+ */
+public class DistSchedAllocateRequestPBImpl extends DistSchedAllocateRequest {
+ private DistSchedAllocateRequestProto.Builder builder = null;
+ private boolean viaProto = false;
+
+ private DistSchedAllocateRequestProto proto;
+ private AllocateRequest allocateRequest;
+ private List<Container> containers;
+
+ public DistSchedAllocateRequestPBImpl() {
+ builder = DistSchedAllocateRequestProto.newBuilder();
+ }
+
+ public DistSchedAllocateRequestPBImpl(DistSchedAllocateRequestProto proto) {
+ this.proto = proto;
+ this.viaProto = true;
+ }
+
+ @Override
+ public AllocateRequest getAllocateRequest() {
+ DistSchedAllocateRequestProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.allocateRequest != null) {
+ return this.allocateRequest;
+ }
+ if (!p.hasAllocateRequest()) {
+ return null;
+ }
+ this.allocateRequest = convertFromProtoFormat(p.getAllocateRequest());
+ return this.allocateRequest;
+ }
+
+ @Override
+ public void setAllocateRequest(AllocateRequest pAllocateRequest) {
+ maybeInitBuilder();
+ if (allocateRequest == null) {
+ builder.clearAllocateRequest();
+ }
+ this.allocateRequest = pAllocateRequest;
+ }
+
+ @Override
+ public List<Container> getAllocatedContainers() {
+ if (this.containers != null) {
+ return this.containers;
+ }
+ initAllocatedContainers();
+ return containers;
+ }
+
+ private void initAllocatedContainers() {
+ DistSchedAllocateRequestProtoOrBuilder p = viaProto ? proto : builder;
+ List<ContainerProto> list = p.getAllocatedContainersList();
+ this.containers = new ArrayList<Container>();
+ for (ContainerProto c : list) {
+ this.containers.add(convertFromProtoFormat(c));
+ }
+ }
+
+ @Override
+ public void setAllocatedContainers(List<Container> pContainers) {
+ maybeInitBuilder();
+ if (pContainers == null || pContainers.isEmpty()) {
+ if (this.containers != null) {
+ this.containers.clear();
+ }
+ builder.clearAllocatedContainers();
+ return;
+ }
+ this.containers = new ArrayList<>();
+ this.containers.addAll(pContainers);
+ }
+
+ public DistSchedAllocateRequestProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = DistSchedAllocateRequestProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto) {
+ maybeInitBuilder();
+ }
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void mergeLocalToBuilder() {
+ if (this.containers != null) {
+ builder.clearAllocatedContainers();
+ Iterable<ContainerProto> iterable =
+ getContainerProtoIterable(this.containers);
+ builder.addAllAllocatedContainers(iterable);
+ }
+ if (this.allocateRequest != null) {
+ builder.setAllocateRequest(
+ ((AllocateRequestPBImpl)this.allocateRequest).getProto());
+ }
+ }
+
+ private Iterable<ContainerProto> getContainerProtoIterable(
+ final List<Container> newContainersList) {
+ maybeInitBuilder();
+ return new Iterable<ContainerProto>() {
+ @Override
+ public synchronized Iterator<ContainerProto> iterator() {
+ return new Iterator<ContainerProto>() {
+ Iterator<Container> iter = newContainersList.iterator();
+
+ @Override
+ public synchronized boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public synchronized ContainerProto next() {
+ return ProtoUtils.convertToProtoFormat(iter.next());
+ }
+
+ @Override
+ public synchronized void remove() {
+ throw new UnsupportedOperationException();
+
+ }
+ };
+ }
+ };
+ }
+
+ private ContainerPBImpl convertFromProtoFormat(ContainerProto p) {
+ return new ContainerPBImpl(p);
+ }
+
+ private AllocateRequestPBImpl convertFromProtoFormat(AllocateRequestProto p) {
+ return new AllocateRequestPBImpl(p);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5dd68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto
index b94656c..818eb4a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto
@@ -35,5 +35,5 @@ import "yarn_server_common_service_protos.proto";
service DistributedSchedulerProtocolService {
rpc registerApplicationMasterForDistributedScheduling (RegisterApplicationMasterRequestProto) returns (DistSchedRegisterResponseProto);
rpc finishApplicationMaster (FinishApplicationMasterRequestProto) returns (FinishApplicationMasterResponseProto);
- rpc allocateForDistributedScheduling (AllocateRequestProto) returns (DistSchedAllocateResponseProto);
+ rpc allocateForDistributedScheduling (DistSchedAllocateRequestProto) returns (DistSchedAllocateResponseProto);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5dd68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index a7e5a86..3e3cb82 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -41,6 +41,11 @@ message DistSchedAllocateResponseProto {
repeated NodeIdProto nodes_for_scheduling = 2;
}
+message DistSchedAllocateRequestProto {
+ optional AllocateRequestProto allocate_request = 1;
+ repeated ContainerProto allocated_containers = 2;
+}
+
message NodeLabelsProto {
repeated NodeLabelProto nodeLabels = 1;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5dd68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
index 2b2a2f6..55c65f4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
@@ -21,10 +21,10 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
import org.apache.hadoop.conf.Configuration;
import com.google.common.base.Preconditions;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords
.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
@@ -118,8 +118,8 @@ public abstract class AbstractRequestInterceptor implements
* @throws IOException
*/
@Override
- public DistSchedAllocateResponse allocateForDistributedScheduling
- (AllocateRequest request) throws YarnException, IOException {
+ public DistSchedAllocateResponse allocateForDistributedScheduling(
+ DistSchedAllocateRequest request) throws YarnException, IOException {
return (this.nextInterceptor != null) ?
this.nextInterceptor.allocateForDistributedScheduling(request) : null;
}
@@ -135,9 +135,9 @@ public abstract class AbstractRequestInterceptor implements
*/
@Override
public DistSchedRegisterResponse
- registerApplicationMasterForDistributedScheduling
- (RegisterApplicationMasterRequest request) throws YarnException,
- IOException {
+ registerApplicationMasterForDistributedScheduling(
+ RegisterApplicationMasterRequest request)
+ throws YarnException, IOException {
return (this.nextInterceptor != null) ? this.nextInterceptor
.registerApplicationMasterForDistributedScheduling(request) : null;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5dd68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
index 1637682..debff76 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
import org.apache.hadoop.yarn.server.api.ServerRMProxy;
import org.apache.hadoop.yarn.server.api.protocolrecords
.DistSchedAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -133,8 +134,8 @@ public final class DefaultRequestInterceptor extends
}
@Override
- public DistSchedAllocateResponse allocateForDistributedScheduling
- (AllocateRequest request) throws YarnException, IOException {
+ public DistSchedAllocateResponse allocateForDistributedScheduling(
+ DistSchedAllocateRequest request) throws YarnException, IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Forwarding allocateForDistributedScheduling request" +
"to the real YARN RM");
@@ -212,9 +213,9 @@ public final class DefaultRequestInterceptor extends
}
@Override
- public DistSchedAllocateResponse
- allocateForDistributedScheduling(AllocateRequest request) throws
- YarnException, IOException {
+ public DistSchedAllocateResponse allocateForDistributedScheduling(
+ DistSchedAllocateRequest request)
+ throws YarnException, IOException {
throw new IOException("Not Supported !!");
}
};
http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5dd68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
index 8e2ceb0..10c1361 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
@@ -21,6 +21,9 @@ package org.apache.hadoop.yarn.server.nodemanager.scheduler;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.api.protocolrecords
@@ -99,6 +102,9 @@ public final class LocalScheduler extends AbstractRequestInterceptor {
private static final Logger LOG = LoggerFactory
.getLogger(LocalScheduler.class);
+ private final static RecordFactory RECORD_FACTORY =
+ RecordFactoryProvider.getRecordFactory(null);
+
// Currently just used to keep track of allocated Containers
// Can be used for reporting stats later
private Set<ContainerId> containersAllocated = new HashSet<>();
@@ -176,7 +182,10 @@ public final class LocalScheduler extends AbstractRequestInterceptor {
@Override
public AllocateResponse allocate(AllocateRequest request) throws
YarnException, IOException {
- return allocateForDistributedScheduling(request).getAllocateResponse();
+ DistSchedAllocateRequest distRequest =
+ RECORD_FACTORY.newRecordInstance(DistSchedAllocateRequest.class);
+ distRequest.setAllocateRequest(request);
+ return allocateForDistributedScheduling(distRequest).getAllocateResponse();
}
@Override
@@ -324,9 +333,9 @@ public final class LocalScheduler extends AbstractRequestInterceptor {
@Override
public DistSchedRegisterResponse
- registerApplicationMasterForDistributedScheduling
- (RegisterApplicationMasterRequest request) throws YarnException,
- IOException {
+ registerApplicationMasterForDistributedScheduling(
+ RegisterApplicationMasterRequest request)
+ throws YarnException, IOException {
LOG.info("Forwarding registration request to the" +
"Distributed Scheduler Service on YARN RM");
DistSchedRegisterResponse dsResp = getNextInterceptor()
@@ -336,17 +345,18 @@ public final class LocalScheduler extends AbstractRequestInterceptor {
}
@Override
- public DistSchedAllocateResponse allocateForDistributedScheduling
- (AllocateRequest request) throws YarnException, IOException {
+ public DistSchedAllocateResponse allocateForDistributedScheduling(
+ DistSchedAllocateRequest request) throws YarnException, IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Forwarding allocate request to the" +
"Distributed Scheduler Service on YARN RM");
}
// Partition requests into GUARANTEED and OPPORTUNISTIC reqs
- PartitionedResourceRequests partitionedAsks = partitionAskList(request
- .getAskList());
+ PartitionedResourceRequests partitionedAsks = partitionAskList(
+ request.getAllocateRequest().getAskList());
- List<ContainerId> releasedContainers = request.getReleaseList();
+ List<ContainerId> releasedContainers =
+ request.getAllocateRequest().getReleaseList();
int numReleasedContainers = releasedContainers.size();
if (numReleasedContainers > 0) {
LOG.info("AttemptID: " + applicationAttemptId + " released: "
@@ -355,7 +365,8 @@ public final class LocalScheduler extends AbstractRequestInterceptor {
}
// Also, update black list
- ResourceBlacklistRequest rbr = request.getResourceBlacklistRequest();
+ ResourceBlacklistRequest rbr =
+ request.getAllocateRequest().getResourceBlacklistRequest();
if (rbr != null) {
blacklist.removeAll(rbr.getBlacklistRemovals());
blacklist.addAll(rbr.getBlacklistAdditions());
@@ -381,9 +392,10 @@ public final class LocalScheduler extends AbstractRequestInterceptor {
allocatedContainers.addAll(e.getValue());
}
}
+ request.setAllocatedContainers(allocatedContainers);
// Send all the GUARANTEED Reqs to RM
- request.setAskList(partitionedAsks.getGuaranteed());
+ request.getAllocateRequest().setAskList(partitionedAsks.getGuaranteed());
DistSchedAllocateResponse dsResp =
getNextInterceptor().allocateForDistributedScheduling(request);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5dd68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
index a1d39f7..31f8085 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.api.protocolrecords
@@ -126,7 +127,7 @@ public class TestLocalScheduler {
Mockito.when(
finalReqIntcptr.allocateForDistributedScheduling(
- Mockito.any(AllocateRequest.class)))
+ Mockito.any(DistSchedAllocateRequest.class)))
.thenAnswer(new Answer<DistSchedAllocateResponse>() {
@Override
public DistSchedAllocateResponse answer(InvocationOnMock
http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5dd68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
index a93f683..5aabddc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
@@ -32,6 +33,7 @@ import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBSer
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@@ -45,6 +47,12 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor;
@@ -60,6 +68,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretMan
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -229,9 +238,23 @@ public class DistributedSchedulingService extends ApplicationMasterService
}
@Override
- public DistSchedAllocateResponse allocateForDistributedScheduling
- (AllocateRequest request) throws YarnException, IOException {
- AllocateResponse response = allocate(request);
+ public DistSchedAllocateResponse allocateForDistributedScheduling(
+ DistSchedAllocateRequest request) throws YarnException, IOException {
+ List<Container> distAllocContainers = request.getAllocatedContainers();
+ for (Container container : distAllocContainers) {
+ // Create RMContainer
+ SchedulerApplicationAttempt appAttempt =
+ ((AbstractYarnScheduler) rmContext.getScheduler())
+ .getCurrentAttemptForContainer(container.getId());
+ RMContainer rmContainer = new RMContainerImpl(container,
+ appAttempt.getApplicationAttemptId(), container.getNodeId(),
+ appAttempt.getUser(), rmContext, true);
+ appAttempt.addRMContainer(container.getId(), rmContainer);
+ rmContainer.handle(
+ new RMContainerEvent(container.getId(),
+ RMContainerEventType.LAUNCHED));
+ }
+ AllocateResponse response = allocate(request.getAllocateRequest());
DistSchedAllocateResponse dsResp = recordFactory.newRecordInstance
(DistSchedAllocateResponse.class);
dsResp.setAllocateResponse(response);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5dd68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
index f37923f..504c973 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -91,4 +92,14 @@ public interface RMContainer extends EventHandler<RMContainerEvent> {
void cancelIncreaseReservation();
String getQueueName();
+
+ ExecutionType getExecutionType();
+
+ /**
+ * If the container was allocated by a container other than the Resource
+ * Manager (e.g., the distributed scheduler in the NM
+ * <code>LocalScheduler</code>).
+ * @return If the container was allocated remotely.
+ */
+ boolean isRemotelyAllocated();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5dd68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index 95f81d4..ed819a0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -79,6 +80,8 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
RMContainerEventType.KILL)
.addTransition(RMContainerState.NEW, RMContainerState.RESERVED,
RMContainerEventType.RESERVED, new ContainerReservedTransition())
+ .addTransition(RMContainerState.NEW, RMContainerState.RUNNING,
+ RMContainerEventType.LAUNCHED)
.addTransition(RMContainerState.NEW,
EnumSet.of(RMContainerState.RUNNING, RMContainerState.COMPLETED),
RMContainerEventType.RECOVER, new ContainerRecoveredTransition())
@@ -183,6 +186,8 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
private Resource lastConfirmedResource;
private volatile String queueName;
+ private boolean isExternallyAllocated;
+
public RMContainerImpl(Container container,
ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
RMContext rmContext) {
@@ -190,6 +195,13 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
.currentTimeMillis(), "");
}
+ public RMContainerImpl(Container container,
+ ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
+ RMContext rmContext, boolean isExternallyAllocated) {
+ this(container, appAttemptId, nodeId, user, rmContext, System
+ .currentTimeMillis(), "", isExternallyAllocated);
+ }
+
private boolean saveNonAMContainerMetaInfo;
public RMContainerImpl(Container container,
@@ -202,6 +214,14 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
public RMContainerImpl(Container container,
ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
RMContext rmContext, long creationTime, String nodeLabelExpression) {
+ this(container, appAttemptId, nodeId, user, rmContext, creationTime,
+ nodeLabelExpression, false);
+ }
+
+ public RMContainerImpl(Container container,
+ ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
+ RMContext rmContext, long creationTime, String nodeLabelExpression,
+ boolean isExternallyAllocated) {
this.stateMachine = stateMachineFactory.make(this);
this.containerId = container.getId();
this.nodeId = nodeId;
@@ -216,6 +236,7 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
this.resourceRequests = null;
this.nodeLabelExpression = nodeLabelExpression;
this.lastConfirmedResource = container.getResource();
+ this.isExternallyAllocated = isExternallyAllocated;
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
@@ -827,4 +848,14 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
public String getQueueName() {
return queueName;
}
+
+ @Override
+ public ExecutionType getExecutionType() {
+ return container.getExecutionType();
+ }
+
+ @Override
+ public boolean isRemotelyAllocated() {
+ return isExternallyAllocated;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5dd68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 3066339..64eb777 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -516,7 +516,23 @@ public abstract class AbstractYarnScheduler
return;
}
- completedContainerInternal(rmContainer, containerStatus, event);
+ if (!rmContainer.isRemotelyAllocated()) {
+ completedContainerInternal(rmContainer, containerStatus, event);
+ } else {
+ ContainerId containerId = rmContainer.getContainerId();
+ // Inform the container
+ rmContainer.handle(
+ new RMContainerFinishedEvent(containerId, containerStatus, event));
+ SchedulerApplicationAttempt schedulerAttempt =
+ getCurrentAttemptForContainer(containerId);
+ if (schedulerAttempt != null) {
+ schedulerAttempt.removeRMContainer(containerId);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Completed container: " + rmContainer.getContainerId() +
+ " in state: " + rmContainer.getState() + " event:" + event);
+ }
+ }
// If the container is getting killed in ACQUIRED state, the requester (AM
// for regular containers and RM itself for AM container) will not know what
http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5dd68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index b48b272..aae0770 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -112,6 +112,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
private boolean isAttemptRecovering;
protected ResourceUsage attemptResourceUsage = new ResourceUsage();
+ /** Scheduled by a remote scheduler. */
+ protected ResourceUsage attemptResourceUsageAllocatedRemotely =
+ new ResourceUsage();
private AtomicLong firstAllocationRequestSentTime = new AtomicLong(0);
private AtomicLong firstContainerAllocatedTime = new AtomicLong(0);
@@ -288,6 +291,23 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
return liveContainers.get(id);
}
+ public synchronized void addRMContainer(
+ ContainerId id, RMContainer rmContainer) {
+ liveContainers.put(id, rmContainer);
+ if (rmContainer.isRemotelyAllocated()) {
+ this.attemptResourceUsageAllocatedRemotely.incUsed(
+ rmContainer.getAllocatedResource());
+ }
+ }
+
+ public synchronized void removeRMContainer(ContainerId containerId) {
+ RMContainer rmContainer = liveContainers.remove(containerId);
+ if (rmContainer != null && rmContainer.isRemotelyAllocated()) {
+ this.attemptResourceUsageAllocatedRemotely.decUsed(
+ rmContainer.getAllocatedResource());
+ }
+ }
+
protected synchronized void resetReReservations(Priority priority) {
reReservations.setCount(priority, 0);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5dd68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
index 7d2ed33..4716bab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
@@ -44,15 +44,17 @@ import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateRequestPBImpl;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -175,10 +177,15 @@ public class TestDistributedSchedulingService {
Assert.assertEquals(2,
dsRegResp.getIncrAllocatableCapabilty().getVirtualCores());
+ DistSchedAllocateRequestPBImpl distAllReq =
+ (DistSchedAllocateRequestPBImpl)factory.newRecordInstance(
+ DistSchedAllocateRequest.class);
+ distAllReq.setAllocateRequest(allReq);
+ distAllReq.setAllocatedContainers(Arrays.asList(c));
DistSchedAllocateResponse dsAllocResp =
new DistSchedAllocateResponsePBImpl(
dsProxy.allocateForDistributedScheduling(null,
- ((AllocateRequestPBImpl)allReq).getProto()));
+ distAllReq.getProto()));
Assert.assertEquals(
"h1", dsAllocResp.getNodesForScheduling().get(0).getHost());
@@ -243,8 +250,13 @@ public class TestDistributedSchedulingService {
@Override
public DistSchedAllocateResponse allocateForDistributedScheduling(
- AllocateRequest request) throws YarnException, IOException {
- List<ResourceRequest> askList = request.getAskList();
+ DistSchedAllocateRequest request) throws YarnException, IOException {
+ List<ResourceRequest> askList =
+ request.getAllocateRequest().getAskList();
+ List<Container> allocatedContainers = request.getAllocatedContainers();
+ Assert.assertEquals(1, allocatedContainers.size());
+ Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
+ allocatedContainers.get(0).getExecutionType());
Assert.assertEquals(1, askList.size());
Assert.assertTrue(askList.get(0)
.getExecutionTypeRequest().getEnforceExecutionType());
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org