You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2016/10/29 09:04:50 UTC
hadoop git commit: YARN-5799. Fix Opportunistic Allocation to set the
correct value of Node Http Address. (asuresh)
Repository: hadoop
Updated Branches:
refs/heads/trunk 1c8ab41e8 -> aa3cab1eb
YARN-5799. Fix Opportunistic Allocation to set the correct value of Node Http Address. (asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/aa3cab1e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/aa3cab1e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/aa3cab1e
Branch: refs/heads/trunk
Commit: aa3cab1eb29c56368d15882d7260a994e615e8d8
Parents: 1c8ab41
Author: Arun Suresh <as...@apache.org>
Authored: Sat Oct 29 02:03:57 2016 -0700
Committer: Arun Suresh <as...@apache.org>
Committed: Sat Oct 29 02:03:57 2016 -0700
----------------------------------------------------------------------
.../DistributedSchedulingAllocateResponse.java | 6 +-
...RegisterDistributedSchedulingAMResponse.java | 6 +-
.../server/api/protocolrecords/RemoteNode.java | 90 +++++++++++++
...ributedSchedulingAllocateResponsePBImpl.java | 41 +++---
...erDistributedSchedulingAMResponsePBImpl.java | 39 +++---
.../impl/pb/RemoteNodePBImpl.java | 135 +++++++++++++++++++
.../OpportunisticContainerAllocator.java | 24 ++--
.../OpportunisticContainerContext.java | 14 +-
.../yarn_server_common_service_protos.proto | 9 +-
.../yarn/server/nodemanager/NodeManager.java | 2 +-
.../scheduler/DistributedScheduler.java | 3 +-
.../scheduler/TestDistributedScheduler.java | 20 ++-
...pportunisticContainerAllocatorAMService.java | 30 ++++-
...pportunisticContainerAllocatorAMService.java | 6 +-
14 files changed, 343 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa3cab1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistributedSchedulingAllocateResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistributedSchedulingAllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistributedSchedulingAllocateResponse.java
index 7a40449..edc0cf8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistributedSchedulingAllocateResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistributedSchedulingAllocateResponse.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.util.Records;
import java.util.List;
@@ -58,9 +57,10 @@ public abstract class DistributedSchedulingAllocateResponse {
@Public
@Unstable
- public abstract void setNodesForScheduling(List<NodeId> nodesForScheduling);
+ public abstract void setNodesForScheduling(
+ List<RemoteNode> nodesForScheduling);
@Public
@Unstable
- public abstract List<NodeId> getNodesForScheduling();
+ public abstract List<RemoteNode> getNodesForScheduling();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa3cab1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterDistributedSchedulingAMResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterDistributedSchedulingAMResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterDistributedSchedulingAMResponse.java
index a0a0e38..f7d8df2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterDistributedSchedulingAMResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterDistributedSchedulingAMResponse.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.Records;
@@ -99,10 +98,11 @@ public abstract class RegisterDistributedSchedulingAMResponse {
@Public
@Unstable
- public abstract void setNodesForScheduling(List<NodeId> nodesForScheduling);
+ public abstract void setNodesForScheduling(
+ List<RemoteNode> nodesForScheduling);
@Public
@Unstable
- public abstract List<NodeId> getNodesForScheduling();
+ public abstract List<RemoteNode> getNodesForScheduling();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa3cab1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java
new file mode 100644
index 0000000..2b76257
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * This class is used to encapsulate the {@link NodeId} as well as the HTTP
+ * address that can be used to communicate with the Node.
+ */
+@Private
+@Unstable
+public abstract class RemoteNode implements Comparable<RemoteNode> {
+
+ /**
+ * Create new Instance.
+ * @param nodeId NodeId.
+ * @param httpAddress Http address.
+ * @return RemoteNode instance.
+ */
+ @Private
+ @Unstable
+ public static RemoteNode newInstance(NodeId nodeId, String httpAddress) {
+ RemoteNode remoteNode = Records.newRecord(RemoteNode.class);
+ remoteNode.setNodeId(nodeId);
+ remoteNode.setHttpAddress(httpAddress);
+ return remoteNode;
+ }
+
+ /**
+ * Get {@link NodeId}.
+ * @return NodeId.
+ */
+ @Private
+ @Unstable
+ public abstract NodeId getNodeId();
+
+ /**
+ * Set {@link NodeId}.
+ * @param nodeId NodeId.
+ */
+ @Private
+ @Unstable
+ public abstract void setNodeId(NodeId nodeId);
+
+ /**
+ * Get HTTP address.
+ * @return Http Address.
+ */
+ @Private
+ @Unstable
+ public abstract String getHttpAddress();
+
+ /**
+ * Set HTTP address.
+ * @param httpAddress HTTP address.
+ */
+ @Private
+ @Unstable
+ public abstract void setHttpAddress(String httpAddress);
+
+ /**
+ * Use the underlying {@link NodeId} comparator.
+ * @param other RemoteNode.
+ * @return Comparison.
+ */
+ @Override
+ public int compareTo(RemoteNode other) {
+ return this.getNodeId().compareTo(other.getNodeId());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa3cab1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistributedSchedulingAllocateResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistributedSchedulingAllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistributedSchedulingAllocateResponsePBImpl.java
index 18d5073..8c48b61 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistributedSchedulingAllocateResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistributedSchedulingAllocateResponsePBImpl.java
@@ -21,12 +21,13 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
-import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RemoteNodeProto;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
+
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
+
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -45,7 +46,7 @@ public class DistributedSchedulingAllocateResponsePBImpl extends
boolean viaProto = false;
private AllocateResponse allocateResponse;
- private List<NodeId> nodesForScheduling;
+ private List<RemoteNode> nodesForScheduling;
public DistributedSchedulingAllocateResponsePBImpl() {
builder = YarnServerCommonServiceProtos.
@@ -86,8 +87,8 @@ public class DistributedSchedulingAllocateResponsePBImpl extends
private synchronized void mergeLocalToBuilder() {
if (this.nodesForScheduling != null) {
builder.clearNodesForScheduling();
- Iterable<YarnProtos.NodeIdProto> iterable = getNodeIdProtoIterable(
- this.nodesForScheduling);
+ Iterable<YarnServerCommonServiceProtos.RemoteNodeProto> iterable =
+ getNodeIdProtoIterable(this.nodesForScheduling);
builder.addAllNodesForScheduling(iterable);
}
if (this.allocateResponse != null) {
@@ -123,7 +124,7 @@ public class DistributedSchedulingAllocateResponsePBImpl extends
}
@Override
- public void setNodesForScheduling(List<NodeId> nodesForScheduling) {
+ public void setNodesForScheduling(List<RemoteNode> nodesForScheduling) {
maybeInitBuilder();
if (nodesForScheduling == null || nodesForScheduling.isEmpty()) {
if (this.nodesForScheduling != null) {
@@ -137,7 +138,7 @@ public class DistributedSchedulingAllocateResponsePBImpl extends
}
@Override
- public List<NodeId> getNodesForScheduling() {
+ public List<RemoteNode> getNodesForScheduling() {
if (nodesForScheduling != null) {
return nodesForScheduling;
}
@@ -149,24 +150,25 @@ public class DistributedSchedulingAllocateResponsePBImpl extends
YarnServerCommonServiceProtos.
DistributedSchedulingAllocateResponseProtoOrBuilder p =
viaProto ? proto : builder;
- List<YarnProtos.NodeIdProto> list = p.getNodesForSchedulingList();
+ List<YarnServerCommonServiceProtos.RemoteNodeProto> list =
+ p.getNodesForSchedulingList();
nodesForScheduling = new ArrayList<>();
if (list != null) {
- for (YarnProtos.NodeIdProto t : list) {
- nodesForScheduling.add(ProtoUtils.convertFromProtoFormat(t));
+ for (YarnServerCommonServiceProtos.RemoteNodeProto t : list) {
+ nodesForScheduling.add(new RemoteNodePBImpl(t));
}
}
}
- private synchronized Iterable<YarnProtos.NodeIdProto> getNodeIdProtoIterable(
- final List<NodeId> nodeList) {
+ private synchronized Iterable<RemoteNodeProto> getNodeIdProtoIterable(
+ final List<RemoteNode> nodeList) {
maybeInitBuilder();
- return new Iterable<YarnProtos.NodeIdProto>() {
+ return new Iterable<RemoteNodeProto>() {
@Override
- public synchronized Iterator<YarnProtos.NodeIdProto> iterator() {
- return new Iterator<YarnProtos.NodeIdProto>() {
+ public synchronized Iterator<RemoteNodeProto> iterator() {
+ return new Iterator<RemoteNodeProto>() {
- Iterator<NodeId> iter = nodeList.iterator();
+ Iterator<RemoteNode> iter = nodeList.iterator();
@Override
public boolean hasNext() {
@@ -174,8 +176,8 @@ public class DistributedSchedulingAllocateResponsePBImpl extends
}
@Override
- public YarnProtos.NodeIdProto next() {
- return ProtoUtils.convertToProtoFormat(iter.next());
+ public RemoteNodeProto next() {
+ return ((RemoteNodePBImpl)iter.next()).getProto();
}
@Override
@@ -186,5 +188,4 @@ public class DistributedSchedulingAllocateResponsePBImpl extends
}
};
}
-
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa3cab1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterDistributedSchedulingAMResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterDistributedSchedulingAMResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterDistributedSchedulingAMResponsePBImpl.java
index 4aaf99c..41b2a4f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterDistributedSchedulingAMResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterDistributedSchedulingAMResponsePBImpl.java
@@ -23,13 +23,15 @@ import com.google.protobuf.TextFormat;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
-import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
-import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RemoteNodeProto;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
+
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
+
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -52,7 +54,7 @@ public class RegisterDistributedSchedulingAMResponsePBImpl extends
private Resource maxContainerResource;
private Resource minContainerResource;
private Resource incrContainerResource;
- private List<NodeId> nodesForScheduling;
+ private List<RemoteNode> nodesForScheduling;
private RegisterApplicationMasterResponse registerApplicationMasterResponse;
public RegisterDistributedSchedulingAMResponsePBImpl() {
@@ -95,8 +97,8 @@ public class RegisterDistributedSchedulingAMResponsePBImpl extends
private synchronized void mergeLocalToBuilder() {
if (this.nodesForScheduling != null) {
builder.clearNodesForScheduling();
- Iterable<YarnProtos.NodeIdProto> iterable = getNodeIdProtoIterable(
- this.nodesForScheduling);
+ Iterable<YarnServerCommonServiceProtos.RemoteNodeProto> iterable =
+ getNodeIdProtoIterable(this.nodesForScheduling);
builder.addAllNodesForScheduling(iterable);
}
if (this.maxContainerResource != null) {
@@ -261,7 +263,7 @@ public class RegisterDistributedSchedulingAMResponsePBImpl extends
}
@Override
- public void setNodesForScheduling(List<NodeId> nodesForScheduling) {
+ public void setNodesForScheduling(List<RemoteNode> nodesForScheduling) {
maybeInitBuilder();
if (nodesForScheduling == null || nodesForScheduling.isEmpty()) {
if (this.nodesForScheduling != null) {
@@ -275,7 +277,7 @@ public class RegisterDistributedSchedulingAMResponsePBImpl extends
}
@Override
- public List<NodeId> getNodesForScheduling() {
+ public List<RemoteNode> getNodesForScheduling() {
if (nodesForScheduling != null) {
return nodesForScheduling;
}
@@ -287,24 +289,25 @@ public class RegisterDistributedSchedulingAMResponsePBImpl extends
YarnServerCommonServiceProtos.
RegisterDistributedSchedulingAMResponseProtoOrBuilder p =
viaProto ? proto : builder;
- List<YarnProtos.NodeIdProto> list = p.getNodesForSchedulingList();
+ List<YarnServerCommonServiceProtos.RemoteNodeProto> list =
+ p.getNodesForSchedulingList();
nodesForScheduling = new ArrayList<>();
if (list != null) {
- for (YarnProtos.NodeIdProto t : list) {
- nodesForScheduling.add(ProtoUtils.convertFromProtoFormat(t));
+ for (YarnServerCommonServiceProtos.RemoteNodeProto t : list) {
+ nodesForScheduling.add(new RemoteNodePBImpl(t));
}
}
}
- private synchronized Iterable<YarnProtos.NodeIdProto> getNodeIdProtoIterable(
- final List<NodeId> nodeList) {
+ private synchronized Iterable<RemoteNodeProto> getNodeIdProtoIterable(
+ final List<RemoteNode> nodeList) {
maybeInitBuilder();
- return new Iterable<YarnProtos.NodeIdProto>() {
+ return new Iterable<RemoteNodeProto>() {
@Override
- public synchronized Iterator<YarnProtos.NodeIdProto> iterator() {
- return new Iterator<YarnProtos.NodeIdProto>() {
+ public synchronized Iterator<RemoteNodeProto> iterator() {
+ return new Iterator<RemoteNodeProto>() {
- Iterator<NodeId> iter = nodeList.iterator();
+ Iterator<RemoteNode> iter = nodeList.iterator();
@Override
public boolean hasNext() {
@@ -312,8 +315,8 @@ public class RegisterDistributedSchedulingAMResponsePBImpl extends
}
@Override
- public YarnProtos.NodeIdProto next() {
- return ProtoUtils.convertToProtoFormat(iter.next());
+ public RemoteNodeProto next() {
+ return ((RemoteNodePBImpl)iter.next()).getProto();
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa3cab1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java
new file mode 100644
index 0000000..3e4fd4a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RemoteNodeProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RemoteNodeProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
+
+/**
+ * Implementation of {@link RemoteNode}.
+ */
+public class RemoteNodePBImpl extends RemoteNode {
+
+ private RemoteNodeProto proto = RemoteNodeProto.getDefaultInstance();
+ private RemoteNodeProto.Builder builder = null;
+ private boolean viaProto = false;
+
+ private NodeId nodeId = null;
+
+ public RemoteNodePBImpl() {
+ builder = RemoteNodeProto.newBuilder();
+ }
+
+ public RemoteNodePBImpl(RemoteNodeProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public RemoteNodeProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ private void mergeLocalToBuilder() {
+ if (this.nodeId != null
+ && !((NodeIdPBImpl) nodeId).getProto().equals(
+ builder.getNodeId())) {
+ builder.setNodeId(ProtoUtils.convertToProtoFormat(this.nodeId));
+ }
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto) {
+ maybeInitBuilder();
+ }
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = RemoteNodeProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ @Override
+ public NodeId getNodeId() {
+ RemoteNodeProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.nodeId != null) {
+ return this.nodeId;
+ }
+ if (!p.hasNodeId()) {
+ return null;
+ }
+ this.nodeId = ProtoUtils.convertFromProtoFormat(p.getNodeId());
+ return this.nodeId;
+ }
+
+ @Override
+ public void setNodeId(NodeId nodeId) {
+ maybeInitBuilder();
+ if (nodeId == null) {
+ builder.clearNodeId();
+ }
+ this.nodeId = nodeId;
+ }
+
+ @Override
+ public String getHttpAddress() {
+ RemoteNodeProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasHttpAddress()) {
+ return null;
+ }
+ return (p.getHttpAddress());
+ }
+
+ @Override
+ public void setHttpAddress(String httpAddress) {
+ maybeInitBuilder();
+ if (httpAddress == null) {
+ builder.clearHttpAddress();
+ return;
+ }
+ builder.setHttpAddress(httpAddress);
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null) {
+ return false;
+ }
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa3cab1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
index 9c158e9..4410db1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ContainerType;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
@@ -174,17 +175,14 @@ public class OpportunisticContainerAllocator {
new DominantResourceCalculator();
private final BaseContainerTokenSecretManager tokenSecretManager;
- private int webpagePort;
/**
* Create a new Opportunistic Container Allocator.
* @param tokenSecretManager TokenSecretManager
- * @param webpagePort Webpage Port
*/
public OpportunisticContainerAllocator(
- BaseContainerTokenSecretManager tokenSecretManager, int webpagePort) {
+ BaseContainerTokenSecretManager tokenSecretManager) {
this.tokenSecretManager = tokenSecretManager;
- this.webpagePort = webpagePort;
}
/**
@@ -271,15 +269,15 @@ public class OpportunisticContainerAllocator {
private void allocateContainersInternal(long rmIdentifier,
AllocationParams appParams, ContainerIdGenerator idCounter,
Set<String> blacklist, ApplicationAttemptId id,
- Map<String, NodeId> allNodes, String userName,
+ Map<String, RemoteNode> allNodes, String userName,
Map<Resource, List<Container>> containers, ResourceRequest anyAsk)
throws YarnException {
int toAllocate = anyAsk.getNumContainers()
- (containers.isEmpty() ? 0 :
containers.get(anyAsk.getCapability()).size());
- List<NodeId> nodesForScheduling = new ArrayList<>();
- for (Entry<String, NodeId> nodeEntry : allNodes.entrySet()) {
+ List<RemoteNode> nodesForScheduling = new ArrayList<>();
+ for (Entry<String, RemoteNode> nodeEntry : allNodes.entrySet()) {
// Do not use blacklisted nodes for scheduling.
if (blacklist.contains(nodeEntry.getKey())) {
continue;
@@ -295,9 +293,9 @@ public class OpportunisticContainerAllocator {
for (int numCont = 0; numCont < toAllocate; numCont++) {
nextNodeToSchedule++;
nextNodeToSchedule %= nodesForScheduling.size();
- NodeId nodeId = nodesForScheduling.get(nextNodeToSchedule);
+ RemoteNode node = nodesForScheduling.get(nextNodeToSchedule);
Container container = buildContainer(rmIdentifier, appParams, idCounter,
- anyAsk, id, userName, nodeId);
+ anyAsk, id, userName, node);
List<Container> cList = containers.get(anyAsk.getCapability());
if (cList == null) {
cList = new ArrayList<>();
@@ -313,7 +311,7 @@ public class OpportunisticContainerAllocator {
private Container buildContainer(long rmIdentifier,
AllocationParams appParams, ContainerIdGenerator idCounter,
ResourceRequest rr, ApplicationAttemptId id, String userName,
- NodeId nodeId) throws YarnException {
+ RemoteNode node) throws YarnException {
ContainerId cId =
ContainerId.newContainerId(id, idCounter.generateContainerId());
@@ -324,7 +322,7 @@ public class OpportunisticContainerAllocator {
long currTime = System.currentTimeMillis();
ContainerTokenIdentifier containerTokenIdentifier =
new ContainerTokenIdentifier(
- cId, 0, nodeId.getHost() + ":" + nodeId.getPort(), userName,
+ cId, 0, node.getNodeId().toString(), userName,
capability, currTime + appParams.containerTokenExpiryInterval,
tokenSecretManager.getCurrentKey().getKeyId(), rmIdentifier,
rr.getPriority(), currTime,
@@ -332,10 +330,10 @@ public class OpportunisticContainerAllocator {
ExecutionType.OPPORTUNISTIC);
byte[] pwd =
tokenSecretManager.createPassword(containerTokenIdentifier);
- Token containerToken = newContainerToken(nodeId, pwd,
+ Token containerToken = newContainerToken(node.getNodeId(), pwd,
containerTokenIdentifier);
Container container = BuilderUtils.newContainer(
- cId, nodeId, nodeId.getHost() + ":" + webpagePort,
+ cId, node.getNodeId(), node.getHttpAddress(),
capability, rr.getPriority(), containerToken,
containerTokenIdentifier.getExecutionType(),
rr.getAllocationRequestId());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa3cab1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
index 6fcddf8..725e2d9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
@@ -23,10 +23,10 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
-import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,8 +60,8 @@ public class OpportunisticContainerContext {
private ContainerIdGenerator containerIdGenerator =
new ContainerIdGenerator();
- private volatile List<NodeId> nodeList = new LinkedList<>();
- private final Map<String, NodeId> nodeMap = new LinkedHashMap<>();
+ private volatile List<RemoteNode> nodeList = new LinkedList<>();
+ private final Map<String, RemoteNode> nodeMap = new LinkedHashMap<>();
private final Set<String> blacklist = new HashSet<>();
@@ -89,11 +89,11 @@ public class OpportunisticContainerContext {
this.containerIdGenerator = containerIdGenerator;
}
- public Map<String, NodeId> getNodeMap() {
+ public Map<String, RemoteNode> getNodeMap() {
return Collections.unmodifiableMap(nodeMap);
}
- public synchronized void updateNodeList(List<NodeId> newNodeList) {
+ public synchronized void updateNodeList(List<RemoteNode> newNodeList) {
// This is an optimization for centralized placement. The
// OppContainerAllocatorAMService has a cached list of nodes which it sets
// here. The nodeMap needs to be updated only if the backing node list is
@@ -101,8 +101,8 @@ public class OpportunisticContainerContext {
if (newNodeList != nodeList) {
nodeList = newNodeList;
nodeMap.clear();
- for (NodeId n : nodeList) {
- nodeMap.put(n.getHost(), n);
+ for (RemoteNode n : nodeList) {
+ nodeMap.put(n.getNodeId().getHost(), n);
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa3cab1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index d485e6b..4350fc5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -26,6 +26,11 @@ import "yarn_protos.proto";
import "yarn_server_common_protos.proto";
import "yarn_service_protos.proto";
+message RemoteNodeProto {
+ optional NodeIdProto node_id = 1;
+ optional string http_address = 2;
+}
+
message RegisterDistributedSchedulingAMResponseProto {
optional RegisterApplicationMasterResponseProto register_response = 1;
optional ResourceProto max_container_resource = 2;
@@ -33,12 +38,12 @@ message RegisterDistributedSchedulingAMResponseProto {
optional ResourceProto incr_container_resource = 4;
optional int32 container_token_expiry_interval = 5;
optional int64 container_id_start = 6;
- repeated NodeIdProto nodes_for_scheduling = 7;
+ repeated RemoteNodeProto nodes_for_scheduling = 7;
}
message DistributedSchedulingAllocateResponseProto {
optional AllocateResponseProto allocate_response = 1;
- repeated NodeIdProto nodes_for_scheduling = 2;
+ repeated RemoteNodeProto nodes_for_scheduling = 2;
}
message DistributedSchedulingAllocateRequestProto {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa3cab1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 5424464..0f0a081 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -374,7 +374,7 @@ public class NodeManager extends CompositeService
((NMContext) context).setQueueableContainerAllocator(
new OpportunisticContainerAllocator(
- context.getContainerTokenSecretManager(), webServer.getPort()));
+ context.getContainerTokenSecretManager()));
dispatcher.register(ContainerManagerEventType.class, containerManager);
dispatcher.register(NodeManagerEventType.class, this);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa3cab1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
index 8a40337..a12d16a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyApplicationContext;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AbstractRequestInterceptor;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
@@ -198,7 +199,7 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
setNodeList(registerResponse.getNodesForScheduling());
}
- private void setNodeList(List<NodeId> nodeList) {
+ private void setNodeList(List<RemoteNode> nodeList) {
oppContainerContext.updateNodeList(nodeList);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa3cab1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java
index 8f1ae7f..736dc31 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor;
@@ -74,7 +75,8 @@ public class TestDistributedScheduler {
RequestInterceptor finalReqIntcptr = setup(conf, distributedScheduler);
registerAM(distributedScheduler, finalReqIntcptr, Arrays.asList(
- NodeId.newInstance("a", 1), NodeId.newInstance("b", 2)));
+ RemoteNode.newInstance(NodeId.newInstance("a", 1), "http://a:1"),
+ RemoteNode.newInstance(NodeId.newInstance("b", 2), "http://b:2")));
final AtomicBoolean flipFlag = new AtomicBoolean(true);
Mockito.when(
@@ -87,10 +89,16 @@ public class TestDistributedScheduler {
flipFlag.set(!flipFlag.get());
if (flipFlag.get()) {
return createAllocateResponse(Arrays.asList(
- NodeId.newInstance("c", 3), NodeId.newInstance("d", 4)));
+ RemoteNode.newInstance(
+ NodeId.newInstance("c", 3), "http://c:3"),
+ RemoteNode.newInstance(
+ NodeId.newInstance("d", 4), "http://d:4")));
} else {
return createAllocateResponse(Arrays.asList(
- NodeId.newInstance("d", 4), NodeId.newInstance("c", 3)));
+ RemoteNode.newInstance(
+ NodeId.newInstance("d", 4), "http://d:4"),
+ RemoteNode.newInstance(
+ NodeId.newInstance("c", 3), "http://c:3")));
}
}
});
@@ -164,7 +172,7 @@ public class TestDistributedScheduler {
}
private void registerAM(DistributedScheduler distributedScheduler,
- RequestInterceptor finalReqIntcptr, List<NodeId> nodeList)
+ RequestInterceptor finalReqIntcptr, List<RemoteNode> nodeList)
throws Exception {
RegisterDistributedSchedulingAMResponse distSchedRegisterResponse =
Records.newRecord(RegisterDistributedSchedulingAMResponse.class);
@@ -208,7 +216,7 @@ public class TestDistributedScheduler {
};
nmContainerTokenSecretManager.setMasterKey(mKey);
OpportunisticContainerAllocator containerAllocator =
- new OpportunisticContainerAllocator(nmContainerTokenSecretManager, 77);
+ new OpportunisticContainerAllocator(nmContainerTokenSecretManager);
NMTokenSecretManagerInNM nmTokenSecretManagerInNM =
new NMTokenSecretManagerInNM();
@@ -236,7 +244,7 @@ public class TestDistributedScheduler {
}
private DistributedSchedulingAllocateResponse createAllocateResponse(
- List<NodeId> nodes) {
+ List<RemoteNode> nodes) {
DistributedSchedulingAllocateResponse distSchedAllocateResponse =
Records.newRecord(DistributedSchedulingAllocateResponse.class);
distSchedAllocateResponse
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa3cab1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
index a7c0a50..815d29d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
@@ -49,6 +49,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
+
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@@ -74,6 +76,7 @@ import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
import java.util.List;
/**
@@ -97,7 +100,7 @@ public class OpportunisticContainerAllocatorAMService
private final int k;
private final long cacheRefreshInterval;
- private List<NodeId> cachedNodeIds;
+ private List<RemoteNode> cachedNodes;
private long lastCacheUpdateTime;
public OpportunisticContainerAllocatorAMService(RMContext rmContext,
@@ -105,7 +108,7 @@ public class OpportunisticContainerAllocatorAMService
super(OpportunisticContainerAllocatorAMService.class.getName(),
rmContext, scheduler);
this.oppContainerAllocator = new OpportunisticContainerAllocator(
- rmContext.getContainerTokenSecretManager(), 0);
+ rmContext.getContainerTokenSecretManager());
this.k = rmContext.getYarnConfiguration().getInt(
YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED,
YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED_DEFAULT);
@@ -372,14 +375,29 @@ public class OpportunisticContainerAllocatorAMService
);
}
- private synchronized List<NodeId> getLeastLoadedNodes() {
+ private synchronized List<RemoteNode> getLeastLoadedNodes() {
long currTime = System.currentTimeMillis();
if ((currTime - lastCacheUpdateTime > cacheRefreshInterval)
- || cachedNodeIds == null) {
- cachedNodeIds = this.nodeMonitor.selectLeastLoadedNodes(this.k);
+ || cachedNodes == null) {
+ cachedNodes = convertToRemoteNodes(
+ this.nodeMonitor.selectLeastLoadedNodes(this.k));
lastCacheUpdateTime = currTime;
}
- return cachedNodeIds;
+ return cachedNodes;
+ }
+
+ private List<RemoteNode> convertToRemoteNodes(List<NodeId> nodeIds) {
+ ArrayList<RemoteNode> retNodes = new ArrayList<>();
+ for (NodeId nId : nodeIds) {
+ retNodes.add(convertToRemoteNode(nId));
+ }
+ return retNodes;
+ }
+
+ private RemoteNode convertToRemoteNode(NodeId nodeId) {
+ return RemoteNode.newInstance(nodeId,
+ ((AbstractYarnScheduler)rmContext.getScheduler()).getNode(nodeId)
+ .getHttpAddress());
}
private Resource createMaxContainerResource() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa3cab1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
index 207f5ba..3154d26 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistributedSchedulingAMResponsePBImpl;
@@ -190,7 +191,7 @@ public class TestOpportunisticContainerAllocatorAMService {
dsProxy.allocateForDistributedScheduling(null,
distAllReq.getProto()));
Assert.assertEquals(
- "h1", dsAllocResp.getNodesForScheduling().get(0).getHost());
+ "h1", dsAllocResp.getNodesForScheduling().get(0).getNodeId().getHost());
FinishApplicationMasterResponse dsfinishResp =
new FinishApplicationMasterResponsePBImpl(
@@ -269,7 +270,8 @@ public class TestOpportunisticContainerAllocatorAMService {
DistributedSchedulingAllocateResponse resp = factory
.newRecordInstance(DistributedSchedulingAllocateResponse.class);
resp.setNodesForScheduling(
- Arrays.asList(NodeId.newInstance("h1", 1234)));
+ Arrays.asList(RemoteNode.newInstance(
+ NodeId.newInstance("h1", 1234), "http://h1:4321")));
return resp;
}
};
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org