You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2018/01/31 15:57:36 UTC
[12/32] hadoop git commit: YARN-7669. API and interface modifications
for placement constraint processor. (asuresh)
YARN-7669. API and interface modifications for placement constraint processor. (asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/06eb63e6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/06eb63e6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/06eb63e6
Branch: refs/heads/trunk
Commit: 06eb63e64b05e2e8bb8a76c15360ab0495f11317
Parents: 88d8d3f
Author: Arun Suresh <as...@apache.org>
Authored: Tue Dec 19 22:47:46 2017 -0800
Committer: Arun Suresh <as...@apache.org>
Committed: Wed Jan 31 01:30:17 2018 -0800
----------------------------------------------------------------------
.../yarn/ams/ApplicationMasterServiceUtils.java | 16 +
.../api/protocolrecords/AllocateResponse.java | 23 +
.../api/records/RejectedSchedulingRequest.java | 70 +++
.../yarn/api/records/RejectionReason.java | 44 ++
.../src/main/proto/yarn_protos.proto | 10 +
.../src/main/proto/yarn_service_protos.proto | 1 +
.../impl/pb/AllocateResponsePBImpl.java | 85 ++++
.../yarn/api/records/impl/pb/ProtoUtils.java | 16 +
.../pb/RejectedSchedulingRequestPBImpl.java | 148 +++++++
.../records/impl/pb/ResourceSizingPBImpl.java | 8 +
.../impl/pb/SchedulingRequestPBImpl.java | 11 +
.../hadoop/yarn/api/TestPBImplRecords.java | 2 +
.../resourcemanager/RMActiveServiceContext.java | 2 +-
.../yarn/server/resourcemanager/RMContext.java | 2 +-
.../server/resourcemanager/RMContextImpl.java | 2 +-
.../server/resourcemanager/ResourceManager.java | 2 +-
.../constraint/AllocationTagsManager.java | 431 -------------------
.../constraint/AllocationTagsNamespaces.java | 31 --
.../InvalidAllocationTagsQueryException.java | 35 --
.../constraint/AllocationTagsManager.java | 431 +++++++++++++++++++
.../constraint/AllocationTagsNamespaces.java | 31 ++
.../InvalidAllocationTagsQueryException.java | 35 ++
.../api/ConstraintPlacementAlgorithm.java | 43 ++
.../api/ConstraintPlacementAlgorithmInput.java | 32 ++
.../api/ConstraintPlacementAlgorithmOutput.java | 58 +++
...traintPlacementAlgorithmOutputCollector.java | 32 ++
.../constraint/api/PlacedSchedulingRequest.java | 79 ++++
.../constraint/api/SchedulingResponse.java | 70 +++
.../scheduler/constraint/api/package-info.java | 28 ++
.../constraint/TestAllocationTagsManager.java | 328 --------------
.../rmcontainer/TestRMContainerImpl.java | 2 +-
.../scheduler/capacity/TestUtils.java | 2 +-
.../constraint/TestAllocationTagsManager.java | 328 ++++++++++++++
.../scheduler/fifo/TestFifoScheduler.java | 2 +-
34 files changed, 1608 insertions(+), 832 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/06eb63e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceUtils.java
index 476da8b..8bdfaf3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceUtils.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.ams;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
+import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerError;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
@@ -86,4 +87,19 @@ public final class ApplicationMasterServiceUtils {
}
allocateResponse.setAllocatedContainers(allocatedContainers);
}
+
+ /**
+ * Add rejected Scheduling Requests to {@link AllocateResponse}.
+ * @param allocateResponse Allocate Response.
+ * @param rejectedRequests Rejected SchedulingRequests.
+ */
+ public static void addToRejectedSchedulingRequests(
+ AllocateResponse allocateResponse,
+ List<RejectedSchedulingRequest> rejectedRequests) {
+ if (allocateResponse.getRejectedSchedulingRequests() != null
+ && !allocateResponse.getRejectedSchedulingRequests().isEmpty()) {
+ rejectedRequests.addAll(allocateResponse.getRejectedSchedulingRequests());
+ }
+ allocateResponse.setRejectedSchedulingRequests(rejectedRequests);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/06eb63e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
index 655c6dc..52c30e2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.api.protocolrecords;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -35,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.UpdateContainerError;
@@ -410,6 +412,27 @@ public abstract class AllocateResponse {
public abstract void setContainersFromPreviousAttempts(
List<Container> containersFromPreviousAttempt);
+ /**
+ * Get a list of all SchedulingRequests that the RM has rejected between
+ * this allocate call and the previous one.
+ * @return List of RejectedSchedulingRequests.
+ */
+ @Public
+ @Unstable
+ public List<RejectedSchedulingRequest> getRejectedSchedulingRequests() {
+ return Collections.EMPTY_LIST;
+ }
+
+ /**
+ * Set the list of rejected SchedulingRequests on the AllocateResponse.
+ * @param rejectedRequests List of Rejected Scheduling Requests.
+ */
+ @Private
+ @Unstable
+ public void setRejectedSchedulingRequests(
+ List<RejectedSchedulingRequest> rejectedRequests) {
+ }
+
@Private
@Unstable
public static AllocateResponseBuilder newBuilder() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/06eb63e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/RejectedSchedulingRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/RejectedSchedulingRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/RejectedSchedulingRequest.java
new file mode 100644
index 0000000..6e2d95b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/RejectedSchedulingRequest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * This encapsulates a Rejected SchedulingRequest. It contains the offending
+ * Scheduling Request along with the reason for rejection.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public abstract class RejectedSchedulingRequest {
+
+ /**
+ * Create new RejectedSchedulingRequest.
+ * @param reason Rejection Reason.
+ * @param request Rejected Scheduling Request.
+ * @return RejectedSchedulingRequest.
+ */
+ public static RejectedSchedulingRequest newInstance(RejectionReason reason,
+ SchedulingRequest request) {
+ RejectedSchedulingRequest instance =
+ Records.newRecord(RejectedSchedulingRequest.class);
+ instance.setReason(reason);
+ instance.setRequest(request);
+ return instance;
+ }
+
+ /**
+ * Get Rejection Reason.
+ * @return Rejection reason.
+ */
+ public abstract RejectionReason getReason();
+
+ /**
+ * Set Rejection Reason.
+ * @param reason Rejection Reason.
+ */
+ public abstract void setReason(RejectionReason reason);
+
+ /**
+ * Get the Rejected Scheduling Request.
+ * @return SchedulingRequest.
+ */
+ public abstract SchedulingRequest getRequest();
+
+ /**
+ * Set the SchedulingRequest.
+ * @param request SchedulingRequest.
+ */
+ public abstract void setRequest(SchedulingRequest request);
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/06eb63e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/RejectionReason.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/RejectionReason.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/RejectionReason.java
new file mode 100644
index 0000000..afbc2ed
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/RejectionReason.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Reason for rejecting a Scheduling Request.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public enum RejectionReason {
+ /**
+ * This is used to indicate a possible constraint violation. For example,
+ * the App requested anti-affinity across 5 container requests, but only 4
+ * nodes exist. Another example: tag A has affinity with tag B and tag B has
+ * affinity with tag C, but tag A has anti-affinity with tag C, all at rack
+ * scope, and only 1 rack exists. Essentially, this covers all situations
+ * where the Algorithm cannot assign a Node to a SchedulingRequest.
+ */
+ COULD_NOT_PLACE_ON_NODE,
+ /**
+ * This is used to indicate that the Algorithm placed a Scheduling Request
+ * on a node, but the commit failed, for example because the Queue has no
+ * capacity. This can be a transient situation.
+ */
+ COULD_NOT_SCHEDULE_ON_NODE
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/06eb63e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index fdc39a7..5cb1177 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -424,6 +424,16 @@ enum AMCommandProto {
AM_SHUTDOWN = 2;
}
+enum RejectionReasonProto {
+ RRP_COULD_NOT_PLACE_ON_NODE = 1;
+ RRP_COULD_NOT_SCHEDULE_ON_NODE = 2;
+}
+
+message RejectedSchedulingRequestProto {
+ required RejectionReasonProto reason = 1;
+ required SchedulingRequestProto request = 2;
+}
+
message PreemptionMessageProto {
optional StrictPreemptionContractProto strictContract = 1;
optional PreemptionContractProto contract = 2;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/06eb63e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index e49c4e3..92a65ad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -120,6 +120,7 @@ message AllocateResponseProto {
repeated UpdateContainerErrorProto update_errors = 15;
repeated UpdatedContainerProto updated_containers = 16;
repeated ContainerProto containers_from_previous_attempts = 17;
+ repeated RejectedSchedulingRequestProto rejected_scheduling_requests = 18;
}
enum SchedulerResourceTypes {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/06eb63e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
index 5ca1e73..3ab5563 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.UpdateContainerError;
@@ -47,9 +48,11 @@ import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.PreemptionMessagePBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
+import org.apache.hadoop.yarn.api.records.impl.pb.RejectedSchedulingRequestPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.UpdatedContainerPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.hadoop.yarn.proto.YarnProtos.CollectorInfoProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
@@ -81,6 +84,7 @@ public class AllocateResponsePBImpl extends AllocateResponse {
private List<NodeReport> updatedNodes = null;
private List<UpdateContainerError> updateErrors = null;
+ private List<RejectedSchedulingRequest> rejectedRequests = null;
private PreemptionMessage preempt;
private Token amrmToken = null;
private Priority appPriority = null;
@@ -140,6 +144,13 @@ public class AllocateResponsePBImpl extends AllocateResponse {
getContainerStatusProtoIterable(this.completedContainersStatuses);
builder.addAllCompletedContainerStatuses(iterable);
}
+ if (this.rejectedRequests != null) {
+ builder.clearRejectedSchedulingRequests();
+ Iterable<YarnProtos.RejectedSchedulingRequestProto> iterable =
+ getRejectedSchedulingRequestsProtoIterable(
+ this.rejectedRequests);
+ builder.addAllRejectedSchedulingRequests(iterable);
+ }
if (this.updatedNodes != null) {
builder.clearUpdatedNodes();
Iterable<NodeReportProto> iterable =
@@ -471,6 +482,24 @@ public class AllocateResponsePBImpl extends AllocateResponse {
containersFromPreviousAttempts.addAll(containers);
}
+ @Override
+ public synchronized List<RejectedSchedulingRequest>
+ getRejectedSchedulingRequests() {
+ initRejectedRequestsList();
+ return this.rejectedRequests;
+ }
+
+ @Override
+ public synchronized void setRejectedSchedulingRequests(
+ List<RejectedSchedulingRequest> rejectedReqs) {
+ if (rejectedReqs == null) {
+ return;
+ }
+ initRejectedRequestsList();
+ this.rejectedRequests.clear();
+ this.rejectedRequests.addAll(rejectedReqs);
+ }
+
private synchronized void initLocalUpdatedContainerList() {
if (this.updatedContainers != null) {
return;
@@ -528,6 +557,20 @@ public class AllocateResponsePBImpl extends AllocateResponse {
}
}
+ private synchronized void initRejectedRequestsList() {
+ if (this.rejectedRequests != null) {
+ return;
+ }
+ AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
+ List<YarnProtos.RejectedSchedulingRequestProto> list =
+ p.getRejectedSchedulingRequestsList();
+ rejectedRequests = new ArrayList<>();
+
+ for (YarnProtos.RejectedSchedulingRequestProto c : list) {
+ rejectedRequests.add(convertFromProtoFormat(c));
+ }
+ }
+
private synchronized void initLocalNewNMTokenList() {
if (nmTokens != null) {
return;
@@ -712,6 +755,38 @@ public class AllocateResponsePBImpl extends AllocateResponse {
}
};
}
+
+ private synchronized Iterable<YarnProtos.RejectedSchedulingRequestProto>
+ getRejectedSchedulingRequestsProtoIterable(
+ final List<RejectedSchedulingRequest> rejectedReqsList) {
+ maybeInitBuilder();
+ return new Iterable<YarnProtos.RejectedSchedulingRequestProto>() {
+ @Override
+ public Iterator<YarnProtos.RejectedSchedulingRequestProto> iterator() {
+ return new Iterator<YarnProtos.RejectedSchedulingRequestProto>() {
+
+ private Iterator<RejectedSchedulingRequest> iter =
+ rejectedReqsList.iterator();
+
+ @Override
+ public synchronized boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public synchronized YarnProtos.RejectedSchedulingRequestProto next() {
+ return convertToProtoFormat(iter.next());
+ }
+
+ @Override
+ public synchronized void remove() {
+ throw new UnsupportedOperationException();
+
+ }
+ };
+ }
+ };
+ }
private synchronized Iterable<NodeReportProto>
getNodeReportProtoIterable(
@@ -808,6 +883,16 @@ public class AllocateResponsePBImpl extends AllocateResponse {
return ((ContainerStatusPBImpl)t).getProto();
}
+ private synchronized RejectedSchedulingRequestPBImpl convertFromProtoFormat(
+ YarnProtos.RejectedSchedulingRequestProto p) {
+ return new RejectedSchedulingRequestPBImpl(p);
+ }
+
+ private synchronized YarnProtos.RejectedSchedulingRequestProto
+ convertToProtoFormat(RejectedSchedulingRequest t) {
+ return ((RejectedSchedulingRequestPBImpl)t).getProto();
+ }
+
private synchronized ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
return new ResourcePBImpl(p);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/06eb63e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
index 168d864..76e86ad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.NodeUpdateType;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueState;
+import org.apache.hadoop.yarn.api.records.RejectionReason;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
@@ -233,6 +234,21 @@ public class ProtoUtils {
}
/*
+ * RejectionReason
+ */
+ private static final String REJECTION_REASON_PREFIX = "RRP_";
+ public static YarnProtos.RejectionReasonProto convertToProtoFormat(
+ RejectionReason e) {
+ return YarnProtos.RejectionReasonProto
+ .valueOf(REJECTION_REASON_PREFIX + e.name());
+ }
+ public static RejectionReason convertFromProtoFormat(
+ YarnProtos.RejectionReasonProto e) {
+ return RejectionReason.valueOf(e.name()
+ .replace(REJECTION_REASON_PREFIX, ""));
+ }
+
+ /*
* ByteBuffer
*/
public static ByteBuffer convertFromProtoFormat(ByteString byteString) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/06eb63e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/RejectedSchedulingRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/RejectedSchedulingRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/RejectedSchedulingRequestPBImpl.java
new file mode 100644
index 0000000..ed78551
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/RejectedSchedulingRequestPBImpl.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.records.impl.pb;
+
+import com.google.protobuf.TextFormat;
+import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
+import org.apache.hadoop.yarn.api.records.RejectionReason;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+
+/**
+ * Implementation of RejectedSchedulingRequest.
+ */
+public class RejectedSchedulingRequestPBImpl extends RejectedSchedulingRequest {
+
+ private YarnProtos.RejectedSchedulingRequestProto proto =
+ YarnProtos.RejectedSchedulingRequestProto.getDefaultInstance();
+ private YarnProtos.RejectedSchedulingRequestProto.Builder builder = null;
+ private boolean viaProto = false;
+ private SchedulingRequest request;
+
+ public RejectedSchedulingRequestPBImpl() {
+ builder = YarnProtos.RejectedSchedulingRequestProto.newBuilder();
+ }
+
+ public RejectedSchedulingRequestPBImpl(
+ YarnProtos.RejectedSchedulingRequestProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public synchronized YarnProtos.RejectedSchedulingRequestProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null) {
+ return false;
+ }
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return TextFormat.shortDebugString(getProto());
+ }
+
+ private synchronized void mergeLocalToProto() {
+ if (viaProto) {
+ maybeInitBuilder();
+ }
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private synchronized void mergeLocalToBuilder() {
+ if (this.request != null) {
+ builder.setRequest(convertToProtoFormat(this.request));
+ }
+ }
+ private synchronized void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = YarnProtos.RejectedSchedulingRequestProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ @Override
+ public synchronized RejectionReason getReason() {
+ YarnProtos.RejectedSchedulingRequestProtoOrBuilder p =
+ viaProto ? proto : builder;
+ if (!p.hasReason()) {
+ return null;
+ }
+ return ProtoUtils.convertFromProtoFormat(p.getReason());
+ }
+
+ @Override
+ public synchronized void setReason(RejectionReason reason) {
+ maybeInitBuilder();
+ if (reason == null) {
+ builder.clearReason();
+ return;
+ }
+ builder.setReason(ProtoUtils.convertToProtoFormat(reason));
+ }
+
+ @Override
+ public synchronized SchedulingRequest getRequest() {
+ YarnProtos.RejectedSchedulingRequestProtoOrBuilder p =
+ viaProto ? proto : builder;
+ if (this.request != null) {
+ return this.request;
+ }
+ if (!p.hasRequest()) {
+ return null;
+ }
+ this.request = convertFromProtoFormat(p.getRequest());
+ return this.request;
+ }
+
+ @Override
+ public synchronized void setRequest(SchedulingRequest req) {
+ maybeInitBuilder();
+ if (null == req) {
+ builder.clearRequest();
+ }
+ this.request = req;
+ }
+
+ private synchronized YarnProtos.SchedulingRequestProto convertToProtoFormat(
+ SchedulingRequest r) {
+ return ((SchedulingRequestPBImpl)r).getProto();
+ }
+
+ private synchronized SchedulingRequestPBImpl convertFromProtoFormat(
+ YarnProtos.SchedulingRequestProto p) {
+ return new SchedulingRequestPBImpl(p);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/06eb63e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceSizingPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceSizingPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceSizingPBImpl.java
index f98e488..4054837 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceSizingPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceSizingPBImpl.java
@@ -114,4 +114,12 @@ public class ResourceSizingPBImpl extends ResourceSizing {
private ResourceProto convertToProtoFormat(Resource r) {
return ProtoUtils.convertToProtoFormat(r);
}
+
+ @Override
+ public String toString() {
+ return "ResourceSizingPBImpl{" +
+ "numAllocations=" + getNumAllocations() +
+ ", resources=" + getResources() +
+ '}';
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/06eb63e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/SchedulingRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/SchedulingRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/SchedulingRequestPBImpl.java
index 305856a..1f86043 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/SchedulingRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/SchedulingRequestPBImpl.java
@@ -279,4 +279,15 @@ public class SchedulingRequestPBImpl extends SchedulingRequest {
}
return false;
}
+
+ @Override
+ public String toString() {
+ return "SchedulingRequestPBImpl{" +
+ "priority=" + getPriority() +
+ ", allocationReqId=" + getAllocationRequestId() +
+ ", executionType=" + getExecutionType() +
+ ", allocationTags=" + getAllocationTags() +
+ ", resourceSizing=" + getResourceSizing() +
+ '}';
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/06eb63e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
index a0b907d..ae80910 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
@@ -138,6 +138,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueStatistics;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
import org.apache.hadoop.yarn.api.records.ReservationAllocationState;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
@@ -436,6 +437,7 @@ public class TestPBImplRecords extends BasePBImplRecordsTest {
generateByNewInstance(ResourceTypeInfo.class);
generateByNewInstance(ResourceSizing.class);
generateByNewInstance(SchedulingRequest.class);
+ generateByNewInstance(RejectedSchedulingRequest.class);
}
@Test
http://git-wip-us.apache.org/repos/asf/hadoop/blob/06eb63e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
index 6ee3a4c..4d0c230 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
-import org.apache.hadoop.yarn.server.resourcemanager.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
@@ -43,6 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetime
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/06eb63e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index 62899d9..00da108 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWri
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
-import org.apache.hadoop.yarn.server.resourcemanager.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
@@ -44,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAlloca
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/06eb63e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index 315fdc1..da50ef8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWri
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
-import org.apache.hadoop.yarn.server.resourcemanager.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
@@ -50,6 +49,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAlloca
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/06eb63e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index da0feda..a1d3dfc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -73,7 +73,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV2Pu
import org.apache.hadoop.yarn.server.resourcemanager.metrics.CombinedSystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
@@ -97,6 +96,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/06eb63e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsManager.java
deleted file mode 100644
index b67fab9..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsManager.java
+++ /dev/null
@@ -1,431 +0,0 @@
-/*
- * *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * /
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.constraint;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.SchedulingRequest;
-import org.apache.log4j.Logger;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.LongBinaryOperator;
-
-/**
- * Support storing maps between container-tags/applications and
- * nodes. This will be required by affinity/anti-affinity implementation and
- * cardinality.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class AllocationTagsManager {
-
- private static final Logger LOG = Logger.getLogger(
- AllocationTagsManager.class);
-
- private ReentrantReadWriteLock.ReadLock readLock;
- private ReentrantReadWriteLock.WriteLock writeLock;
-
- // Application's tags to node
- private Map<ApplicationId, NodeToCountedTags> perAppMappings =
- new HashMap<>();
-
- // Global tags to node mapping (used to fast return aggregated tags
- // cardinality across apps)
- private NodeToCountedTags globalMapping = new NodeToCountedTags();
-
- /**
- * Store node to counted tags.
- */
- @VisibleForTesting
- static class NodeToCountedTags {
- // Map<NodeId, Map<Tag, Count>>
- private Map<NodeId, Map<String, Long>> nodeToTagsWithCount =
- new HashMap<>();
-
- // protected by external locks
- private void addTagsToNode(NodeId nodeId, Set<String> tags) {
- Map<String, Long> innerMap = nodeToTagsWithCount.computeIfAbsent(nodeId,
- k -> new HashMap<>());
-
- for (String tag : tags) {
- Long count = innerMap.get(tag);
- if (count == null) {
- innerMap.put(tag, 1L);
- } else{
- innerMap.put(tag, count + 1);
- }
- }
- }
-
- // protected by external locks
- private void addTagToNode(NodeId nodeId, String tag) {
- Map<String, Long> innerMap = nodeToTagsWithCount.computeIfAbsent(nodeId,
- k -> new HashMap<>());
-
- Long count = innerMap.get(tag);
- if (count == null) {
- innerMap.put(tag, 1L);
- } else{
- innerMap.put(tag, count + 1);
- }
- }
-
- private void removeTagFromInnerMap(Map<String, Long> innerMap, String tag) {
- Long count = innerMap.get(tag);
- if (count > 1) {
- innerMap.put(tag, count - 1);
- } else {
- if (count <= 0) {
- LOG.warn(
- "Trying to remove tags from node, however the count already"
- + " becomes 0 or less, it could be a potential bug.");
- }
- innerMap.remove(tag);
- }
- }
-
- private void removeTagsFromNode(NodeId nodeId, Set<String> tags) {
- Map<String, Long> innerMap = nodeToTagsWithCount.get(nodeId);
- if (innerMap == null) {
- LOG.warn("Failed to find node=" + nodeId
- + " while trying to remove tags, please double check.");
- return;
- }
-
- for (String tag : tags) {
- removeTagFromInnerMap(innerMap, tag);
- }
-
- if (innerMap.isEmpty()) {
- nodeToTagsWithCount.remove(nodeId);
- }
- }
-
- private void removeTagFromNode(NodeId nodeId, String tag) {
- Map<String, Long> innerMap = nodeToTagsWithCount.get(nodeId);
- if (innerMap == null) {
- LOG.warn("Failed to find node=" + nodeId
- + " while trying to remove tags, please double check.");
- return;
- }
-
- removeTagFromInnerMap(innerMap, tag);
-
- if (innerMap.isEmpty()) {
- nodeToTagsWithCount.remove(nodeId);
- }
- }
-
- private long getCardinality(NodeId nodeId, String tag) {
- Map<String, Long> innerMap = nodeToTagsWithCount.get(nodeId);
- if (innerMap == null) {
- return 0;
- }
- Long value = innerMap.get(tag);
- return value == null ? 0 : value;
- }
-
- private long getCardinality(NodeId nodeId, Set<String> tags,
- LongBinaryOperator op) {
- Map<String, Long> innerMap = nodeToTagsWithCount.get(nodeId);
- if (innerMap == null) {
- return 0;
- }
-
- long returnValue = 0;
- boolean firstTag = true;
-
- if (tags != null && !tags.isEmpty()) {
- for (String tag : tags) {
- Long value = innerMap.get(tag);
- if (value == null) {
- value = 0L;
- }
-
- if (firstTag) {
- returnValue = value;
- firstTag = false;
- continue;
- }
-
- returnValue = op.applyAsLong(returnValue, value);
- }
- } else {
- // Similar to above if, but only iterate values for better performance
- for (long value : innerMap.values()) {
- // For the first value, we will not apply op
- if (firstTag) {
- returnValue = value;
- firstTag = false;
- continue;
- }
- returnValue = op.applyAsLong(returnValue, value);
- }
- }
- return returnValue;
- }
-
- private boolean isEmpty() {
- return nodeToTagsWithCount.isEmpty();
- }
-
- @VisibleForTesting
- public Map<NodeId, Map<String, Long>> getNodeToTagsWithCount() {
- return nodeToTagsWithCount;
- }
- }
-
- @VisibleForTesting
- Map<ApplicationId, NodeToCountedTags> getPerAppMappings() {
- return perAppMappings;
- }
-
- @VisibleForTesting
- NodeToCountedTags getGlobalMapping() {
- return globalMapping;
- }
-
- public AllocationTagsManager() {
- ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- readLock = lock.readLock();
- writeLock = lock.writeLock();
- }
-
- /**
- * Notify container allocated on a node.
- *
- * @param nodeId allocated node.
- * @param applicationId applicationId
- * @param containerId container id.
- * @param allocationTags allocation tags, see
- * {@link SchedulingRequest#getAllocationTags()}
- * application_id will be added to allocationTags.
- */
- public void addContainer(NodeId nodeId, ApplicationId applicationId,
- ContainerId containerId, Set<String> allocationTags) {
- String applicationIdTag =
- AllocationTagsNamespaces.APP_ID + applicationId.toString();
-
- boolean useSet = false;
- if (allocationTags != null && !allocationTags.isEmpty()) {
- // Copy before edit it.
- allocationTags = new HashSet<>(allocationTags);
- allocationTags.add(applicationIdTag);
- useSet = true;
- }
-
- writeLock.lock();
- try {
- NodeToCountedTags perAppTagsMapping = perAppMappings.computeIfAbsent(
- applicationId, k -> new NodeToCountedTags());
-
- if (useSet) {
- perAppTagsMapping.addTagsToNode(nodeId, allocationTags);
- globalMapping.addTagsToNode(nodeId, allocationTags);
- } else {
- perAppTagsMapping.addTagToNode(nodeId, applicationIdTag);
- globalMapping.addTagToNode(nodeId, applicationIdTag);
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Added container=" + containerId + " with tags=[" + StringUtils
- .join(allocationTags, ",") + "]");
- }
- } finally {
- writeLock.unlock();
- }
- }
-
- /**
- * Notify container removed.
- *
- * @param nodeId nodeId
- * @param applicationId applicationId
- * @param containerId containerId.
- * @param allocationTags allocation tags for given container
- */
- public void removeContainer(NodeId nodeId, ApplicationId applicationId,
- ContainerId containerId, Set<String> allocationTags) {
- String applicationIdTag =
- AllocationTagsNamespaces.APP_ID + applicationId.toString();
- boolean useSet = false;
-
- if (allocationTags != null && !allocationTags.isEmpty()) {
- // Copy before edit it.
- allocationTags = new HashSet<>(allocationTags);
- allocationTags.add(applicationIdTag);
- useSet = true;
- }
-
- writeLock.lock();
- try {
- NodeToCountedTags perAppTagsMapping = perAppMappings.get(applicationId);
- if (perAppTagsMapping == null) {
- return;
- }
-
- if (useSet) {
- perAppTagsMapping.removeTagsFromNode(nodeId, allocationTags);
- globalMapping.removeTagsFromNode(nodeId, allocationTags);
- } else {
- perAppTagsMapping.removeTagFromNode(nodeId, applicationIdTag);
- globalMapping.removeTagFromNode(nodeId, applicationIdTag);
- }
-
- if (perAppTagsMapping.isEmpty()) {
- perAppMappings.remove(applicationId);
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Removed container=" + containerId + " with tags=[" + StringUtils
- .join(allocationTags, ",") + "]");
- }
- } finally {
- writeLock.unlock();
- }
- }
-
- /**
- * Get cardinality for following conditions. External can pass-in a binary op
- * to implement customized logic. *
- * @param nodeId nodeId, required.
- * @param applicationId applicationId. When null is specified, return
- * aggregated cardinality among all nodes.
- * @param tag allocation tag, see
- * {@link SchedulingRequest#getAllocationTags()},
- * When multiple tags specified. Returns cardinality
- * depends on op. If a specified tag doesn't exist,
- * 0 will be its cardinality.
- * When null/empty tags specified, all tags
- * (of the node/app) will be considered.
- * @return cardinality of specified query on the node.
- * @throws InvalidAllocationTagsQueryException when illegal query
- * parameter specified
- */
- public long getNodeCardinality(NodeId nodeId, ApplicationId applicationId,
- String tag) throws InvalidAllocationTagsQueryException {
- readLock.lock();
-
- try {
- if (nodeId == null) {
- throw new InvalidAllocationTagsQueryException(
- "Must specify nodeId/tags/op to query cardinality");
- }
-
- NodeToCountedTags mapping;
- if (applicationId != null) {
- mapping = perAppMappings.get(applicationId);
- } else{
- mapping = globalMapping;
- }
-
- if (mapping == null) {
- return 0;
- }
-
- return mapping.getCardinality(nodeId, tag);
- } finally {
- readLock.unlock();
- }
- }
-
- /**
- * Check if given tag exists on node.
- *
- * @param nodeId nodeId, required.
- * @param applicationId applicationId. When null is specified, return
- * aggregated cardinality among all nodes.
- * @param tag allocation tag, see
- * {@link SchedulingRequest#getAllocationTags()},
- * When multiple tags specified. Returns cardinality
- * depends on op. If a specified tag doesn't exist,
- * 0 will be its cardinality.
- * When null/empty tags specified, all tags
- * (of the node/app) will be considered.
- * @return cardinality of specified query on the node.
- * @throws InvalidAllocationTagsQueryException when illegal query
- * parameter specified
- */
- public boolean allocationTagExistsOnNode(NodeId nodeId,
- ApplicationId applicationId, String tag)
- throws InvalidAllocationTagsQueryException {
- return getNodeCardinality(nodeId, applicationId, tag) > 0;
- }
-
- /**
- * Get cardinality for following conditions. External can pass-in a binary op
- * to implement customized logic.
- *
- * @param nodeId nodeId, required.
- * @param applicationId applicationId. When null is specified, return
- * aggregated cardinality among all nodes.
- * @param tags allocation tags, see
- * {@link SchedulingRequest#getAllocationTags()},
- * When multiple tags specified. Returns cardinality
- * depends on op. If a specified tag doesn't exist, 0
- * will be its cardinality. When null/empty tags
- * specified, all tags (of the node/app) will be
- * considered.
- * @param op operator. Such as Long::max, Long::sum, etc. Required.
- * This sparameter only take effect when #values >= 2.
- * @return cardinality of specified query on the node.
- * @throws InvalidAllocationTagsQueryException when illegal query
- * parameter specified
- */
- public long getNodeCardinalityByOp(NodeId nodeId, ApplicationId applicationId,
- Set<String> tags, LongBinaryOperator op)
- throws InvalidAllocationTagsQueryException {
- readLock.lock();
-
- try {
- if (nodeId == null || op == null) {
- throw new InvalidAllocationTagsQueryException(
- "Must specify nodeId/tags/op to query cardinality");
- }
-
- NodeToCountedTags mapping;
- if (applicationId != null) {
- mapping = perAppMappings.get(applicationId);
- } else{
- mapping = globalMapping;
- }
-
- if (mapping == null) {
- return 0;
- }
-
- return mapping.getCardinality(nodeId, tags, op);
- } finally {
- readLock.unlock();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/06eb63e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsNamespaces.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsNamespaces.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsNamespaces.java
deleted file mode 100644
index 893ff1c..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsNamespaces.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * /
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.constraint;
-
-/**
- * Predefined namespaces for tags
- *
- * Same as namespace of resource types. Namespaces of placement tags are start
- * with alphabets and ended with "/"
- */
-public class AllocationTagsNamespaces {
- public static final String APP_ID = "yarn_app_id/";
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/06eb63e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/InvalidAllocationTagsQueryException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/InvalidAllocationTagsQueryException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/InvalidAllocationTagsQueryException.java
deleted file mode 100644
index 5519e39..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/InvalidAllocationTagsQueryException.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * /
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.constraint;
-
-import org.apache.hadoop.yarn.exceptions.YarnException;
-
-/**
- * Exception when invalid parameter specified to do placement tags related
- * queries.
- */
-public class InvalidAllocationTagsQueryException extends YarnException {
- private static final long serialVersionUID = 12312831974894L;
-
- public InvalidAllocationTagsQueryException(String msg) {
- super(msg);
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/06eb63e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
new file mode 100644
index 0000000..c278606
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
@@ -0,0 +1,431 @@
+/*
+ * *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.LongBinaryOperator;
+
+/**
+ * Support storing maps between container-tags/applications and
+ * nodes. This will be required by affinity/anti-affinity implementation and
+ * cardinality.
+ *
+ * Tag counts are kept per application and, aggregated, across all
+ * applications. Updates take the write lock and queries take the read lock
+ * of one shared {@link ReentrantReadWriteLock}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class AllocationTagsManager {
+
+ private static final Logger LOG = Logger.getLogger(
+ AllocationTagsManager.class);
+
+ private final ReentrantReadWriteLock.ReadLock readLock;
+ private final ReentrantReadWriteLock.WriteLock writeLock;
+
+ // Application's tags to node
+ private final Map<ApplicationId, NodeToCountedTags> perAppMappings =
+ new HashMap<>();
+
+ // Global tags to node mapping (used to fast return aggregated tags
+ // cardinality across apps)
+ private final NodeToCountedTags globalMapping = new NodeToCountedTags();
+
+ /**
+ * Store node to counted tags.
+ *
+ * This class does no locking of its own; the public methods of the
+ * enclosing manager call into it while holding the manager's lock.
+ */
+ @VisibleForTesting
+ static class NodeToCountedTags {
+ // Map<NodeId, Map<Tag, Count>>
+ private final Map<NodeId, Map<String, Long>> nodeToTagsWithCount =
+ new HashMap<>();
+
+ // protected by external locks
+ private void addTagsToNode(NodeId nodeId, Set<String> tags) {
+ Map<String, Long> innerMap = nodeToTagsWithCount.computeIfAbsent(nodeId,
+ k -> new HashMap<>());
+
+ for (String tag : tags) {
+ // Start the count at 1 or add 1 to the existing count.
+ innerMap.merge(tag, 1L, Long::sum);
+ }
+ }
+
+ // protected by external locks
+ private void addTagToNode(NodeId nodeId, String tag) {
+ Map<String, Long> innerMap = nodeToTagsWithCount.computeIfAbsent(nodeId,
+ k -> new HashMap<>());
+
+ // Start the count at 1 or add 1 to the existing count.
+ innerMap.merge(tag, 1L, Long::sum);
+ }
+
+ // protected by external locks
+ private void removeTagFromInnerMap(Map<String, Long> innerMap, String tag) {
+ Long count = innerMap.get(tag);
+ if (count == null) {
+ // Unboxing a missing count below would throw a NullPointerException,
+ // so report the unknown tag and leave the map untouched.
+ LOG.warn("Trying to remove tag=" + tag + " from node, however it is"
+ + " not present, it could be a potential bug.");
+ return;
+ }
+
+ if (count > 1) {
+ innerMap.put(tag, count - 1);
+ } else {
+ if (count <= 0) {
+ LOG.warn(
+ "Trying to remove tags from node, however the count already"
+ + " becomes 0 or less, it could be a potential bug.");
+ }
+ innerMap.remove(tag);
+ }
+ }
+
+ private void removeTagsFromNode(NodeId nodeId, Set<String> tags) {
+ Map<String, Long> innerMap = nodeToTagsWithCount.get(nodeId);
+ if (innerMap == null) {
+ LOG.warn("Failed to find node=" + nodeId
+ + " while trying to remove tags, please double check.");
+ return;
+ }
+
+ for (String tag : tags) {
+ removeTagFromInnerMap(innerMap, tag);
+ }
+
+ if (innerMap.isEmpty()) {
+ nodeToTagsWithCount.remove(nodeId);
+ }
+ }
+
+ private void removeTagFromNode(NodeId nodeId, String tag) {
+ Map<String, Long> innerMap = nodeToTagsWithCount.get(nodeId);
+ if (innerMap == null) {
+ LOG.warn("Failed to find node=" + nodeId
+ + " while trying to remove tags, please double check.");
+ return;
+ }
+
+ removeTagFromInnerMap(innerMap, tag);
+
+ if (innerMap.isEmpty()) {
+ nodeToTagsWithCount.remove(nodeId);
+ }
+ }
+
+ private long getCardinality(NodeId nodeId, String tag) {
+ Map<String, Long> innerMap = nodeToTagsWithCount.get(nodeId);
+ if (innerMap == null) {
+ return 0;
+ }
+ Long value = innerMap.get(tag);
+ return value == null ? 0 : value;
+ }
+
+ private long getCardinality(NodeId nodeId, Set<String> tags,
+ LongBinaryOperator op) {
+ Map<String, Long> innerMap = nodeToTagsWithCount.get(nodeId);
+ if (innerMap == null) {
+ return 0;
+ }
+
+ long returnValue = 0;
+ boolean firstTag = true;
+
+ if (tags != null && !tags.isEmpty()) {
+ for (String tag : tags) {
+ Long value = innerMap.get(tag);
+ if (value == null) {
+ value = 0L;
+ }
+
+ if (firstTag) {
+ returnValue = value;
+ firstTag = false;
+ continue;
+ }
+
+ returnValue = op.applyAsLong(returnValue, value);
+ }
+ } else {
+ // Similar to above if, but only iterate values for better performance
+ for (long value : innerMap.values()) {
+ // For the first value, we will not apply op
+ if (firstTag) {
+ returnValue = value;
+ firstTag = false;
+ continue;
+ }
+ returnValue = op.applyAsLong(returnValue, value);
+ }
+ }
+ return returnValue;
+ }
+
+ private boolean isEmpty() {
+ return nodeToTagsWithCount.isEmpty();
+ }
+
+ @VisibleForTesting
+ public Map<NodeId, Map<String, Long>> getNodeToTagsWithCount() {
+ return nodeToTagsWithCount;
+ }
+ }
+
+ @VisibleForTesting
+ Map<ApplicationId, NodeToCountedTags> getPerAppMappings() {
+ return perAppMappings;
+ }
+
+ @VisibleForTesting
+ NodeToCountedTags getGlobalMapping() {
+ return globalMapping;
+ }
+
+ public AllocationTagsManager() {
+ ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ readLock = lock.readLock();
+ writeLock = lock.writeLock();
+ }
+
+ /**
+ * Notify container allocated on a node.
+ *
+ * @param nodeId allocated node.
+ * @param applicationId applicationId
+ * @param containerId container id.
+ * @param allocationTags allocation tags, see
+ * {@link SchedulingRequest#getAllocationTags()}
+ * application_id will be added to allocationTags.
+ */
+ public void addContainer(NodeId nodeId, ApplicationId applicationId,
+ ContainerId containerId, Set<String> allocationTags) {
+ String applicationIdTag =
+ AllocationTagsNamespaces.APP_ID + applicationId.toString();
+
+ boolean useSet = false;
+ if (allocationTags != null && !allocationTags.isEmpty()) {
+ // Copy before edit it.
+ allocationTags = new HashSet<>(allocationTags);
+ allocationTags.add(applicationIdTag);
+ useSet = true;
+ }
+
+ writeLock.lock();
+ try {
+ NodeToCountedTags perAppTagsMapping = perAppMappings.computeIfAbsent(
+ applicationId, k -> new NodeToCountedTags());
+
+ if (useSet) {
+ perAppTagsMapping.addTagsToNode(nodeId, allocationTags);
+ globalMapping.addTagsToNode(nodeId, allocationTags);
+ } else {
+ perAppTagsMapping.addTagToNode(nodeId, applicationIdTag);
+ globalMapping.addTagToNode(nodeId, applicationIdTag);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Added container=" + containerId + " with tags=[" + StringUtils
+ .join(allocationTags, ",") + "]");
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ /**
+ * Notify container removed.
+ *
+ * @param nodeId nodeId
+ * @param applicationId applicationId
+ * @param containerId containerId.
+ * @param allocationTags allocation tags for given container
+ */
+ public void removeContainer(NodeId nodeId, ApplicationId applicationId,
+ ContainerId containerId, Set<String> allocationTags) {
+ String applicationIdTag =
+ AllocationTagsNamespaces.APP_ID + applicationId.toString();
+ boolean useSet = false;
+
+ if (allocationTags != null && !allocationTags.isEmpty()) {
+ // Copy before edit it.
+ allocationTags = new HashSet<>(allocationTags);
+ allocationTags.add(applicationIdTag);
+ useSet = true;
+ }
+
+ writeLock.lock();
+ try {
+ NodeToCountedTags perAppTagsMapping = perAppMappings.get(applicationId);
+ if (perAppTagsMapping == null) {
+ // No tags are recorded for this application.
+ return;
+ }
+
+ if (useSet) {
+ perAppTagsMapping.removeTagsFromNode(nodeId, allocationTags);
+ globalMapping.removeTagsFromNode(nodeId, allocationTags);
+ } else {
+ perAppTagsMapping.removeTagFromNode(nodeId, applicationIdTag);
+ globalMapping.removeTagFromNode(nodeId, applicationIdTag);
+ }
+
+ if (perAppTagsMapping.isEmpty()) {
+ perAppMappings.remove(applicationId);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Removed container=" + containerId + " with tags=[" + StringUtils
+ .join(allocationTags, ",") + "]");
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ /**
+ * Get the cardinality of a single allocation tag on a node.
+ *
+ * @param nodeId nodeId, required.
+ * @param applicationId applicationId. When null is specified, return
+ * cardinality aggregated across all applications.
+ * @param tag allocation tag, see
+ * {@link SchedulingRequest#getAllocationTags()}.
+ * If the specified tag doesn't exist on the node,
+ * 0 will be its cardinality.
+ * @return cardinality of the tag on the node.
+ * @throws InvalidAllocationTagsQueryException when nodeId is null
+ */
+ public long getNodeCardinality(NodeId nodeId, ApplicationId applicationId,
+ String tag) throws InvalidAllocationTagsQueryException {
+ readLock.lock();
+
+ try {
+ if (nodeId == null) {
+ throw new InvalidAllocationTagsQueryException(
+ "Must specify nodeId/tags/op to query cardinality");
+ }
+
+ NodeToCountedTags mapping;
+ if (applicationId != null) {
+ mapping = perAppMappings.get(applicationId);
+ } else {
+ mapping = globalMapping;
+ }
+
+ if (mapping == null) {
+ return 0;
+ }
+
+ return mapping.getCardinality(nodeId, tag);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /**
+ * Check if given tag exists on node.
+ *
+ * @param nodeId nodeId, required.
+ * @param applicationId applicationId. When null is specified, the tag is
+ * looked up across all applications.
+ * @param tag allocation tag, see
+ * {@link SchedulingRequest#getAllocationTags()}.
+ * @return true if the cardinality of the tag on the node is greater
+ * than 0, false otherwise.
+ * @throws InvalidAllocationTagsQueryException when nodeId is null
+ */
+ public boolean allocationTagExistsOnNode(NodeId nodeId,
+ ApplicationId applicationId, String tag)
+ throws InvalidAllocationTagsQueryException {
+ return getNodeCardinality(nodeId, applicationId, tag) > 0;
+ }
+
+ /**
+ * Get the cardinality of a set of allocation tags on a node. Callers pass
+ * in a binary op to combine the per-tag cardinalities.
+ *
+ * @param nodeId nodeId, required.
+ * @param applicationId applicationId. When null is specified, return
+ * cardinality aggregated across all applications.
+ * @param tags allocation tags, see
+ * {@link SchedulingRequest#getAllocationTags()}.
+ * When multiple tags are specified, the cardinality
+ * returned depends on op. If a specified tag doesn't
+ * exist, 0 will be its cardinality. When null/empty
+ * tags are specified, all tags (of the node/app) will
+ * be considered.
+ * @param op operator. Such as Long::max, Long::sum, etc. Required.
+ * It only takes effect when at least two values are combined.
+ * @return cardinality of specified query on the node.
+ * @throws InvalidAllocationTagsQueryException when nodeId or op
+ * is null
+ */
+ public long getNodeCardinalityByOp(NodeId nodeId, ApplicationId applicationId,
+ Set<String> tags, LongBinaryOperator op)
+ throws InvalidAllocationTagsQueryException {
+ readLock.lock();
+
+ try {
+ if (nodeId == null || op == null) {
+ throw new InvalidAllocationTagsQueryException(
+ "Must specify nodeId/tags/op to query cardinality");
+ }
+
+ NodeToCountedTags mapping;
+ if (applicationId != null) {
+ mapping = perAppMappings.get(applicationId);
+ } else {
+ mapping = globalMapping;
+ }
+
+ if (mapping == null) {
+ return 0;
+ }
+
+ return mapping.getCardinality(nodeId, tags, op);
+ } finally {
+ readLock.unlock();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/06eb63e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsNamespaces.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsNamespaces.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsNamespaces.java
new file mode 100644
index 0000000..43fcfe5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsNamespaces.java
@@ -0,0 +1,31 @@
+/*
+ * *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
+
+/**
+ * Predefined namespaces for allocation tags. Same as namespaces of resource
+ * types, a namespace of placement tags starts with letters and ends with "/".
+ * {@link #APP_ID} is prepended to an application id to form a per-app tag
+ * (see AllocationTagsManager#addContainer).
+ */
+public class AllocationTagsNamespaces {
+ public static final String APP_ID = "yarn_app_id/";
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/06eb63e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/InvalidAllocationTagsQueryException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/InvalidAllocationTagsQueryException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/InvalidAllocationTagsQueryException.java
new file mode 100644
index 0000000..29483a2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/InvalidAllocationTagsQueryException.java
@@ -0,0 +1,35 @@
+/*
+ * *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * Exception thrown when an invalid parameter is specified for a query
+ * related to placement (allocation) tags.
+ */
+public class InvalidAllocationTagsQueryException extends YarnException {
+ private static final long serialVersionUID = 12312831974894L;
+
+ public InvalidAllocationTagsQueryException(String msg) {
+ super(msg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/06eb63e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/ConstraintPlacementAlgorithm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/ConstraintPlacementAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/ConstraintPlacementAlgorithm.java
new file mode 100644
index 0000000..2651663
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/ConstraintPlacementAlgorithm.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api;
+
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+
+/**
+ * Interface for a Constraint Placement Algorithm. It is initialized with the
+ * RMContext and computes placements for a given algorithm input.
+ */
+public interface ConstraintPlacementAlgorithm {
+
+ /**
+ * Initialize the Algorithm.
+ * @param rmContext RMContext.
+ */
+ void init(RMContext rmContext);
+
+ /**
+ * The Algorithm is expected to compute the placement of the provided
+ * ConstraintPlacementAlgorithmInput and use the collector to aggregate
+ * any output.
+ * @param algorithmInput Input to the Algorithm.
+ * @param collector Collector for output of algorithm.
+ */
+ void place(ConstraintPlacementAlgorithmInput algorithmInput,
+ ConstraintPlacementAlgorithmOutputCollector collector);
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/06eb63e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/ConstraintPlacementAlgorithmInput.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/ConstraintPlacementAlgorithmInput.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/ConstraintPlacementAlgorithmInput.java
new file mode 100644
index 0000000..74572b8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/ConstraintPlacementAlgorithmInput.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api;
+
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+
+import java.util.Collection;
+
+/**
+ * This encapsulates an input to the Constraint Placement Algorithm. At the
+ * very least it must consist of a collection of {@link SchedulingRequest}s.
+ */
+public interface ConstraintPlacementAlgorithmInput {
+
+ Collection<SchedulingRequest> getSchedulingRequests();
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org