You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/07/10 19:16:41 UTC
[1/6] hadoop git commit: YARN-3800. Reduce storage footprint for
ReservationAllocation. Contributed by Anubhav Dhoot.
Repository: hadoop
Updated Branches:
refs/heads/YARN-1197 f4ca530c1 -> 08244264c
YARN-3800. Reduce storage footprint for ReservationAllocation. Contributed by Anubhav Dhoot.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0e602fa3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0e602fa3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0e602fa3
Branch: refs/heads/YARN-1197
Commit: 0e602fa3a1529134214452fba10a90307d9c2072
Parents: f4ca530
Author: carlo curino <Carlo Curino>
Authored: Thu Jul 9 16:47:35 2015 -0700
Committer: carlo curino <Carlo Curino>
Committed: Thu Jul 9 16:51:59 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 ++
.../reservation/GreedyReservationAgent.java | 27 ++++++-----
.../reservation/InMemoryPlan.java | 9 ++--
.../InMemoryReservationAllocation.java | 24 +++++----
.../RLESparseResourceAllocation.java | 43 ++---------------
.../reservation/ReservationAllocation.java | 3 +-
.../reservation/ReservationSystemUtil.java | 51 ++++++++++++++++++++
.../reservation/ReservationSystemTestUtil.java | 11 +++--
.../reservation/TestCapacityOverTimePolicy.java | 16 +++---
.../reservation/TestGreedyReservationAgent.java | 2 +-
.../reservation/TestInMemoryPlan.java | 37 ++++++++++----
.../TestInMemoryReservationAllocation.java | 29 ++++++-----
.../TestRLESparseResourceAllocation.java | 33 ++++++-------
.../TestSimpleCapacityReplanner.java | 11 +++--
14 files changed, 176 insertions(+), 123 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e602fa3/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 3c232eb..89b5e9f 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -323,6 +323,9 @@ Release 2.8.0 - UNRELEASED
YARN-3827. Migrate YARN native build to new CMake framework (Alan Burlison
via Colin P. McCabe)
+ YARN-3800. Reduce storage footprint for ReservationAllocation. (Anubhav Dhoot
+ via curino)
+
OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e602fa3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java
index 5a61b94..214df1c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java
@@ -97,8 +97,8 @@ public class GreedyReservationAgent implements ReservationAgent {
long curDeadline = deadline;
long oldDeadline = -1;
- Map<ReservationInterval, ReservationRequest> allocations =
- new HashMap<ReservationInterval, ReservationRequest>();
+ Map<ReservationInterval, Resource> allocations =
+ new HashMap<ReservationInterval, Resource>();
RLESparseResourceAllocation tempAssigned =
new RLESparseResourceAllocation(plan.getResourceCalculator(),
plan.getMinimumAllocation());
@@ -108,6 +108,8 @@ public class GreedyReservationAgent implements ReservationAgent {
ReservationRequestInterpreter type = contract.getReservationRequests()
.getInterpreter();
+ boolean hasGang = false;
+
// Iterate the stages in backward from deadline
for (ListIterator<ReservationRequest> li =
stages.listIterator(stages.size()); li.hasPrevious();) {
@@ -117,8 +119,10 @@ public class GreedyReservationAgent implements ReservationAgent {
// validate the RR respect basic constraints
validateInput(plan, currentReservationStage, totalCapacity);
+ hasGang |= currentReservationStage.getConcurrency() > 1;
+
// run allocation for a single stage
- Map<ReservationInterval, ReservationRequest> curAlloc =
+ Map<ReservationInterval, Resource> curAlloc =
placeSingleStage(plan, tempAssigned, currentReservationStage,
earliestStart, curDeadline, oldReservation, totalCapacity);
@@ -178,8 +182,7 @@ public class GreedyReservationAgent implements ReservationAgent {
// create reservation with above allocations if not null/empty
- ReservationRequest ZERO_RES =
- ReservationRequest.newInstance(Resource.newInstance(0, 0), 0);
+ Resource ZERO_RES = Resource.newInstance(0, 0);
long firstStartTime = findEarliestTime(allocations.keySet());
@@ -200,7 +203,7 @@ public class GreedyReservationAgent implements ReservationAgent {
new InMemoryReservationAllocation(reservationId, contract, user,
plan.getQueueName(), firstStartTime,
findLatestTime(allocations.keySet()), allocations,
- plan.getResourceCalculator(), plan.getMinimumAllocation());
+ plan.getResourceCalculator(), plan.getMinimumAllocation(), hasGang);
if (oldReservation != null) {
return plan.updateReservation(capReservation);
} else {
@@ -242,13 +245,13 @@ public class GreedyReservationAgent implements ReservationAgent {
* previous instant in time until the time-window is exhausted or we placed
* all the user request.
*/
- private Map<ReservationInterval, ReservationRequest> placeSingleStage(
+ private Map<ReservationInterval, Resource> placeSingleStage(
Plan plan, RLESparseResourceAllocation tempAssigned,
ReservationRequest rr, long earliestStart, long curDeadline,
ReservationAllocation oldResAllocation, final Resource totalCapacity) {
- Map<ReservationInterval, ReservationRequest> allocationRequests =
- new HashMap<ReservationInterval, ReservationRequest>();
+ Map<ReservationInterval, Resource> allocationRequests =
+ new HashMap<ReservationInterval, Resource>();
// compute the gang as a resource and get the duration
Resource gang = Resources.multiply(rr.getCapability(), rr.getConcurrency());
@@ -322,7 +325,7 @@ public class GreedyReservationAgent implements ReservationAgent {
ReservationInterval reservationInt =
new ReservationInterval(curDeadline - dur, curDeadline);
- ReservationRequest reservationRes =
+ ReservationRequest reservationRequest =
ReservationRequest.newInstance(rr.getCapability(),
rr.getConcurrency() * maxGang, rr.getConcurrency(),
rr.getDuration());
@@ -331,6 +334,8 @@ public class GreedyReservationAgent implements ReservationAgent {
// placing other ReservationRequest within the same
// ReservationDefinition,
// and we must avoid double-counting the available resources
+ final Resource reservationRes = ReservationSystemUtil.toResource(
+ reservationRequest);
tempAssigned.addInterval(reservationInt, reservationRes);
allocationRequests.put(reservationInt, reservationRes);
@@ -350,7 +355,7 @@ public class GreedyReservationAgent implements ReservationAgent {
// If we are here is becasue we did not manage to satisfy this request.
// So we need to remove unwanted side-effect from tempAssigned (needed
// for ANY).
- for (Map.Entry<ReservationInterval, ReservationRequest> tempAllocation :
+ for (Map.Entry<ReservationInterval, Resource> tempAllocation :
allocationRequests.entrySet()) {
tempAssigned.removeInterval(tempAllocation.getKey(),
tempAllocation.getValue());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e602fa3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
index ce2e7d7..50d66cf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
@@ -31,7 +31,6 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.yarn.api.records.ReservationId;
-import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@@ -110,7 +109,7 @@ class InMemoryPlan implements Plan {
private void incrementAllocation(ReservationAllocation reservation) {
assert (readWriteLock.isWriteLockedByCurrentThread());
- Map<ReservationInterval, ReservationRequest> allocationRequests =
+ Map<ReservationInterval, Resource> allocationRequests =
reservation.getAllocationRequests();
// check if we have encountered the user earlier and if not add an entry
String user = reservation.getUser();
@@ -119,7 +118,7 @@ class InMemoryPlan implements Plan {
resAlloc = new RLESparseResourceAllocation(resCalc, minAlloc);
userResourceAlloc.put(user, resAlloc);
}
- for (Map.Entry<ReservationInterval, ReservationRequest> r : allocationRequests
+ for (Map.Entry<ReservationInterval, Resource> r : allocationRequests
.entrySet()) {
resAlloc.addInterval(r.getKey(), r.getValue());
rleSparseVector.addInterval(r.getKey(), r.getValue());
@@ -128,11 +127,11 @@ class InMemoryPlan implements Plan {
private void decrementAllocation(ReservationAllocation reservation) {
assert (readWriteLock.isWriteLockedByCurrentThread());
- Map<ReservationInterval, ReservationRequest> allocationRequests =
+ Map<ReservationInterval, Resource> allocationRequests =
reservation.getAllocationRequests();
String user = reservation.getUser();
RLESparseResourceAllocation resAlloc = userResourceAlloc.get(user);
- for (Map.Entry<ReservationInterval, ReservationRequest> r : allocationRequests
+ for (Map.Entry<ReservationInterval, Resource> r : allocationRequests
.entrySet()) {
resAlloc.removeInterval(r.getKey(), r.getValue());
rleSparseVector.removeInterval(r.getKey(), r.getValue());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e602fa3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
index fc8407b..a4dd23b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
@@ -22,7 +22,6 @@ import java.util.Map;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
-import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -40,7 +39,7 @@ class InMemoryReservationAllocation implements ReservationAllocation {
private final ReservationDefinition contract;
private final long startTime;
private final long endTime;
- private final Map<ReservationInterval, ReservationRequest> allocationRequests;
+ private final Map<ReservationInterval, Resource> allocationRequests;
private boolean hasGang = false;
private long acceptedAt = -1;
@@ -49,22 +48,29 @@ class InMemoryReservationAllocation implements ReservationAllocation {
InMemoryReservationAllocation(ReservationId reservationID,
ReservationDefinition contract, String user, String planName,
long startTime, long endTime,
- Map<ReservationInterval, ReservationRequest> allocationRequests,
+ Map<ReservationInterval, Resource> allocations,
ResourceCalculator calculator, Resource minAlloc) {
+ this(reservationID, contract, user, planName, startTime, endTime,
+ allocations, calculator, minAlloc, false);
+ }
+
+ InMemoryReservationAllocation(ReservationId reservationID,
+ ReservationDefinition contract, String user, String planName,
+ long startTime, long endTime,
+ Map<ReservationInterval, Resource> allocations,
+ ResourceCalculator calculator, Resource minAlloc, boolean hasGang) {
this.contract = contract;
this.startTime = startTime;
this.endTime = endTime;
this.reservationID = reservationID;
this.user = user;
- this.allocationRequests = allocationRequests;
+ this.allocationRequests = allocations;
this.planName = planName;
+ this.hasGang = hasGang;
resourcesOverTime = new RLESparseResourceAllocation(calculator, minAlloc);
- for (Map.Entry<ReservationInterval, ReservationRequest> r : allocationRequests
+ for (Map.Entry<ReservationInterval, Resource> r : allocations
.entrySet()) {
resourcesOverTime.addInterval(r.getKey(), r.getValue());
- if (r.getValue().getConcurrency() > 1) {
- hasGang = true;
- }
}
}
@@ -89,7 +95,7 @@ class InMemoryReservationAllocation implements ReservationAllocation {
}
@Override
- public Map<ReservationInterval, ReservationRequest> getAllocationRequests() {
+ public Map<ReservationInterval, Resource> getAllocationRequests() {
return Collections.unmodifiableMap(allocationRequests);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e602fa3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
index 3f6f405..2957cc6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.io.IOException;
import java.io.StringWriter;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
@@ -31,9 +30,7 @@ import java.util.TreeMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -80,14 +77,11 @@ public class RLESparseResourceAllocation {
*
* @param reservationInterval the interval for which the resource is to be
* added
- * @param capacity the resource to be added
+ * @param totCap the resource to be added
* @return true if addition is successful, false otherwise
*/
public boolean addInterval(ReservationInterval reservationInterval,
- ReservationRequest capacity) {
- Resource totCap =
- Resources.multiply(capacity.getCapability(),
- (float) capacity.getNumContainers());
+ Resource totCap) {
if (totCap.equals(ZERO_RESOURCE)) {
return true;
}
@@ -143,44 +137,15 @@ public class RLESparseResourceAllocation {
}
/**
- * Add multiple resources for the specified interval
- *
- * @param reservationInterval the interval for which the resource is to be
- * added
- * @param ReservationRequests the resources to be added
- * @param clusterResource the total resources in the cluster
- * @return true if addition is successful, false otherwise
- */
- public boolean addCompositeInterval(ReservationInterval reservationInterval,
- List<ReservationRequest> ReservationRequests, Resource clusterResource) {
- ReservationRequest aggregateReservationRequest =
- Records.newRecord(ReservationRequest.class);
- Resource capacity = Resource.newInstance(0, 0);
- for (ReservationRequest ReservationRequest : ReservationRequests) {
- Resources.addTo(capacity, Resources.multiply(
- ReservationRequest.getCapability(),
- ReservationRequest.getNumContainers()));
- }
- aggregateReservationRequest.setNumContainers((int) Math.ceil(Resources
- .divide(resourceCalculator, clusterResource, capacity, minAlloc)));
- aggregateReservationRequest.setCapability(minAlloc);
-
- return addInterval(reservationInterval, aggregateReservationRequest);
- }
-
- /**
* Removes a resource for the specified interval
*
* @param reservationInterval the interval for which the resource is to be
* removed
- * @param capacity the resource to be removed
+ * @param totCap the resource to be removed
* @return true if removal is successful, false otherwise
*/
public boolean removeInterval(ReservationInterval reservationInterval,
- ReservationRequest capacity) {
- Resource totCap =
- Resources.multiply(capacity.getCapability(),
- (float) capacity.getNumContainers());
+ Resource totCap) {
if (totCap.equals(ZERO_RESOURCE)) {
return true;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e602fa3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java
index 89c0e55..0d3c692 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java
@@ -22,7 +22,6 @@ import java.util.Map;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
-import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
/**
@@ -71,7 +70,7 @@ public interface ReservationAllocation extends
* @return the allocationRequests the map of resources requested against the
* time interval for which they were
*/
- public Map<ReservationInterval, ReservationRequest> getAllocationRequests();
+ public Map<ReservationInterval, Resource> getAllocationRequests();
/**
* Return a string identifying the plan to which the reservation belongs
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e602fa3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java
new file mode 100644
index 0000000..8affae4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.util.HashMap;
+import java.util.Map;
+
+final class ReservationSystemUtil {
+
+ private ReservationSystemUtil() {
+ // not called
+ }
+
+ public static Resource toResource(ReservationRequest request) {
+ Resource resource = Resources.multiply(request.getCapability(),
+ (float) request.getNumContainers());
+ return resource;
+ }
+
+ public static Map<ReservationInterval, Resource> toResources(
+ Map<ReservationInterval, ReservationRequest> allocations) {
+ Map<ReservationInterval, Resource> resources =
+ new HashMap<ReservationInterval, Resource>();
+ for (Map.Entry<ReservationInterval, ReservationRequest> entry :
+ allocations.entrySet()) {
+ resources.put(entry.getKey(),
+ toResource(entry.getValue()));
+ }
+ return resources;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e602fa3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
index bfaf06b..be1d69a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
@@ -378,14 +378,15 @@ public class ReservationSystemTestUtil {
return rr;
}
- public static Map<ReservationInterval, ReservationRequest> generateAllocation(
+ public static Map<ReservationInterval, Resource> generateAllocation(
long startTime, long step, int[] alloc) {
- Map<ReservationInterval, ReservationRequest> req =
- new TreeMap<ReservationInterval, ReservationRequest>();
+ Map<ReservationInterval, Resource> req =
+ new TreeMap<ReservationInterval, Resource>();
for (int i = 0; i < alloc.length; i++) {
req.put(new ReservationInterval(startTime + i * step, startTime + (i + 1)
- * step), ReservationRequest.newInstance(
- Resource.newInstance(1024, 1), alloc[i]));
+ * step), ReservationSystemUtil.toResource(ReservationRequest
+ .newInstance(
+ Resource.newInstance(1024, 1), alloc[i])));
}
return req;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e602fa3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
index 61561e9..19f876d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Map;
@@ -198,12 +197,14 @@ public class TestCapacityOverTimePolicy {
@Test(expected = PlanningQuotaException.class)
public void testFailAvg() throws IOException, PlanningException {
// generate an allocation which violates the 25% average single-shot
- Map<ReservationInterval, ReservationRequest> req =
- new TreeMap<ReservationInterval, ReservationRequest>();
+ Map<ReservationInterval, Resource> req =
+ new TreeMap<ReservationInterval, Resource>();
long win = timeWindow / 2 + 100;
int cont = (int) Math.ceil(0.5 * totCont);
req.put(new ReservationInterval(initTime, initTime + win),
- ReservationRequest.newInstance(Resource.newInstance(1024, 1), cont));
+ ReservationSystemUtil.toResource(
+ ReservationRequest.newInstance(Resource.newInstance(1024, 1),
+ cont)));
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
@@ -214,12 +215,13 @@ public class TestCapacityOverTimePolicy {
@Test
public void testFailAvgBySum() throws IOException, PlanningException {
// generate an allocation which violates the 25% average by sum
- Map<ReservationInterval, ReservationRequest> req =
- new TreeMap<ReservationInterval, ReservationRequest>();
+ Map<ReservationInterval, Resource> req =
+ new TreeMap<ReservationInterval, Resource>();
long win = 86400000 / 4 + 1;
int cont = (int) Math.ceil(0.5 * totCont);
req.put(new ReservationInterval(initTime, initTime + win),
- ReservationRequest.newInstance(Resource.newInstance(1024, 1), cont));
+ ReservationSystemUtil.toResource(ReservationRequest.newInstance(Resource
+ .newInstance(1024, 1), cont)));
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e602fa3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java
index b8cf6c5..de94dcd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java
@@ -516,7 +516,7 @@ public class TestGreedyReservationAgent {
.generateAllocation(0, step, f), res, minAlloc)));
int[] f2 = { 5, 5, 5, 5, 5, 5, 5 };
- Map<ReservationInterval, ReservationRequest> alloc =
+ Map<ReservationInterval, Resource> alloc =
ReservationSystemTestUtil.generateAllocation(5000, step, f2);
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e602fa3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
index 91c1962..722fb29 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
@@ -100,9 +100,11 @@ public class TestInMemoryPlan {
ReservationDefinition rDef =
createSimpleReservationDefinition(start, start + alloc.length,
alloc.length, allocations.values());
+ Map<ReservationInterval, Resource> allocs =
+ ReservationSystemUtil.toResources(allocations);
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
- start, start + alloc.length, allocations, resCalc, minAlloc);
+ start, start + alloc.length, allocs, resCalc, minAlloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.addReservation(rAllocation);
@@ -132,9 +134,11 @@ public class TestInMemoryPlan {
ReservationDefinition rDef =
createSimpleReservationDefinition(start, start + alloc.length,
alloc.length, allocations.values());
+ Map<ReservationInterval, Resource> allocs = ReservationSystemUtil.toResources
+ (allocations);
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
- start, start + alloc.length, allocations, resCalc, minAlloc);
+ start, start + alloc.length, allocs, resCalc, minAlloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.addReservation(rAllocation);
@@ -158,9 +162,11 @@ public class TestInMemoryPlan {
ReservationDefinition rDef =
createSimpleReservationDefinition(start, start + alloc.length,
alloc.length, allocations.values());
+ Map<ReservationInterval, Resource> allocs = ReservationSystemUtil.toResources
+ (allocations);
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
- start, start + alloc.length, allocations, resCalc, minAlloc);
+ start, start + alloc.length, allocs, resCalc, minAlloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.addReservation(rAllocation);
@@ -202,9 +208,11 @@ public class TestInMemoryPlan {
ReservationDefinition rDef =
createSimpleReservationDefinition(start, start + alloc.length,
alloc.length, allocations.values());
+ Map<ReservationInterval, Resource> allocs = ReservationSystemUtil.toResources
+ (allocations);
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
- start, start + alloc.length, allocations, resCalc, minAlloc);
+ start, start + alloc.length, allocs, resCalc, minAlloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.addReservation(rAllocation);
@@ -226,9 +234,12 @@ public class TestInMemoryPlan {
rDef =
createSimpleReservationDefinition(start, start + updatedAlloc.length,
updatedAlloc.length, allocations.values());
+ Map<ReservationInterval, Resource> updatedAllocs =
+ ReservationSystemUtil.toResources(allocations);
rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
- start, start + updatedAlloc.length, allocations, resCalc, minAlloc);
+ start, start + updatedAlloc.length, updatedAllocs, resCalc,
+ minAlloc);
try {
plan.updateReservation(rAllocation);
} catch (PlanningException e) {
@@ -260,9 +271,11 @@ public class TestInMemoryPlan {
ReservationDefinition rDef =
createSimpleReservationDefinition(start, start + alloc.length,
alloc.length, allocations.values());
+ Map<ReservationInterval, Resource> allocs =
+ ReservationSystemUtil.toResources(allocations);
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
- start, start + alloc.length, allocations, resCalc, minAlloc);
+ start, start + alloc.length, allocs, resCalc, minAlloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.updateReservation(rAllocation);
@@ -290,9 +303,11 @@ public class TestInMemoryPlan {
ReservationDefinition rDef =
createSimpleReservationDefinition(start, start + alloc.length,
alloc.length, allocations.values());
+ Map<ReservationInterval, Resource> allocs =
+ ReservationSystemUtil.toResources(allocations);
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
- start, start + alloc.length, allocations, resCalc, minAlloc);
+ start, start + alloc.length, allocs, resCalc, minAlloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.addReservation(rAllocation);
@@ -359,9 +374,11 @@ public class TestInMemoryPlan {
ReservationDefinition rDef1 =
createSimpleReservationDefinition(start, start + alloc1.length,
alloc1.length, allocations1.values());
+ Map<ReservationInterval, Resource> allocs1 =
+ ReservationSystemUtil.toResources(allocations1);
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID1, rDef1, user,
- planName, start, start + alloc1.length, allocations1, resCalc,
+ planName, start, start + alloc1.length, allocs1, resCalc,
minAlloc);
Assert.assertNull(plan.getReservationById(reservationID1));
try {
@@ -388,9 +405,11 @@ public class TestInMemoryPlan {
ReservationDefinition rDef2 =
createSimpleReservationDefinition(start, start + alloc2.length,
alloc2.length, allocations2.values());
+ Map<ReservationInterval, Resource> allocs2 =
+ ReservationSystemUtil.toResources(allocations2);
rAllocation =
new InMemoryReservationAllocation(reservationID2, rDef2, user,
- planName, start, start + alloc2.length, allocations2, resCalc,
+ planName, start, start + alloc2.length, allocs2, resCalc,
minAlloc);
Assert.assertNull(plan.getReservationById(reservationID2));
try {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e602fa3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryReservationAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryReservationAllocation.java
index 76f39dc..55224a9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryReservationAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryReservationAllocation.java
@@ -69,7 +69,7 @@ public class TestInMemoryReservationAllocation {
ReservationDefinition rDef =
createSimpleReservationDefinition(start, start + alloc.length + 1,
alloc.length);
- Map<ReservationInterval, ReservationRequest> allocations =
+ Map<ReservationInterval, Resource> allocations =
generateAllocation(start, alloc, false, false);
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
@@ -91,7 +91,7 @@ public class TestInMemoryReservationAllocation {
ReservationDefinition rDef =
createSimpleReservationDefinition(start, start + alloc.length + 1,
alloc.length);
- Map<ReservationInterval, ReservationRequest> allocations =
+ Map<ReservationInterval, Resource> allocations =
generateAllocation(start, alloc, true, false);
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
@@ -114,7 +114,7 @@ public class TestInMemoryReservationAllocation {
ReservationDefinition rDef =
createSimpleReservationDefinition(start, start + alloc.length + 1,
alloc.length);
- Map<ReservationInterval, ReservationRequest> allocations =
+ Map<ReservationInterval, Resource> allocations =
generateAllocation(start, alloc, true, false);
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
@@ -137,8 +137,8 @@ public class TestInMemoryReservationAllocation {
ReservationDefinition rDef =
createSimpleReservationDefinition(start, start + alloc.length + 1,
alloc.length);
- Map<ReservationInterval, ReservationRequest> allocations =
- new HashMap<ReservationInterval, ReservationRequest>();
+ Map<ReservationInterval, Resource> allocations =
+ new HashMap<ReservationInterval, Resource>();
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
start, start + alloc.length + 1, allocations, resCalc, minAlloc);
@@ -156,11 +156,13 @@ public class TestInMemoryReservationAllocation {
ReservationDefinition rDef =
createSimpleReservationDefinition(start, start + alloc.length + 1,
alloc.length);
- Map<ReservationInterval, ReservationRequest> allocations =
- generateAllocation(start, alloc, false, true);
+ boolean isGang = true;
+ Map<ReservationInterval, Resource> allocations =
+ generateAllocation(start, alloc, false, isGang);
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
- start, start + alloc.length + 1, allocations, resCalc, minAlloc);
+ start, start + alloc.length + 1, allocations, resCalc, minAlloc,
+ isGang);
doAssertions(rAllocation, reservationID, rDef, allocations, start, alloc);
Assert.assertTrue(rAllocation.containsGangs());
for (int i = 0; i < alloc.length; i++) {
@@ -171,7 +173,7 @@ public class TestInMemoryReservationAllocation {
private void doAssertions(ReservationAllocation rAllocation,
ReservationId reservationID, ReservationDefinition rDef,
- Map<ReservationInterval, ReservationRequest> allocations, int start,
+ Map<ReservationInterval, Resource> allocations, int start,
int[] alloc) {
Assert.assertEquals(reservationID, rAllocation.getReservationId());
Assert.assertEquals(rDef, rAllocation.getReservationDefinition());
@@ -198,10 +200,10 @@ public class TestInMemoryReservationAllocation {
return rDef;
}
- private Map<ReservationInterval, ReservationRequest> generateAllocation(
+ private Map<ReservationInterval, Resource> generateAllocation(
int startTime, int[] alloc, boolean isStep, boolean isGang) {
- Map<ReservationInterval, ReservationRequest> req =
- new HashMap<ReservationInterval, ReservationRequest>();
+ Map<ReservationInterval, Resource> req =
+ new HashMap<ReservationInterval, Resource>();
int numContainers = 0;
for (int i = 0; i < alloc.length; i++) {
if (isStep) {
@@ -215,7 +217,8 @@ public class TestInMemoryReservationAllocation {
if (isGang) {
rr.setConcurrency(numContainers);
}
- req.put(new ReservationInterval(startTime + i, startTime + i + 1), rr);
+ req.put(new ReservationInterval(startTime + i, startTime + i + 1),
+ ReservationSystemUtil.toResource(rr));
}
return req;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e602fa3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java
index c7301c7..d0f4dc6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java
@@ -46,9 +46,9 @@ public class TestRLESparseResourceAllocation {
new RLESparseResourceAllocation(resCalc, minAlloc);
int[] alloc = { 10, 10, 10, 10, 10, 10 };
int start = 100;
- Set<Entry<ReservationInterval, ReservationRequest>> inputs =
+ Set<Entry<ReservationInterval, Resource>> inputs =
generateAllocation(start, alloc, false).entrySet();
- for (Entry<ReservationInterval, ReservationRequest> ip : inputs) {
+ for (Entry<ReservationInterval, Resource> ip : inputs) {
rleSparseVector.addInterval(ip.getKey(), ip.getValue());
}
LOG.info(rleSparseVector.toString());
@@ -63,7 +63,7 @@ public class TestRLESparseResourceAllocation {
}
Assert.assertEquals(Resource.newInstance(0, 0),
rleSparseVector.getCapacityAtTime(start + alloc.length + 2));
- for (Entry<ReservationInterval, ReservationRequest> ip : inputs) {
+ for (Entry<ReservationInterval, Resource> ip : inputs) {
rleSparseVector.removeInterval(ip.getKey(), ip.getValue());
}
LOG.info(rleSparseVector.toString());
@@ -83,9 +83,9 @@ public class TestRLESparseResourceAllocation {
new RLESparseResourceAllocation(resCalc, minAlloc);
int[] alloc = { 10, 10, 10, 10, 10, 10 };
int start = 100;
- Set<Entry<ReservationInterval, ReservationRequest>> inputs =
+ Set<Entry<ReservationInterval, Resource>> inputs =
generateAllocation(start, alloc, true).entrySet();
- for (Entry<ReservationInterval, ReservationRequest> ip : inputs) {
+ for (Entry<ReservationInterval, Resource> ip : inputs) {
rleSparseVector.addInterval(ip.getKey(), ip.getValue());
}
LOG.info(rleSparseVector.toString());
@@ -101,8 +101,8 @@ public class TestRLESparseResourceAllocation {
}
Assert.assertEquals(Resource.newInstance(0, 0),
rleSparseVector.getCapacityAtTime(start + alloc.length + 2));
- for (Entry<ReservationInterval, ReservationRequest> ip : inputs) {
- rleSparseVector.removeInterval(ip.getKey(), ip.getValue());
+ for (Entry<ReservationInterval, Resource> ip : inputs) {
+ rleSparseVector.removeInterval(ip.getKey(),ip.getValue());
}
LOG.info(rleSparseVector.toString());
for (int i = 0; i < alloc.length; i++) {
@@ -121,9 +121,9 @@ public class TestRLESparseResourceAllocation {
new RLESparseResourceAllocation(resCalc, minAlloc);
int[] alloc = { 0, 5, 10, 10, 5, 0 };
int start = 100;
- Set<Entry<ReservationInterval, ReservationRequest>> inputs =
+ Set<Entry<ReservationInterval, Resource>> inputs =
generateAllocation(start, alloc, true).entrySet();
- for (Entry<ReservationInterval, ReservationRequest> ip : inputs) {
+ for (Entry<ReservationInterval, Resource> ip : inputs) {
rleSparseVector.addInterval(ip.getKey(), ip.getValue());
}
LOG.info(rleSparseVector.toString());
@@ -139,7 +139,7 @@ public class TestRLESparseResourceAllocation {
}
Assert.assertEquals(Resource.newInstance(0, 0),
rleSparseVector.getCapacityAtTime(start + alloc.length + 2));
- for (Entry<ReservationInterval, ReservationRequest> ip : inputs) {
+ for (Entry<ReservationInterval, Resource> ip : inputs) {
rleSparseVector.removeInterval(ip.getKey(), ip.getValue());
}
LOG.info(rleSparseVector.toString());
@@ -157,17 +157,17 @@ public class TestRLESparseResourceAllocation {
RLESparseResourceAllocation rleSparseVector =
new RLESparseResourceAllocation(resCalc, minAlloc);
rleSparseVector.addInterval(new ReservationInterval(0, Long.MAX_VALUE),
- ReservationRequest.newInstance(Resource.newInstance(0, 0), (0)));
+ Resource.newInstance(0, 0));
LOG.info(rleSparseVector.toString());
Assert.assertEquals(Resource.newInstance(0, 0),
rleSparseVector.getCapacityAtTime(new Random().nextLong()));
Assert.assertTrue(rleSparseVector.isEmpty());
}
- private Map<ReservationInterval, ReservationRequest> generateAllocation(
+ private Map<ReservationInterval, Resource> generateAllocation(
int startTime, int[] alloc, boolean isStep) {
- Map<ReservationInterval, ReservationRequest> req =
- new HashMap<ReservationInterval, ReservationRequest>();
+ Map<ReservationInterval, Resource> req =
+ new HashMap<ReservationInterval, Resource>();
int numContainers = 0;
for (int i = 0; i < alloc.length; i++) {
if (isStep) {
@@ -176,9 +176,8 @@ public class TestRLESparseResourceAllocation {
numContainers = alloc[i];
}
req.put(new ReservationInterval(startTime + i, startTime + i + 1),
-
- ReservationRequest.newInstance(Resource.newInstance(1024, 1),
- (numContainers)));
+ ReservationSystemUtil.toResource(ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), (numContainers))));
}
return req;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e602fa3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java
index 1ca9f2e..d4a97ba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java
@@ -146,14 +146,15 @@ public class TestSimpleCapacityReplanner {
}
}
- private Map<ReservationInterval, ReservationRequest> generateAllocation(
+ private Map<ReservationInterval, Resource> generateAllocation(
int startTime, int[] alloc) {
- Map<ReservationInterval, ReservationRequest> req =
- new TreeMap<ReservationInterval, ReservationRequest>();
+ Map<ReservationInterval, Resource> req =
+ new TreeMap<ReservationInterval, Resource>();
for (int i = 0; i < alloc.length; i++) {
req.put(new ReservationInterval(startTime + i, startTime + i + 1),
- ReservationRequest.newInstance(Resource.newInstance(1024, 1),
- alloc[i]));
+ ReservationSystemUtil.toResource(
+ ReservationRequest.newInstance(Resource.newInstance(1024, 1),
+ alloc[i])));
}
return req;
}
[3/6] hadoop git commit: YARN-3888. ApplicationMaster link is broken
in RM WebUI when appstate is NEW. Contributed by Bibin A Chundatt
Posted by ji...@apache.org.
YARN-3888. ApplicationMaster link is broken in RM WebUI when appstate is
NEW. Contributed by Bibin A Chundatt
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/52148767
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/52148767
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/52148767
Branch: refs/heads/YARN-1197
Commit: 52148767924baf423172d26f2c6d8a4cfc6e143f
Parents: 1a0752d
Author: Xuan <xg...@apache.org>
Authored: Thu Jul 9 21:37:33 2015 -0700
Committer: Xuan <xg...@apache.org>
Committed: Thu Jul 9 21:37:33 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +++
.../yarn/server/resourcemanager/webapp/RMAppsBlock.java | 10 ++++++----
2 files changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52148767/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 89b5e9f..2a9ff98 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -607,6 +607,9 @@ Release 2.8.0 - UNRELEASED
YARN-3892. Fixed NPE on RMStateStore#serviceStop when
CapacityScheduler#serviceInit fails. (Bibin A Chundatt via jianhe)
+ YARN-3888. ApplicationMaster link is broken in RM WebUI when appstate is NEW.
+ (Bibin A Chundatt via xgong)
+
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52148767/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppsBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppsBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppsBlock.java
index d252c30..5e80d23 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppsBlock.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppsBlock.java
@@ -131,13 +131,15 @@ public class RMAppsBlock extends AppsBlock {
String trackingURL =
app.getTrackingUrl() == null
- || app.getTrackingUrl().equals(UNAVAILABLE) ? null : app
- .getTrackingUrl();
+ || app.getTrackingUrl().equals(UNAVAILABLE)
+ || app.getAppState() == YarnApplicationState.NEW ? null : app
+ .getTrackingUrl();
String trackingUI =
app.getTrackingUrl() == null
- || app.getTrackingUrl().equals(UNAVAILABLE) ? "Unassigned" : app
- .getAppState() == YarnApplicationState.FINISHED
+ || app.getTrackingUrl().equals(UNAVAILABLE)
+ || app.getAppState() == YarnApplicationState.NEW ? "Unassigned"
+ : app.getAppState() == YarnApplicationState.FINISHED
|| app.getAppState() == YarnApplicationState.FAILED
|| app.getAppState() == YarnApplicationState.KILLED ? "History"
: "ApplicationMaster";
[4/6] hadoop git commit: HDFS-8749. Fix findbugs warnings in
BlockManager.java. Contributed by Brahma Reddy Battula.
Posted by ji...@apache.org.
HDFS-8749. Fix findbugs warnings in BlockManager.java. Contributed by Brahma Reddy Battula.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d66302ed
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d66302ed
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d66302ed
Branch: refs/heads/YARN-1197
Commit: d66302ed9b2c25b560d8319d6d755aee7cfa4d67
Parents: 5214876
Author: Akira Ajisaka <aa...@apache.org>
Authored: Fri Jul 10 15:04:06 2015 +0900
Committer: Akira Ajisaka <aa...@apache.org>
Committed: Fri Jul 10 15:04:06 2015 +0900
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +++
.../apache/hadoop/hdfs/server/blockmanagement/BlockManager.java | 2 --
2 files changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d66302ed/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index e26e061..5c1208d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1026,6 +1026,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8729. Fix TestFileTruncate#testTruncateWithDataNodesRestartImmediately
which occasionally failed. (Walter Su via jing9)
+ HDFS-8749. Fix findbugs warnings in BlockManager.java.
+ (Brahma Reddy Battula via aajisaka)
+
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d66302ed/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 0b60a97..7dce2a8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -3596,8 +3596,6 @@ public class BlockManager implements BlockStatsMXBean {
String src, BlockInfo[] blocks) {
for (BlockInfo b: blocks) {
if (!b.isComplete()) {
- final BlockInfoUnderConstruction uc =
- (BlockInfoUnderConstruction)b;
final int numNodes = b.numNodes();
final int min = getMinStorageNum(b);
final BlockUCState state = b.getBlockUCState();
[5/6] hadoop git commit: HDFS-2956. calling fetchdt without a
--renewer argument throws NPE (Contributed by Vinayakumar B)HDFS-2956.
calling fetchdt without a --renewer argument throws NPE (Contributed by
Vinayakumar B)
Posted by ji...@apache.org.
HDFS-2956. calling fetchdt without a --renewer argument throws NPE (Contributed by Vinayakumar B)HDFS-2956. calling fetchdt without a --renewer argument throws NPE (Contributed by Vinayakumar B)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b4890803
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b4890803
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b4890803
Branch: refs/heads/YARN-1197
Commit: b48908033fcac7a4bd4313c1fd1457999fba08e1
Parents: d66302e
Author: Vinayakumar B <vi...@apache.org>
Authored: Fri Jul 10 15:47:04 2015 +0530
Committer: Vinayakumar B <vi...@apache.org>
Committed: Fri Jul 10 15:47:04 2015 +0530
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 ++
.../ClientNamenodeProtocolTranslatorPB.java | 2 +-
.../hdfs/tools/TestDelegationTokenFetcher.java | 39 ++++++++++++++++++++
3 files changed, 43 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4890803/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 5c1208d..13b2621 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1029,6 +1029,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8749. Fix findbugs warnings in BlockManager.java.
(Brahma Reddy Battula via aajisaka)
+ HDFS-2956. calling fetchdt without a --renewer argument throws NPE
+ (vinayakumarb)
+
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4890803/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 4ec6f9e..566d54f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -929,7 +929,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
throws IOException {
GetDelegationTokenRequestProto req = GetDelegationTokenRequestProto
.newBuilder()
- .setRenewer(renewer.toString())
+ .setRenewer(renewer == null ? "" : renewer.toString())
.build();
try {
GetDelegationTokenResponseProto resp = rpcProxy.getDelegationToken(null, req);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4890803/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDelegationTokenFetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDelegationTokenFetcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDelegationTokenFetcher.java
index ab3933b..80a1a6c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDelegationTokenFetcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDelegationTokenFetcher.java
@@ -18,7 +18,10 @@
package org.apache.hadoop.hdfs.tools;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
@@ -28,12 +31,18 @@ import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.tools.FakeRenewer;
import org.junit.Assert;
import org.junit.Rule;
@@ -105,4 +114,34 @@ public class TestDelegationTokenFetcher {
Assert.assertFalse(p.getFileSystem(conf).exists(p));
}
+
+ @Test
+ public void testDelegationTokenWithoutRenewerViaRPC() throws Exception {
+ conf.setBoolean(DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
+ .build();
+ try {
+ cluster.waitActive();
+ DistributedFileSystem fs = cluster.getFileSystem();
+ // Should be able to fetch token without renewer.
+ LocalFileSystem localFileSystem = FileSystem.getLocal(conf);
+ Path p = new Path(f.getRoot().getAbsolutePath(), tokenFile);
+ p = localFileSystem.makeQualified(p);
+ DelegationTokenFetcher.saveDelegationToken(conf, fs, null, p);
+ Credentials creds = Credentials.readTokenStorageFile(p, conf);
+ Iterator<Token<?>> itr = creds.getAllTokens().iterator();
+ assertTrue("token not exist error", itr.hasNext());
+ assertNotNull("Token should be there without renewer", itr.next());
+ try {
+ // Without renewer renewal of token should fail.
+ DelegationTokenFetcher.renewTokens(conf, p);
+ fail("Should have failed to renew");
+ } catch (AccessControlException e) {
+ GenericTestUtils.assertExceptionContains(
+ "tried to renew a token without a renewer", e);
+ }
+ } finally {
+ cluster.shutdown();
+ }
+ }
}
[2/6] hadoop git commit: HADOOP-12210. Collect network usage on the
node. Contributed by Robert Grandl
Posted by ji...@apache.org.
HADOOP-12210. Collect network usage on the node. Contributed by Robert Grandl
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1a0752d8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1a0752d8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1a0752d8
Branch: refs/heads/YARN-1197
Commit: 1a0752d85a15499d120b4a79af9bd740fcd1f8e0
Parents: 0e602fa
Author: Chris Douglas <cd...@apache.org>
Authored: Mon Jul 6 17:28:20 2015 -0700
Committer: Chris Douglas <cd...@apache.org>
Committed: Thu Jul 9 17:48:43 2015 -0700
----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 2 +
.../java/org/apache/hadoop/util/SysInfo.java | 12 +++
.../org/apache/hadoop/util/SysInfoLinux.java | 93 +++++++++++++++++++-
.../org/apache/hadoop/util/SysInfoWindows.java | 15 ++++
.../apache/hadoop/util/TestSysInfoLinux.java | 40 ++++++++-
.../gridmix/DummyResourceCalculatorPlugin.java | 19 ++++
.../yarn/util/ResourceCalculatorPlugin.java | 16 ++++
7 files changed, 195 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a0752d8/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index d9a9eba..3d4f1e4 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -693,6 +693,8 @@ Release 2.8.0 - UNRELEASED
HADOOP-12180. Move ResourceCalculatorPlugin from YARN to Common.
(Chris Douglas via kasha)
+ HADOOP-12210. Collect network usage on the node (Robert Grandl via cdouglas)
+
OPTIMIZATIONS
HADOOP-11785. Reduce the number of listStatus operation in distcp
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a0752d8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java
index ec7fb24..24b339d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java
@@ -108,4 +108,16 @@ public abstract class SysInfo {
*/
public abstract float getCpuUsage();
+ /**
+ * Obtain the aggregated number of bytes read over the network.
+ * @return total number of bytes read.
+ */
+ public abstract long getNetworkBytesRead();
+
+ /**
+ * Obtain the aggregated number of bytes written to the network.
+ * @return total number of bytes written.
+ */
+ public abstract long getNetworkBytesWritten();
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a0752d8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java
index 055298d..8801985 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java
@@ -83,9 +83,22 @@ public class SysInfoLinux extends SysInfo {
"[ \t]*([0-9]*)[ \t]*([0-9]*)[ \t].*");
private CpuTimeTracker cpuTimeTracker;
+ /**
+ * Pattern for parsing /proc/net/dev.
+ */
+ private static final String PROCFS_NETFILE = "/proc/net/dev";
+ private static final Pattern PROCFS_NETFILE_FORMAT =
+ Pattern .compile("^[ \t]*([a-zA-Z]+[0-9]*):" +
+ "[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)" +
+ "[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)" +
+ "[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)" +
+ "[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+).*");
+
+
private String procfsMemFile;
private String procfsCpuFile;
private String procfsStatFile;
+ private String procfsNetFile;
private long jiffyLengthInMillis;
private long ramSize = 0;
@@ -98,6 +111,8 @@ public class SysInfoLinux extends SysInfo {
/* number of physical cores on the system. */
private int numCores = 0;
private long cpuFrequency = 0L; // CPU frequency on the system (kHz)
+ private long numNetBytesRead = 0L; // aggregated bytes read from network
+ private long numNetBytesWritten = 0L; // aggregated bytes written to network
private boolean readMemInfoFile = false;
private boolean readCpuInfoFile = false;
@@ -130,7 +145,7 @@ public class SysInfoLinux extends SysInfo {
public SysInfoLinux() {
this(PROCFS_MEMFILE, PROCFS_CPUINFO, PROCFS_STAT,
- JIFFY_LENGTH_IN_MILLIS);
+ PROCFS_NETFILE, JIFFY_LENGTH_IN_MILLIS);
}
/**
@@ -139,16 +154,19 @@ public class SysInfoLinux extends SysInfo {
* @param procfsMemFile fake file for /proc/meminfo
* @param procfsCpuFile fake file for /proc/cpuinfo
* @param procfsStatFile fake file for /proc/stat
+ * @param procfsNetFile fake file for /proc/net/dev
* @param jiffyLengthInMillis fake jiffy length value
*/
@VisibleForTesting
public SysInfoLinux(String procfsMemFile,
String procfsCpuFile,
String procfsStatFile,
+ String procfsNetFile,
long jiffyLengthInMillis) {
this.procfsMemFile = procfsMemFile;
this.procfsCpuFile = procfsCpuFile;
this.procfsStatFile = procfsStatFile;
+ this.procfsNetFile = procfsNetFile;
this.jiffyLengthInMillis = jiffyLengthInMillis;
this.cpuTimeTracker = new CpuTimeTracker(jiffyLengthInMillis);
}
@@ -338,6 +356,61 @@ public class SysInfoLinux extends SysInfo {
}
}
+ /**
+ * Read /proc/net/dev file, parse and calculate amount
+ * of bytes read and written through the network.
+ */
+ private void readProcNetInfoFile() {
+
+ numNetBytesRead = 0L;
+ numNetBytesWritten = 0L;
+
+ // Read "/proc/net/dev" file
+ BufferedReader in;
+ InputStreamReader fReader;
+ try {
+ fReader = new InputStreamReader(
+ new FileInputStream(procfsNetFile), Charset.forName("UTF-8"));
+ in = new BufferedReader(fReader);
+ } catch (FileNotFoundException f) {
+ return;
+ }
+
+ Matcher mat;
+ try {
+ String str = in.readLine();
+ while (str != null) {
+ mat = PROCFS_NETFILE_FORMAT.matcher(str);
+ if (mat.find()) {
+ assert mat.groupCount() >= 16;
+
+ // ignore loopback interfaces
+ if (mat.group(1).equals("lo")) {
+ str = in.readLine();
+ continue;
+ }
+ numNetBytesRead += Long.parseLong(mat.group(2));
+ numNetBytesWritten += Long.parseLong(mat.group(10));
+ }
+ str = in.readLine();
+ }
+ } catch (IOException io) {
+ LOG.warn("Error reading the stream " + io);
+ } finally {
+ // Close the streams
+ try {
+ fReader.close();
+ try {
+ in.close();
+ } catch (IOException i) {
+ LOG.warn("Error closing the stream " + in);
+ }
+ } catch (IOException i) {
+ LOG.warn("Error closing the stream " + fReader);
+ }
+ }
+ }
+
/** {@inheritDoc} */
@Override
public long getPhysicalMemorySize() {
@@ -405,6 +478,20 @@ public class SysInfoLinux extends SysInfo {
return overallCpuUsage;
}
+ /** {@inheritDoc} */
+ @Override
+ public long getNetworkBytesRead() {
+ readProcNetInfoFile();
+ return numNetBytesRead;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long getNetworkBytesWritten() {
+ readProcNetInfoFile();
+ return numNetBytesWritten;
+ }
+
/**
* Test the {@link SysInfoLinux}.
*
@@ -424,6 +511,10 @@ public class SysInfoLinux extends SysInfo {
System.out.println("CPU frequency (kHz) : " + plugin.getCpuFrequency());
System.out.println("Cumulative CPU time (ms) : " +
plugin.getCumulativeCpuTime());
+ System.out.println("Total network read (bytes) : "
+ + plugin.getNetworkBytesRead());
+ System.out.println("Total network written (bytes) : "
+ + plugin.getNetworkBytesWritten());
try {
// Sleep so we can compute the CPU usage
Thread.sleep(500L);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a0752d8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java
index da4c1c5..f8542a3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java
@@ -178,4 +178,19 @@ public class SysInfoWindows extends SysInfo {
refreshIfNeeded();
return cpuUsage;
}
+
+ /** {@inheritDoc} */
+ @Override
+ public long getNetworkBytesRead() {
+ // TODO unimplemented
+ return 0L;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long getNetworkBytesWritten() {
+ // TODO unimplemented
+ return 0L;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a0752d8/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestSysInfoLinux.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestSysInfoLinux.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestSysInfoLinux.java
index 73edc77..2a31f31 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestSysInfoLinux.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestSysInfoLinux.java
@@ -44,8 +44,10 @@ public class TestSysInfoLinux {
public FakeLinuxResourceCalculatorPlugin(String procfsMemFile,
String procfsCpuFile,
String procfsStatFile,
+ String procfsNetFile,
long jiffyLengthInMillis) {
- super(procfsMemFile, procfsCpuFile, procfsStatFile, jiffyLengthInMillis);
+ super(procfsMemFile, procfsCpuFile, procfsStatFile, procfsNetFile,
+ jiffyLengthInMillis);
}
@Override
long getCurrentTime() {
@@ -61,14 +63,17 @@ public class TestSysInfoLinux {
private static final String FAKE_MEMFILE;
private static final String FAKE_CPUFILE;
private static final String FAKE_STATFILE;
+ private static final String FAKE_NETFILE;
private static final long FAKE_JIFFY_LENGTH = 10L;
static {
int randomNum = (new Random()).nextInt(1000000000);
FAKE_MEMFILE = TEST_ROOT_DIR + File.separator + "MEMINFO_" + randomNum;
FAKE_CPUFILE = TEST_ROOT_DIR + File.separator + "CPUINFO_" + randomNum;
FAKE_STATFILE = TEST_ROOT_DIR + File.separator + "STATINFO_" + randomNum;
+ FAKE_NETFILE = TEST_ROOT_DIR + File.separator + "NETINFO_" + randomNum;
plugin = new FakeLinuxResourceCalculatorPlugin(FAKE_MEMFILE, FAKE_CPUFILE,
FAKE_STATFILE,
+ FAKE_NETFILE,
FAKE_JIFFY_LENGTH);
}
static final String MEMINFO_FORMAT =
@@ -141,6 +146,17 @@ public class TestSysInfoLinux {
"procs_running 1\n" +
"procs_blocked 0\n";
+ static final String NETINFO_FORMAT =
+ "Inter-| Receive | Transmit\n"+
+ "face |bytes packets errs drop fifo frame compressed multicast|bytes packets"+
+ "errs drop fifo colls carrier compressed\n"+
+ " lo: 42236310 563003 0 0 0 0 0 0 42236310 563003 " +
+ "0 0 0 0 0 0\n"+
+ " eth0: %d 3452527 0 0 0 0 0 299787 %d 1866280 0 0 " +
+ "0 0 0 0\n"+
+ " eth1: %d 3152521 0 0 0 0 0 219781 %d 1866290 0 0 " +
+ "0 0 0 0\n";
+
/**
* Test parsing /proc/stat and /proc/cpuinfo
* @throws IOException
@@ -320,4 +336,26 @@ public class TestSysInfoLinux {
IOUtils.closeQuietly(fWriter);
}
}
+
+ /**
+ * Test parsing /proc/net/dev
+ * @throws IOException
+ */
+ @Test
+ public void parsingProcNetFile() throws IOException {
+ long numBytesReadIntf1 = 2097172468L;
+ long numBytesWrittenIntf1 = 1355620114L;
+ long numBytesReadIntf2 = 1097172460L;
+ long numBytesWrittenIntf2 = 1055620110L;
+ File tempFile = new File(FAKE_NETFILE);
+ tempFile.deleteOnExit();
+ FileWriter fWriter = new FileWriter(FAKE_NETFILE);
+ fWriter.write(String.format(NETINFO_FORMAT,
+ numBytesReadIntf1, numBytesWrittenIntf1,
+ numBytesReadIntf2, numBytesWrittenIntf2));
+ fWriter.close();
+ assertEquals(plugin.getNetworkBytesRead(), numBytesReadIntf1 + numBytesReadIntf2);
+ assertEquals(plugin.getNetworkBytesWritten(), numBytesWrittenIntf1 + numBytesWrittenIntf2);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a0752d8/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/DummyResourceCalculatorPlugin.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/DummyResourceCalculatorPlugin.java b/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/DummyResourceCalculatorPlugin.java
index fd4cb83..b86303b 100644
--- a/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/DummyResourceCalculatorPlugin.java
+++ b/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/DummyResourceCalculatorPlugin.java
@@ -48,6 +48,12 @@ public class DummyResourceCalculatorPlugin extends ResourceCalculatorPlugin {
"mapred.tasktracker.cumulativecputime.testing";
/** CPU usage percentage for testing */
public static final String CPU_USAGE = "mapred.tasktracker.cpuusage.testing";
+ /** cumulative number of bytes read over the network */
+ public static final String NETWORK_BYTES_READ =
+ "mapred.tasktracker.networkread.testing";
+ /** cumulative number of bytes written over the network */
+ public static final String NETWORK_BYTES_WRITTEN =
+ "mapred.tasktracker.networkwritten.testing";
/** process cumulative CPU usage time for testing */
public static final String PROC_CUMULATIVE_CPU_TIME =
"mapred.tasktracker.proccumulativecputime.testing";
@@ -111,4 +117,17 @@ public class DummyResourceCalculatorPlugin extends ResourceCalculatorPlugin {
public float getCpuUsage() {
return getConf().getFloat(CPU_USAGE, -1);
}
+
+ /** {@inheritDoc} */
+ @Override
+ public long getNetworkBytesRead() {
+ return getConf().getLong(NETWORK_BYTES_READ, -1);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long getNetworkBytesWritten() {
+ return getConf().getLong(NETWORK_BYTES_WRITTEN, -1);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a0752d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java
index 5e5f1b4..21724a9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java
@@ -124,6 +124,22 @@ public class ResourceCalculatorPlugin extends Configured {
return sys.getCpuUsage();
}
+ /**
+ * Obtain the aggregated number of bytes read over the network.
+ * @return total number of bytes read.
+ */
+ public long getNetworkBytesRead() {
+ return sys.getNetworkBytesRead();
+ }
+
+ /**
+ * Obtain the aggregated number of bytes written to the network.
+ * @return total number of bytes written.
+ */
+ public long getNetworkBytesWritten() {
+ return sys.getNetworkBytesWritten();
+ }
+
/**
* Create the ResourceCalculatorPlugin from the class name and configure it. If
* class name is null, this method will try and return a memory calculator
[6/6] hadoop git commit: YARN-3445. Cache runningApps in RMNode for
getting running apps on given NodeId. (Junping Du via mingma)
Posted by ji...@apache.org.
YARN-3445. Cache runningApps in RMNode for getting running apps on given NodeId. (Junping Du via mingma)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/08244264
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/08244264
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/08244264
Branch: refs/heads/YARN-1197
Commit: 08244264c0583472b9c4e16591cfde72c6db62a2
Parents: b489080
Author: Ming Ma <mi...@apache.org>
Authored: Fri Jul 10 08:30:10 2015 -0700
Committer: Ming Ma <mi...@apache.org>
Committed: Fri Jul 10 08:30:10 2015 -0700
----------------------------------------------------------------------
.../hadoop/yarn/sls/nodemanager/NodeInfo.java | 8 +++-
.../yarn/sls/scheduler/RMNodeWrapper.java | 5 +++
hadoop-yarn-project/CHANGES.txt | 3 ++
.../server/resourcemanager/rmnode/RMNode.java | 2 +
.../resourcemanager/rmnode/RMNodeImpl.java | 43 ++++++++++++++++----
.../yarn/server/resourcemanager/MockNodes.java | 5 +++
.../resourcemanager/TestRMNodeTransitions.java | 36 ++++++++++++++--
7 files changed, 91 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/08244264/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
index ee6eb7b..440779c 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
@@ -62,7 +62,8 @@ public class NodeInfo {
private NodeState state;
private List<ContainerId> toCleanUpContainers;
private List<ApplicationId> toCleanUpApplications;
-
+ private List<ApplicationId> runningApplications;
+
public FakeRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
Resource perNode, String rackName, String healthReport,
int cmdPort, String hostName, NodeState state) {
@@ -77,6 +78,7 @@ public class NodeInfo {
this.state = state;
toCleanUpApplications = new ArrayList<ApplicationId>();
toCleanUpContainers = new ArrayList<ContainerId>();
+ runningApplications = new ArrayList<ApplicationId>();
}
public NodeId getNodeID() {
@@ -135,6 +137,10 @@ public class NodeInfo {
return toCleanUpApplications;
}
+ public List<ApplicationId> getRunningApps() {
+ return runningApplications;
+ }
+
public void updateNodeHeartbeatResponseForCleanup(
NodeHeartbeatResponse response) {
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/08244264/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
index b64be1b..a6633ae 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
@@ -119,6 +119,11 @@ public class RMNodeWrapper implements RMNode {
}
@Override
+ public List<ApplicationId> getRunningApps() {
+ return node.getRunningApps();
+ }
+
+ @Override
public void updateNodeHeartbeatResponseForCleanup(
NodeHeartbeatResponse nodeHeartbeatResponse) {
node.updateNodeHeartbeatResponseForCleanup(nodeHeartbeatResponse);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/08244264/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 2a9ff98..db000d7 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -1678,6 +1678,9 @@ Release 2.6.0 - 2014-11-18
YARN-2811. In Fair Scheduler, reservation fulfillments shouldn't ignore max
share (Siqi Li via Sandy Ryza)
+ YARN-3445. Cache runningApps in RMNode for getting running apps on given
+ NodeId. (Junping Du via mingma)
+
IMPROVEMENTS
YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc
http://git-wip-us.apache.org/repos/asf/hadoop/blob/08244264/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
index 95eeaf6..0386be6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
@@ -119,6 +119,8 @@ public interface RMNode {
public List<ApplicationId> getAppsToCleanup();
+ List<ApplicationId> getRunningApps();
+
/**
* Update a {@link NodeHeartbeatResponse} with the list of containers and
* applications to clean up for this node.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/08244264/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index d1e6190..9bc91c7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -123,11 +123,16 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
new HashSet<ContainerId>();
/* the list of applications that have finished and need to be purged */
- private final List<ApplicationId> finishedApplications = new ArrayList<ApplicationId>();
+ private final List<ApplicationId> finishedApplications =
+ new ArrayList<ApplicationId>();
+
+ /* the list of applications that are running on this node */
+ private final List<ApplicationId> runningApplications =
+ new ArrayList<ApplicationId>();
private NodeHeartbeatResponse latestNodeHeartBeatResponse = recordFactory
.newRecordInstance(NodeHeartbeatResponse.class);
-
+
private static final StateMachineFactory<RMNodeImpl,
NodeState,
RMNodeEventType,
@@ -136,7 +141,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
NodeState,
RMNodeEventType,
RMNodeEvent>(NodeState.NEW)
-
+
//Transitions from NEW state
.addTransition(NodeState.NEW, NodeState.RUNNING,
RMNodeEventType.STARTED, new AddNodeTransition())
@@ -383,6 +388,16 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
}
@Override
+ public List<ApplicationId> getRunningApps() {
+ this.readLock.lock();
+ try {
+ return new ArrayList<ApplicationId>(this.runningApplications);
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ @Override
public List<ContainerId> getContainersToCleanUp() {
this.readLock.lock();
@@ -519,9 +534,12 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
LOG.warn("Cannot get RMApp by appId=" + appId
+ ", just added it to finishedApplications list for cleanup");
rmNode.finishedApplications.add(appId);
+ rmNode.runningApplications.remove(appId);
return;
}
+ // Add running applications back due to Node add or Node reconnection.
+ rmNode.runningApplications.add(appId);
context.getDispatcher().getEventHandler()
.handle(new RMAppRunningOnNodeEvent(appId, nodeId));
}
@@ -707,8 +725,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
- rmNode.finishedApplications.add(((
- RMNodeCleanAppEvent) event).getAppId());
+ ApplicationId appId = ((RMNodeCleanAppEvent) event).getAppId();
+ rmNode.finishedApplications.add(appId);
+ rmNode.runningApplications.remove(appId);
}
}
@@ -910,12 +929,22 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
+ "cleanup, no further processing");
continue;
}
- if (finishedApplications.contains(containerId.getApplicationAttemptId()
- .getApplicationId())) {
+
+ ApplicationId containerAppId =
+ containerId.getApplicationAttemptId().getApplicationId();
+
+ if (finishedApplications.contains(containerAppId)) {
LOG.info("Container " + containerId
+ " belongs to an application that is already killed,"
+ " no further processing");
continue;
+ } else if (!runningApplications.contains(containerAppId)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Container " + containerId
+ + " is the first container get launched for application "
+ + containerAppId);
+ }
+ runningApplications.add(containerAppId);
}
// Process running containers
http://git-wip-us.apache.org/repos/asf/hadoop/blob/08244264/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index 2d863d1..095fe28 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -187,6 +187,11 @@ public class MockNodes {
}
@Override
+ public List<ApplicationId> getRunningApps() {
+ return null;
+ }
+
+ @Override
public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response) {
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/08244264/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
index 01f4357..ece896b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
@@ -33,6 +33,7 @@ import java.util.List;
import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
@@ -485,9 +486,9 @@ public class TestRMNodeTransitions {
NodeId nodeId = node.getNodeID();
// Expire a container
- ContainerId completedContainerId = BuilderUtils.newContainerId(
- BuilderUtils.newApplicationAttemptId(
- BuilderUtils.newApplicationId(0, 0), 0), 0);
+ ContainerId completedContainerId = BuilderUtils.newContainerId(
+ BuilderUtils.newApplicationAttemptId(
+ BuilderUtils.newApplicationId(0, 0), 0), 0);
node.handle(new RMNodeCleanContainerEvent(nodeId, completedContainerId));
Assert.assertEquals(1, node.getContainersToCleanUp().size());
@@ -512,6 +513,35 @@ public class TestRMNodeTransitions {
Assert.assertEquals(finishedAppId, hbrsp.getApplicationsToCleanup().get(0));
}
+ @Test(timeout=20000)
+ public void testUpdateHeartbeatResponseForAppLifeCycle() {
+ RMNodeImpl node = getRunningNode();
+ NodeId nodeId = node.getNodeID();
+
+ ApplicationId runningAppId = BuilderUtils.newApplicationId(0, 1);
+ // Create a running container
+ ContainerId runningContainerId = BuilderUtils.newContainerId(
+ BuilderUtils.newApplicationAttemptId(
+ runningAppId, 0), 0);
+
+ ContainerStatus status = ContainerStatus.newInstance(runningContainerId,
+ ContainerState.RUNNING, "", 0);
+ List<ContainerStatus> statusList = new ArrayList<ContainerStatus>();
+ statusList.add(status);
+ NodeHealthStatus nodeHealth = NodeHealthStatus.newInstance(true,
+ "", System.currentTimeMillis());
+ node.handle(new RMNodeStatusEvent(nodeId, nodeHealth,
+ statusList, null, null));
+
+ Assert.assertEquals(1, node.getRunningApps().size());
+
+ // Finish an application
+ ApplicationId finishedAppId = runningAppId;
+ node.handle(new RMNodeCleanAppEvent(nodeId, finishedAppId));
+ Assert.assertEquals(1, node.getAppsToCleanup().size());
+ Assert.assertEquals(0, node.getRunningApps().size());
+ }
+
private RMNodeImpl getRunningNode() {
return getRunningNode(null, 0);
}