You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2017/09/12 03:54:30 UTC
hadoop git commit: YARN-7173. Container update RM-NM communication fix for backward compatibility. (Arun Suresh via wangda)
Repository: hadoop
Updated Branches:
refs/heads/trunk fa531788f -> e74d1be04
YARN-7173. Container update RM-NM communication fix for backward compatibility. (Arun Suresh via wangda)
Change-Id: I1c39ed5c59dee739ba5044b61b3ef5ed203b79c1
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e74d1be0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e74d1be0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e74d1be0
Branch: refs/heads/trunk
Commit: e74d1be04be47969943b0501a4f335b0b5188287
Parents: fa53178
Author: Wangda Tan <wa...@apache.org>
Authored: Mon Sep 11 20:46:41 2017 -0700
Committer: Wangda Tan <wa...@apache.org>
Committed: Mon Sep 11 20:52:08 2017 -0700
----------------------------------------------------------------------
.../protocolrecords/NodeHeartbeatResponse.java | 5 ++
.../impl/pb/NodeHeartbeatResponsePBImpl.java | 65 ++++++++++++++++++++
.../rmcontainer/RMContainerImpl.java | 4 +-
.../resourcemanager/rmnode/RMNodeImpl.java | 20 +++++-
.../rmnode/RMNodeUpdateContainerEvent.java | 9 +--
.../scheduler/SchedulerApplicationAttempt.java | 3 +-
6 files changed, 97 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e74d1be0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
index 2ebca57..05a9c72 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
@@ -113,4 +113,9 @@ public abstract class NodeHeartbeatResponse {
public abstract void setContainerQueuingLimit(
ContainerQueuingLimit containerQueuingLimit);
+
+ public abstract List<Container> getContainersToDecrease();
+
+ public abstract void addAllContainersToDecrease(
+ Collection<Container> containersToDecrease);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e74d1be0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
index 11f5f61..bbd1294 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
@@ -80,6 +80,8 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
private MasterKey nmTokenMasterKey = null;
private ContainerQueuingLimit containerQueuingLimit = null;
private List<Container> containersToUpdate = null;
+ // NOTE: This is required for backward compatibility.
+ private List<Container> containersToDecrease = null;
private List<SignalContainerRequest> containersToSignal = null;
public NodeHeartbeatResponsePBImpl() {
@@ -126,6 +128,9 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
if (this.containersToUpdate != null) {
addContainersToUpdateToProto();
}
+ if (this.containersToDecrease != null) {
+ addContainersToDecreaseToProto();
+ }
if (this.containersToSignal != null) {
addContainersToSignalToProto();
}
@@ -572,6 +577,66 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
builder.addAllContainersToUpdate(iterable);
}
+ private void initContainersToDecrease() {
+ if (this.containersToDecrease != null) {
+ return;
+ }
+ NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
+ List<ContainerProto> list = p.getContainersToDecreaseList();
+ this.containersToDecrease = new ArrayList<>();
+
+ for (ContainerProto c : list) {
+ this.containersToDecrease.add(convertFromProtoFormat(c));
+ }
+ }
+
+ @Override
+ public List<Container> getContainersToDecrease() {
+ initContainersToDecrease();
+ return this.containersToDecrease;
+ }
+
+ @Override
+ public void addAllContainersToDecrease(
+ final Collection<Container> containersToDecrease) {
+ if (containersToDecrease == null) {
+ return;
+ }
+ initContainersToDecrease();
+ this.containersToDecrease.addAll(containersToDecrease);
+ }
+
+ private void addContainersToDecreaseToProto() {
+ maybeInitBuilder();
+ builder.clearContainersToDecrease();
+ if (this.containersToDecrease == null) {
+ return;
+ }
+
+ Iterable<ContainerProto> iterable = new
+ Iterable<ContainerProto>() {
+ @Override
+ public Iterator<ContainerProto> iterator() {
+ return new Iterator<ContainerProto>() {
+ private Iterator<Container> iter = containersToDecrease.iterator();
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+ @Override
+ public ContainerProto next() {
+ return convertToProtoFormat(iter.next());
+ }
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+ builder.addAllContainersToDecrease(iterable);
+ }
+
@Override
public Map<ApplicationId, ByteBuffer> getSystemCredentialsForApps() {
if (this.systemCredentials != null) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e74d1be0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index f49db7e..8aa5788 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
@@ -641,7 +642,8 @@ public class RMContainerImpl implements RMContainer {
new AllocationExpirationInfo(event.getContainerId()));
container.eventHandler.handle(new RMNodeUpdateContainerEvent(
container.nodeId,
- Collections.singletonList(container.getContainer())));
+ Collections.singletonMap(container.getContainer(),
+ ContainerUpdateType.DECREASE_RESOURCE)));
} else if (Resources.fitsIn(nmContainerResource, rmContainerResource)) {
// If nmContainerResource < rmContainerResource, this is caused by the
// following sequence:
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e74d1be0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index d270aa3..c547128 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
@@ -173,7 +174,11 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
private final Map<ContainerId, Container> toBeUpdatedContainers =
new HashMap<>();
-
+
+ // NOTE: This is required for backward compatibility.
+ private final Map<ContainerId, Container> toBeDecreasedContainers =
+ new HashMap<>();
+
private final Map<ContainerId, Container> nmReportedIncreasedContainers =
new HashMap<>();
@@ -626,6 +631,10 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
try {
response.addAllContainersToUpdate(toBeUpdatedContainers.values());
toBeUpdatedContainers.clear();
+
+ // NOTE: This is required for backward compatibility.
+ response.addAllContainersToDecrease(toBeDecreasedContainers.values());
+ toBeDecreasedContainers.clear();
} finally {
this.writeLock.unlock();
}
@@ -1043,8 +1052,13 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
RMNodeUpdateContainerEvent de = (RMNodeUpdateContainerEvent) event;
- for (Container c : de.getToBeUpdatedContainers()) {
- rmNode.toBeUpdatedContainers.put(c.getId(), c);
+ for (Map.Entry<Container, ContainerUpdateType> e :
+ de.getToBeUpdatedContainers().entrySet()) {
+ // NOTE: This is required for backward compatibility.
+ if (ContainerUpdateType.DECREASE_RESOURCE == e.getValue()) {
+ rmNode.toBeDecreasedContainers.put(e.getKey().getId(), e.getKey());
+ }
+ rmNode.toBeUpdatedContainers.put(e.getKey().getId(), e.getKey());
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e74d1be0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeUpdateContainerEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeUpdateContainerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeUpdateContainerEvent.java
index 73af563..b8f8e73 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeUpdateContainerEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeUpdateContainerEvent.java
@@ -19,8 +19,10 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.NodeId;
/**
@@ -29,16 +31,15 @@ import org.apache.hadoop.yarn.api.records.NodeId;
*
*/
public class RMNodeUpdateContainerEvent extends RMNodeEvent {
- private List<Container> toBeUpdatedContainers;
+ private Map<Container, ContainerUpdateType> toBeUpdatedContainers;
public RMNodeUpdateContainerEvent(NodeId nodeId,
- List<Container> toBeUpdatedContainers) {
+ Map<Container, ContainerUpdateType> toBeUpdatedContainers) {
super(nodeId, RMNodeEventType.UPDATE_CONTAINER);
-
this.toBeUpdatedContainers = toBeUpdatedContainers;
}
- public List<Container> getToBeUpdatedContainers() {
+ public Map<Container, ContainerUpdateType> getToBeUpdatedContainers() {
return toBeUpdatedContainers;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e74d1be0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 6a44cae..c807590 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -690,7 +690,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
if (autoUpdate) {
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeUpdateContainerEvent(rmContainer.getNodeId(),
- Collections.singletonList(rmContainer.getContainer())));
+ Collections.singletonMap(
+ rmContainer.getContainer(), updateType)));
} else {
rmContainer.handle(new RMContainerUpdatesAcquiredEvent(
rmContainer.getContainerId(),
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org