You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2016/06/01 19:55:15 UTC
[14/39] hadoop git commit: YARN-5127. Expose ExecutionType in Container api record. (Hitesh Sharma via asuresh)
YARN-5127. Expose ExecutionType in Container api record. (Hitesh Sharma via asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/aa975bc7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/aa975bc7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/aa975bc7
Branch: refs/heads/HDFS-1312
Commit: aa975bc7811fc7c52b814ad9635bff8c2d34655b
Parents: 5ea6fd8
Author: Arun Suresh <as...@apache.org>
Authored: Fri May 27 14:06:32 2016 -0700
Committer: Arun Suresh <as...@apache.org>
Committed: Fri May 27 14:06:32 2016 -0700
----------------------------------------------------------------------
.../hadoop/yarn/api/records/Container.java | 26 ++++
.../src/main/proto/yarn_protos.proto | 1 +
.../api/records/impl/pb/ContainerPBImpl.java | 26 +++-
.../hadoop/yarn/server/utils/BuilderUtils.java | 10 +-
.../OpportunisticContainerAllocator.java | 3 +-
.../TestDistributedSchedulingService.java | 135 +++++++++++--------
6 files changed, 141 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa975bc7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java
index 38fa8b9..9a62935 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java
@@ -66,6 +66,15 @@ public abstract class Container implements Comparable<Container> {
public static Container newInstance(ContainerId containerId, NodeId nodeId,
String nodeHttpAddress, Resource resource, Priority priority,
Token containerToken) {
+ return newInstance(containerId, nodeId, nodeHttpAddress, resource, priority,
+ containerToken, ExecutionType.GUARANTEED);
+ }
+
+ @Private
+ @Unstable
+ public static Container newInstance(ContainerId containerId, NodeId nodeId,
+ String nodeHttpAddress, Resource resource, Priority priority,
+ Token containerToken, ExecutionType executionType) {
Container container = Records.newRecord(Container.class);
container.setId(containerId);
container.setNodeId(nodeId);
@@ -73,6 +82,7 @@ public abstract class Container implements Comparable<Container> {
container.setResource(resource);
container.setPriority(priority);
container.setContainerToken(containerToken);
+ container.setExecutionType(executionType);
return container;
}
@@ -163,4 +173,20 @@ public abstract class Container implements Comparable<Container> {
@Private
@Unstable
public abstract void setContainerToken(Token containerToken);
+
+ /**
+ * Get the <code>ExecutionType</code> for the container.
+ * @return <code>ExecutionType</code> for the container.
+ */
+ @Private
+ @Unstable
+ public abstract ExecutionType getExecutionType();
+
+ /**
+ * Set the <code>ExecutionType</code> for the container.
+ * @param executionType ExecutionType
+ */
+ @Private
+ @Unstable
+ public abstract void setExecutionType(ExecutionType executionType);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa975bc7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 60cdfd1..814c5bb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -92,6 +92,7 @@ message ContainerProto {
optional ResourceProto resource = 4;
optional PriorityProto priority = 5;
optional hadoop.common.TokenProto container_token = 6;
+ optional ExecutionTypeProto execution_type = 7 [default = GUARANTEED];
}
message ContainerReportProto {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa975bc7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java
index 1700068..bd2d937 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -33,6 +34,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeProto;
@Private
@Unstable
@@ -47,7 +49,7 @@ public class ContainerPBImpl extends Container {
private Resource resource = null;
private Priority priority = null;
private Token containerToken = null;
-
+
public ContainerPBImpl() {
builder = ContainerProto.newBuilder();
}
@@ -248,6 +250,18 @@ public class ContainerPBImpl extends Container {
this.containerToken = containerToken;
}
+ @Override
+ public ExecutionType getExecutionType() {
+ ContainerProtoOrBuilder p = viaProto ? proto : builder;
+ return convertFromProtoFormat(p.getExecutionType());
+ }
+
+ @Override
+ public void setExecutionType(ExecutionType executionType) {
+ maybeInitBuilder();
+ builder.setExecutionType(convertToProtoFormat(executionType));
+ }
+
private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
return new ContainerIdPBImpl(p);
}
@@ -288,6 +302,15 @@ public class ContainerPBImpl extends Container {
return ((TokenPBImpl)t).getProto();
}
+ private ExecutionType convertFromProtoFormat(
+ ExecutionTypeProto e) {
+ return ProtoUtils.convertFromProtoFormat(e);
+ }
+
+ private ExecutionTypeProto convertToProtoFormat(ExecutionType e) {
+ return ProtoUtils.convertToProtoFormat(e);
+ }
+
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("Container: [");
@@ -297,6 +320,7 @@ public class ContainerPBImpl extends Container {
sb.append("Resource: ").append(getResource()).append(", ");
sb.append("Priority: ").append(getPriority()).append(", ");
sb.append("Token: ").append(getContainerToken()).append(", ");
+ sb.append("ExecutionType: ").append(getExecutionType()).append(", ");
sb.append("]");
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa975bc7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
index a70d143..b97f935 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
@@ -236,7 +236,7 @@ public class BuilderUtils {
public static Container newContainer(ContainerId containerId, NodeId nodeId,
String nodeHttpAddress, Resource resource, Priority priority,
- Token containerToken) {
+ Token containerToken, ExecutionType executionType) {
Container container = recordFactory.newRecordInstance(Container.class);
container.setId(containerId);
container.setNodeId(nodeId);
@@ -244,9 +244,17 @@ public class BuilderUtils {
container.setResource(resource);
container.setPriority(priority);
container.setContainerToken(containerToken);
+ container.setExecutionType(executionType);
return container;
}
+ public static Container newContainer(ContainerId containerId, NodeId nodeId,
+ String nodeHttpAddress, Resource resource, Priority priority,
+ Token containerToken) {
+ return newContainer(containerId, nodeId, nodeHttpAddress, resource,
+ priority, containerToken, ExecutionType.GUARANTEED);
+ }
+
public static <T extends Token> T newToken(Class<T> tokenClass,
byte[] identifier, String kind, byte[] password, String service) {
T token = recordFactory.newRecordInstance(tokenClass);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa975bc7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
index e33c389..22a6a24 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
@@ -160,7 +160,8 @@ public class OpportunisticContainerAllocator {
containerTokenIdentifier);
Container container = BuilderUtils.newContainer(
cId, nodeId, nodeId.getHost() + ":" + webpagePort,
- capability, rr.getPriority(), containerToken);
+ capability, rr.getPriority(), containerToken,
+ containerTokenIdentifier.getExecutionType());
return container;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa975bc7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
index 47563d5..1982776 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
@@ -35,6 +35,11 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
.RegisterApplicationMasterRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
.RegisterApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@@ -66,6 +71,7 @@ import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Arrays;
+import java.util.List;
public class TestDistributedSchedulingService {
@@ -92,63 +98,13 @@ public class TestDistributedSchedulingService {
return new YarnConfiguration();
}
};
- DistributedSchedulingService service =
- new DistributedSchedulingService(rmContext, null) {
- @Override
- public RegisterApplicationMasterResponse registerApplicationMaster
- (RegisterApplicationMasterRequest request) throws
- YarnException, IOException {
- RegisterApplicationMasterResponse resp = factory.newRecordInstance(
- RegisterApplicationMasterResponse.class);
- // Dummy Entry to Assert that we get this object back
- resp.setQueue("dummyQueue");
- return resp;
- }
-
- @Override
- public FinishApplicationMasterResponse finishApplicationMaster
- (FinishApplicationMasterRequest request) throws YarnException,
- IOException {
- FinishApplicationMasterResponse resp = factory.newRecordInstance(
- FinishApplicationMasterResponse.class);
- // Dummy Entry to Assert that we get this object back
- resp.setIsUnregistered(false);
- return resp;
- }
-
- @Override
- public AllocateResponse allocate(AllocateRequest request) throws
- YarnException, IOException {
- AllocateResponse response = factory.newRecordInstance
- (AllocateResponse.class);
- response.setNumClusterNodes(12345);
- return response;
- }
-
- @Override
- public DistSchedRegisterResponse
- registerApplicationMasterForDistributedScheduling
- (RegisterApplicationMasterRequest request) throws
- YarnException, IOException {
- DistSchedRegisterResponse resp = factory.newRecordInstance(
- DistSchedRegisterResponse.class);
- resp.setContainerIdStart(54321l);
- resp.setMaxAllocatableCapabilty(Resource.newInstance(4096, 4));
- resp.setMinAllocatableCapabilty(Resource.newInstance(1024, 1));
- resp.setIncrAllocatableCapabilty(Resource.newInstance(2048, 2));
- return resp;
- }
-
- @Override
- public DistSchedAllocateResponse allocateForDistributedScheduling
- (AllocateRequest request) throws YarnException, IOException {
- DistSchedAllocateResponse resp =
- factory.newRecordInstance(DistSchedAllocateResponse.class);
- resp.setNodesForScheduling(
- Arrays.asList(NodeId.newInstance("h1", 1234)));
- return resp;
- }
- };
+ Container c = factory.newRecordInstance(Container.class);
+ c.setExecutionType(ExecutionType.OPPORTUNISTIC);
+ c.setId(
+ ContainerId.newContainerId(
+ ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(12345, 1), 2), 3));
+ DistributedSchedulingService service = createService(factory, rmContext, c);
Server server = service.getServer(rpc, conf, addr, null);
server.start();
@@ -180,6 +136,10 @@ public class TestDistributedSchedulingService {
((AllocateRequestPBImpl)factory
.newRecordInstance(AllocateRequest.class)).getProto())
);
+ List<Container> allocatedContainers = allocResp.getAllocatedContainers();
+ Assert.assertEquals(1, allocatedContainers.size());
+ Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
+ allocatedContainers.get(0).getExecutionType());
Assert.assertEquals(12345, allocResp.getNumClusterNodes());
@@ -222,4 +182,65 @@ public class TestDistributedSchedulingService {
Assert.assertEquals(
false, dsfinishResp.getIsUnregistered());
}
+
+ private DistributedSchedulingService createService(final RecordFactory
+ factory, final RMContext rmContext, final Container c) {
+ return new DistributedSchedulingService(rmContext, null) {
+ @Override
+ public RegisterApplicationMasterResponse registerApplicationMaster(
+ RegisterApplicationMasterRequest request) throws
+ YarnException, IOException {
+ RegisterApplicationMasterResponse resp = factory.newRecordInstance(
+ RegisterApplicationMasterResponse.class);
+ // Dummy Entry to Assert that we get this object back
+ resp.setQueue("dummyQueue");
+ return resp;
+ }
+
+ @Override
+ public FinishApplicationMasterResponse finishApplicationMaster(
+ FinishApplicationMasterRequest request) throws YarnException,
+ IOException {
+ FinishApplicationMasterResponse resp = factory.newRecordInstance(
+ FinishApplicationMasterResponse.class);
+ // Dummy Entry to Assert that we get this object back
+ resp.setIsUnregistered(false);
+ return resp;
+ }
+
+ @Override
+ public AllocateResponse allocate(AllocateRequest request) throws
+ YarnException, IOException {
+ AllocateResponse response = factory.newRecordInstance(
+ AllocateResponse.class);
+ response.setNumClusterNodes(12345);
+ response.setAllocatedContainers(Arrays.asList(c));
+ return response;
+ }
+
+ @Override
+ public DistSchedRegisterResponse
+ registerApplicationMasterForDistributedScheduling(
+ RegisterApplicationMasterRequest request) throws
+ YarnException, IOException {
+ DistSchedRegisterResponse resp = factory.newRecordInstance(
+ DistSchedRegisterResponse.class);
+ resp.setContainerIdStart(54321L);
+ resp.setMaxAllocatableCapabilty(Resource.newInstance(4096, 4));
+ resp.setMinAllocatableCapabilty(Resource.newInstance(1024, 1));
+ resp.setIncrAllocatableCapabilty(Resource.newInstance(2048, 2));
+ return resp;
+ }
+
+ @Override
+ public DistSchedAllocateResponse allocateForDistributedScheduling(
+ AllocateRequest request) throws YarnException, IOException {
+ DistSchedAllocateResponse resp =
+ factory.newRecordInstance(DistSchedAllocateResponse.class);
+ resp.setNodesForScheduling(
+ Arrays.asList(NodeId.newInstance("h1", 1234)));
+ return resp;
+ }
+ };
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org