You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2016/06/01 19:55:15 UTC

[14/39] hadoop git commit: YARN-5127. Expose ExecutionType in Container api record. (Hitesh Sharma via asuresh)

YARN-5127. Expose ExecutionType in Container api record. (Hitesh Sharma via asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/aa975bc7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/aa975bc7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/aa975bc7

Branch: refs/heads/HDFS-1312
Commit: aa975bc7811fc7c52b814ad9635bff8c2d34655b
Parents: 5ea6fd8
Author: Arun Suresh <as...@apache.org>
Authored: Fri May 27 14:06:32 2016 -0700
Committer: Arun Suresh <as...@apache.org>
Committed: Fri May 27 14:06:32 2016 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/api/records/Container.java      |  26 ++++
 .../src/main/proto/yarn_protos.proto            |   1 +
 .../api/records/impl/pb/ContainerPBImpl.java    |  26 +++-
 .../hadoop/yarn/server/utils/BuilderUtils.java  |  10 +-
 .../OpportunisticContainerAllocator.java        |   3 +-
 .../TestDistributedSchedulingService.java       | 135 +++++++++++--------
 6 files changed, 141 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa975bc7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java
index 38fa8b9..9a62935 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java
@@ -66,6 +66,15 @@ public abstract class Container implements Comparable<Container> {
   public static Container newInstance(ContainerId containerId, NodeId nodeId,
       String nodeHttpAddress, Resource resource, Priority priority,
       Token containerToken) {
+    return newInstance(containerId, nodeId, nodeHttpAddress, resource, priority,
+        containerToken, ExecutionType.GUARANTEED);
+  }
+
+  @Private
+  @Unstable
+  public static Container newInstance(ContainerId containerId, NodeId nodeId,
+      String nodeHttpAddress, Resource resource, Priority priority,
+      Token containerToken, ExecutionType executionType) {
     Container container = Records.newRecord(Container.class);
     container.setId(containerId);
     container.setNodeId(nodeId);
@@ -73,6 +82,7 @@ public abstract class Container implements Comparable<Container> {
     container.setResource(resource);
     container.setPriority(priority);
     container.setContainerToken(containerToken);
+    container.setExecutionType(executionType);
     return container;
   }
 
@@ -163,4 +173,20 @@ public abstract class Container implements Comparable<Container> {
   @Private
   @Unstable
   public abstract void setContainerToken(Token containerToken);
+
+  /**
+   * Get the <code>ExecutionType</code> for the container.
+   * @return <code>ExecutionType</code> for the container.
+   */
+  @Private
+  @Unstable
+  public abstract ExecutionType getExecutionType();
+
+  /**
+   * Set the <code>ExecutionType</code> for the container.
+   * @param executionType ExecutionType
+   */
+  @Private
+  @Unstable
+  public abstract void setExecutionType(ExecutionType executionType);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa975bc7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 60cdfd1..814c5bb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -92,6 +92,7 @@ message ContainerProto {
   optional ResourceProto resource = 4;
   optional PriorityProto priority = 5;
   optional hadoop.common.TokenProto container_token = 6;
+  optional ExecutionTypeProto execution_type = 7 [default = GUARANTEED];
 }
 
 message ContainerReportProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa975bc7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java
index 1700068..bd2d937 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -33,6 +34,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProtoOrBuilder;
 import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeProto;
 
 @Private
 @Unstable
@@ -47,7 +49,7 @@ public class ContainerPBImpl extends Container {
   private Resource resource = null;
   private Priority priority = null;
   private Token containerToken = null;
-  
+
   public ContainerPBImpl() {
     builder = ContainerProto.newBuilder();
   }
@@ -248,6 +250,18 @@ public class ContainerPBImpl extends Container {
     this.containerToken = containerToken;
   }
 
+  @Override
+  public ExecutionType getExecutionType() {
+    ContainerProtoOrBuilder p = viaProto ? proto : builder;
+    return convertFromProtoFormat(p.getExecutionType());
+  }
+
+  @Override
+  public void setExecutionType(ExecutionType executionType) {
+    maybeInitBuilder();
+    builder.setExecutionType(convertToProtoFormat(executionType));
+  }
+
   private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
     return new ContainerIdPBImpl(p);
   }
@@ -288,6 +302,15 @@ public class ContainerPBImpl extends Container {
     return ((TokenPBImpl)t).getProto();
   }
 
+  private ExecutionType convertFromProtoFormat(
+      ExecutionTypeProto e) {
+    return ProtoUtils.convertFromProtoFormat(e);
+  }
+
+  private ExecutionTypeProto convertToProtoFormat(ExecutionType e) {
+    return ProtoUtils.convertToProtoFormat(e);
+  }
+
   public String toString() {
     StringBuilder sb = new StringBuilder();
     sb.append("Container: [");
@@ -297,6 +320,7 @@ public class ContainerPBImpl extends Container {
     sb.append("Resource: ").append(getResource()).append(", ");
     sb.append("Priority: ").append(getPriority()).append(", ");
     sb.append("Token: ").append(getContainerToken()).append(", ");
+    sb.append("ExecutionType: ").append(getExecutionType()).append(", ");
     sb.append("]");
     return sb.toString();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa975bc7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
index a70d143..b97f935 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
@@ -236,7 +236,7 @@ public class BuilderUtils {
 
   public static Container newContainer(ContainerId containerId, NodeId nodeId,
       String nodeHttpAddress, Resource resource, Priority priority,
-      Token containerToken) {
+      Token containerToken, ExecutionType executionType) {
     Container container = recordFactory.newRecordInstance(Container.class);
     container.setId(containerId);
     container.setNodeId(nodeId);
@@ -244,9 +244,17 @@ public class BuilderUtils {
     container.setResource(resource);
     container.setPriority(priority);
     container.setContainerToken(containerToken);
+    container.setExecutionType(executionType);
     return container;
   }
 
+  public static Container newContainer(ContainerId containerId, NodeId nodeId,
+      String nodeHttpAddress, Resource resource, Priority priority,
+      Token containerToken) {
+    return newContainer(containerId, nodeId, nodeHttpAddress, resource,
+        priority, containerToken, ExecutionType.GUARANTEED);
+  }
+
   public static <T extends Token> T newToken(Class<T> tokenClass,
       byte[] identifier, String kind, byte[] password, String service) {
     T token = recordFactory.newRecordInstance(tokenClass);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa975bc7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
index e33c389..22a6a24 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
@@ -160,7 +160,8 @@ public class OpportunisticContainerAllocator {
         containerTokenIdentifier);
     Container container = BuilderUtils.newContainer(
         cId, nodeId, nodeId.getHost() + ":" + webpagePort,
-        capability, rr.getPriority(), containerToken);
+        capability, rr.getPriority(), containerToken,
+        containerTokenIdentifier.getExecutionType());
     return container;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa975bc7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
index 47563d5..1982776 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
@@ -35,6 +35,11 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
     .RegisterApplicationMasterRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
     .RegisterApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@@ -66,6 +71,7 @@ import org.junit.Test;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
+import java.util.List;
 
 public class TestDistributedSchedulingService {
 
@@ -92,63 +98,13 @@ public class TestDistributedSchedulingService {
         return new YarnConfiguration();
       }
     };
-    DistributedSchedulingService service =
-        new DistributedSchedulingService(rmContext, null) {
-          @Override
-          public RegisterApplicationMasterResponse registerApplicationMaster
-              (RegisterApplicationMasterRequest request) throws
-              YarnException, IOException {
-            RegisterApplicationMasterResponse resp = factory.newRecordInstance(
-                RegisterApplicationMasterResponse.class);
-            // Dummy Entry to Assert that we get this object back
-            resp.setQueue("dummyQueue");
-            return resp;
-          }
-
-          @Override
-          public FinishApplicationMasterResponse finishApplicationMaster
-              (FinishApplicationMasterRequest request) throws YarnException,
-              IOException {
-            FinishApplicationMasterResponse resp = factory.newRecordInstance(
-                FinishApplicationMasterResponse.class);
-            // Dummy Entry to Assert that we get this object back
-            resp.setIsUnregistered(false);
-            return resp;
-          }
-
-          @Override
-          public AllocateResponse allocate(AllocateRequest request) throws
-              YarnException, IOException {
-            AllocateResponse response = factory.newRecordInstance
-                (AllocateResponse.class);
-            response.setNumClusterNodes(12345);
-            return response;
-          }
-
-          @Override
-          public DistSchedRegisterResponse
-          registerApplicationMasterForDistributedScheduling
-              (RegisterApplicationMasterRequest request) throws
-              YarnException, IOException {
-            DistSchedRegisterResponse resp = factory.newRecordInstance(
-                DistSchedRegisterResponse.class);
-            resp.setContainerIdStart(54321l);
-            resp.setMaxAllocatableCapabilty(Resource.newInstance(4096, 4));
-            resp.setMinAllocatableCapabilty(Resource.newInstance(1024, 1));
-            resp.setIncrAllocatableCapabilty(Resource.newInstance(2048, 2));
-            return resp;
-          }
-
-          @Override
-          public DistSchedAllocateResponse allocateForDistributedScheduling
-              (AllocateRequest request) throws YarnException, IOException {
-            DistSchedAllocateResponse resp =
-                factory.newRecordInstance(DistSchedAllocateResponse.class);
-            resp.setNodesForScheduling(
-                Arrays.asList(NodeId.newInstance("h1", 1234)));
-            return resp;
-          }
-        };
+    Container c = factory.newRecordInstance(Container.class);
+    c.setExecutionType(ExecutionType.OPPORTUNISTIC);
+    c.setId(
+        ContainerId.newContainerId(
+            ApplicationAttemptId.newInstance(
+                ApplicationId.newInstance(12345, 1), 2), 3));
+    DistributedSchedulingService service = createService(factory, rmContext, c);
     Server server = service.getServer(rpc, conf, addr, null);
     server.start();
 
@@ -180,6 +136,10 @@ public class TestDistributedSchedulingService {
                 ((AllocateRequestPBImpl)factory
                     .newRecordInstance(AllocateRequest.class)).getProto())
         );
+    List<Container> allocatedContainers = allocResp.getAllocatedContainers();
+    Assert.assertEquals(1, allocatedContainers.size());
+    Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
+        allocatedContainers.get(0).getExecutionType());
     Assert.assertEquals(12345, allocResp.getNumClusterNodes());
 
 
@@ -222,4 +182,65 @@ public class TestDistributedSchedulingService {
     Assert.assertEquals(
         false, dsfinishResp.getIsUnregistered());
   }
+
+  private DistributedSchedulingService createService(final RecordFactory
+      factory, final RMContext rmContext, final Container c) {
+    return new DistributedSchedulingService(rmContext, null) {
+      @Override
+      public RegisterApplicationMasterResponse registerApplicationMaster(
+          RegisterApplicationMasterRequest request) throws
+          YarnException, IOException {
+        RegisterApplicationMasterResponse resp = factory.newRecordInstance(
+            RegisterApplicationMasterResponse.class);
+        // Dummy Entry to Assert that we get this object back
+        resp.setQueue("dummyQueue");
+        return resp;
+      }
+
+      @Override
+      public FinishApplicationMasterResponse finishApplicationMaster(
+          FinishApplicationMasterRequest request) throws YarnException,
+          IOException {
+        FinishApplicationMasterResponse resp = factory.newRecordInstance(
+            FinishApplicationMasterResponse.class);
+        // Dummy Entry to Assert that we get this object back
+        resp.setIsUnregistered(false);
+        return resp;
+      }
+
+      @Override
+      public AllocateResponse allocate(AllocateRequest request) throws
+          YarnException, IOException {
+        AllocateResponse response = factory.newRecordInstance(
+            AllocateResponse.class);
+        response.setNumClusterNodes(12345);
+        response.setAllocatedContainers(Arrays.asList(c));
+        return response;
+      }
+
+      @Override
+      public DistSchedRegisterResponse
+          registerApplicationMasterForDistributedScheduling(
+          RegisterApplicationMasterRequest request) throws
+          YarnException, IOException {
+        DistSchedRegisterResponse resp = factory.newRecordInstance(
+            DistSchedRegisterResponse.class);
+        resp.setContainerIdStart(54321L);
+        resp.setMaxAllocatableCapabilty(Resource.newInstance(4096, 4));
+        resp.setMinAllocatableCapabilty(Resource.newInstance(1024, 1));
+        resp.setIncrAllocatableCapabilty(Resource.newInstance(2048, 2));
+        return resp;
+      }
+
+      @Override
+      public DistSchedAllocateResponse allocateForDistributedScheduling(
+          AllocateRequest request) throws YarnException, IOException {
+        DistSchedAllocateResponse resp =
+            factory.newRecordInstance(DistSchedAllocateResponse.class);
+        resp.setNodesForScheduling(
+            Arrays.asList(NodeId.newInstance("h1", 1234)));
+        return resp;
+      }
+    };
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org