You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/07/10 19:16:41 UTC

[1/6] hadoop git commit: YARN-3800. Reduce storage footprint for ReservationAllocation. Contributed by Anubhav Dhoot.

Repository: hadoop
Updated Branches:
  refs/heads/YARN-1197 f4ca530c1 -> 08244264c


YARN-3800. Reduce storage footprint for ReservationAllocation. Contributed by Anubhav Dhoot.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0e602fa3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0e602fa3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0e602fa3

Branch: refs/heads/YARN-1197
Commit: 0e602fa3a1529134214452fba10a90307d9c2072
Parents: f4ca530
Author: carlo curino <Carlo Curino>
Authored: Thu Jul 9 16:47:35 2015 -0700
Committer: carlo curino <Carlo Curino>
Committed: Thu Jul 9 16:51:59 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 ++
 .../reservation/GreedyReservationAgent.java     | 27 ++++++-----
 .../reservation/InMemoryPlan.java               |  9 ++--
 .../InMemoryReservationAllocation.java          | 24 +++++----
 .../RLESparseResourceAllocation.java            | 43 ++---------------
 .../reservation/ReservationAllocation.java      |  3 +-
 .../reservation/ReservationSystemUtil.java      | 51 ++++++++++++++++++++
 .../reservation/ReservationSystemTestUtil.java  | 11 +++--
 .../reservation/TestCapacityOverTimePolicy.java | 16 +++---
 .../reservation/TestGreedyReservationAgent.java |  2 +-
 .../reservation/TestInMemoryPlan.java           | 37 ++++++++++----
 .../TestInMemoryReservationAllocation.java      | 29 ++++++-----
 .../TestRLESparseResourceAllocation.java        | 33 ++++++-------
 .../TestSimpleCapacityReplanner.java            | 11 +++--
 14 files changed, 176 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e602fa3/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 3c232eb..89b5e9f 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -323,6 +323,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3827. Migrate YARN native build to new CMake framework (Alan Burlison
     via Colin P. McCabe)
 
+    YARN-3800. Reduce storage footprint for ReservationAllocation. (Anubhav Dhoot
+    via curino)
+
   OPTIMIZATIONS
 
     YARN-3339. TestDockerContainerExecutor should pull a single image and not

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e602fa3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java
index 5a61b94..214df1c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java
@@ -97,8 +97,8 @@ public class GreedyReservationAgent implements ReservationAgent {
     long curDeadline = deadline;
     long oldDeadline = -1;
 
-    Map<ReservationInterval, ReservationRequest> allocations =
-        new HashMap<ReservationInterval, ReservationRequest>();
+    Map<ReservationInterval, Resource> allocations =
+        new HashMap<ReservationInterval, Resource>();
     RLESparseResourceAllocation tempAssigned =
         new RLESparseResourceAllocation(plan.getResourceCalculator(),
             plan.getMinimumAllocation());
@@ -108,6 +108,8 @@ public class GreedyReservationAgent implements ReservationAgent {
     ReservationRequestInterpreter type = contract.getReservationRequests()
         .getInterpreter();
 
+    boolean hasGang = false;
+
     // Iterate the stages in backward from deadline
     for (ListIterator<ReservationRequest> li = 
         stages.listIterator(stages.size()); li.hasPrevious();) {
@@ -117,8 +119,10 @@ public class GreedyReservationAgent implements ReservationAgent {
       // validate the RR respect basic constraints
       validateInput(plan, currentReservationStage, totalCapacity);
 
+      hasGang |= currentReservationStage.getConcurrency() > 1;
+
       // run allocation for a single stage
-      Map<ReservationInterval, ReservationRequest> curAlloc =
+      Map<ReservationInterval, Resource> curAlloc =
           placeSingleStage(plan, tempAssigned, currentReservationStage,
               earliestStart, curDeadline, oldReservation, totalCapacity);
 
@@ -178,8 +182,7 @@ public class GreedyReservationAgent implements ReservationAgent {
 
     // create reservation with above allocations if not null/empty
 
-    ReservationRequest ZERO_RES =
-        ReservationRequest.newInstance(Resource.newInstance(0, 0), 0);
+    Resource ZERO_RES = Resource.newInstance(0, 0);
 
     long firstStartTime = findEarliestTime(allocations.keySet());
     
@@ -200,7 +203,7 @@ public class GreedyReservationAgent implements ReservationAgent {
         new InMemoryReservationAllocation(reservationId, contract, user,
             plan.getQueueName(), firstStartTime,
             findLatestTime(allocations.keySet()), allocations,
-            plan.getResourceCalculator(), plan.getMinimumAllocation());
+            plan.getResourceCalculator(), plan.getMinimumAllocation(), hasGang);
     if (oldReservation != null) {
       return plan.updateReservation(capReservation);
     } else {
@@ -242,13 +245,13 @@ public class GreedyReservationAgent implements ReservationAgent {
    * previous instant in time until the time-window is exhausted or we placed
    * all the user request.
    */
-  private Map<ReservationInterval, ReservationRequest> placeSingleStage(
+  private Map<ReservationInterval, Resource> placeSingleStage(
       Plan plan, RLESparseResourceAllocation tempAssigned,
       ReservationRequest rr, long earliestStart, long curDeadline,
       ReservationAllocation oldResAllocation, final Resource totalCapacity) {
 
-    Map<ReservationInterval, ReservationRequest> allocationRequests =
-        new HashMap<ReservationInterval, ReservationRequest>();
+    Map<ReservationInterval, Resource> allocationRequests =
+        new HashMap<ReservationInterval, Resource>();
 
     // compute the gang as a resource and get the duration
     Resource gang = Resources.multiply(rr.getCapability(), rr.getConcurrency());
@@ -322,7 +325,7 @@ public class GreedyReservationAgent implements ReservationAgent {
 
         ReservationInterval reservationInt =
             new ReservationInterval(curDeadline - dur, curDeadline);
-        ReservationRequest reservationRes =
+        ReservationRequest reservationRequest =
             ReservationRequest.newInstance(rr.getCapability(),
                 rr.getConcurrency() * maxGang, rr.getConcurrency(),
                 rr.getDuration());
@@ -331,6 +334,8 @@ public class GreedyReservationAgent implements ReservationAgent {
         // placing other ReservationRequest within the same
         // ReservationDefinition,
         // and we must avoid double-counting the available resources
+        final Resource reservationRes = ReservationSystemUtil.toResource(
+            reservationRequest);
         tempAssigned.addInterval(reservationInt, reservationRes);
         allocationRequests.put(reservationInt, reservationRes);
 
@@ -350,7 +355,7 @@ public class GreedyReservationAgent implements ReservationAgent {
       // If we are here is becasue we did not manage to satisfy this request.
       // So we need to remove unwanted side-effect from tempAssigned (needed
       // for ANY).
-      for (Map.Entry<ReservationInterval, ReservationRequest> tempAllocation :
+      for (Map.Entry<ReservationInterval, Resource> tempAllocation :
         allocationRequests.entrySet()) {
         tempAssigned.removeInterval(tempAllocation.getKey(),
             tempAllocation.getValue());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e602fa3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
index ce2e7d7..50d66cf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
@@ -31,7 +31,6 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.yarn.api.records.ReservationId;
-import org.apache.hadoop.yarn.api.records.ReservationRequest;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@@ -110,7 +109,7 @@ class InMemoryPlan implements Plan {
 
   private void incrementAllocation(ReservationAllocation reservation) {
     assert (readWriteLock.isWriteLockedByCurrentThread());
-    Map<ReservationInterval, ReservationRequest> allocationRequests =
+    Map<ReservationInterval, Resource> allocationRequests =
         reservation.getAllocationRequests();
     // check if we have encountered the user earlier and if not add an entry
     String user = reservation.getUser();
@@ -119,7 +118,7 @@ class InMemoryPlan implements Plan {
       resAlloc = new RLESparseResourceAllocation(resCalc, minAlloc);
       userResourceAlloc.put(user, resAlloc);
     }
-    for (Map.Entry<ReservationInterval, ReservationRequest> r : allocationRequests
+    for (Map.Entry<ReservationInterval, Resource> r : allocationRequests
         .entrySet()) {
       resAlloc.addInterval(r.getKey(), r.getValue());
       rleSparseVector.addInterval(r.getKey(), r.getValue());
@@ -128,11 +127,11 @@ class InMemoryPlan implements Plan {
 
   private void decrementAllocation(ReservationAllocation reservation) {
     assert (readWriteLock.isWriteLockedByCurrentThread());
-    Map<ReservationInterval, ReservationRequest> allocationRequests =
+    Map<ReservationInterval, Resource> allocationRequests =
         reservation.getAllocationRequests();
     String user = reservation.getUser();
     RLESparseResourceAllocation resAlloc = userResourceAlloc.get(user);
-    for (Map.Entry<ReservationInterval, ReservationRequest> r : allocationRequests
+    for (Map.Entry<ReservationInterval, Resource> r : allocationRequests
         .entrySet()) {
       resAlloc.removeInterval(r.getKey(), r.getValue());
       rleSparseVector.removeInterval(r.getKey(), r.getValue());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e602fa3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
index fc8407b..a4dd23b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
@@ -22,7 +22,6 @@ import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.ReservationDefinition;
 import org.apache.hadoop.yarn.api.records.ReservationId;
-import org.apache.hadoop.yarn.api.records.ReservationRequest;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -40,7 +39,7 @@ class InMemoryReservationAllocation implements ReservationAllocation {
   private final ReservationDefinition contract;
   private final long startTime;
   private final long endTime;
-  private final Map<ReservationInterval, ReservationRequest> allocationRequests;
+  private final Map<ReservationInterval, Resource> allocationRequests;
   private boolean hasGang = false;
   private long acceptedAt = -1;
 
@@ -49,22 +48,29 @@ class InMemoryReservationAllocation implements ReservationAllocation {
   InMemoryReservationAllocation(ReservationId reservationID,
       ReservationDefinition contract, String user, String planName,
       long startTime, long endTime,
-      Map<ReservationInterval, ReservationRequest> allocationRequests,
+      Map<ReservationInterval, Resource> allocations,
       ResourceCalculator calculator, Resource minAlloc) {
+    this(reservationID, contract, user, planName, startTime, endTime,
+        allocations, calculator, minAlloc, false);
+  }
+
+  InMemoryReservationAllocation(ReservationId reservationID,
+      ReservationDefinition contract, String user, String planName,
+      long startTime, long endTime,
+      Map<ReservationInterval, Resource> allocations,
+      ResourceCalculator calculator, Resource minAlloc, boolean hasGang) {
     this.contract = contract;
     this.startTime = startTime;
     this.endTime = endTime;
     this.reservationID = reservationID;
     this.user = user;
-    this.allocationRequests = allocationRequests;
+    this.allocationRequests = allocations;
     this.planName = planName;
+    this.hasGang = hasGang;
     resourcesOverTime = new RLESparseResourceAllocation(calculator, minAlloc);
-    for (Map.Entry<ReservationInterval, ReservationRequest> r : allocationRequests
+    for (Map.Entry<ReservationInterval, Resource> r : allocations
         .entrySet()) {
       resourcesOverTime.addInterval(r.getKey(), r.getValue());
-      if (r.getValue().getConcurrency() > 1) {
-        hasGang = true;
-      }
     }
   }
 
@@ -89,7 +95,7 @@ class InMemoryReservationAllocation implements ReservationAllocation {
   }
 
   @Override
-  public Map<ReservationInterval, ReservationRequest> getAllocationRequests() {
+  public Map<ReservationInterval, Resource> getAllocationRequests() {
     return Collections.unmodifiableMap(allocationRequests);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e602fa3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
index 3f6f405..2957cc6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NavigableMap;
@@ -31,9 +30,7 @@ import java.util.TreeMap;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import org.apache.hadoop.yarn.api.records.ReservationRequest;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
@@ -80,14 +77,11 @@ public class RLESparseResourceAllocation {
    * 
    * @param reservationInterval the interval for which the resource is to be
    *          added
-   * @param capacity the resource to be added
+   * @param totCap the resource to be added
    * @return true if addition is successful, false otherwise
    */
   public boolean addInterval(ReservationInterval reservationInterval,
-      ReservationRequest capacity) {
-    Resource totCap =
-        Resources.multiply(capacity.getCapability(),
-            (float) capacity.getNumContainers());
+      Resource totCap) {
     if (totCap.equals(ZERO_RESOURCE)) {
       return true;
     }
@@ -143,44 +137,15 @@ public class RLESparseResourceAllocation {
   }
 
   /**
-   * Add multiple resources for the specified interval
-   * 
-   * @param reservationInterval the interval for which the resource is to be
-   *          added
-   * @param ReservationRequests the resources to be added
-   * @param clusterResource the total resources in the cluster
-   * @return true if addition is successful, false otherwise
-   */
-  public boolean addCompositeInterval(ReservationInterval reservationInterval,
-      List<ReservationRequest> ReservationRequests, Resource clusterResource) {
-    ReservationRequest aggregateReservationRequest =
-        Records.newRecord(ReservationRequest.class);
-    Resource capacity = Resource.newInstance(0, 0);
-    for (ReservationRequest ReservationRequest : ReservationRequests) {
-      Resources.addTo(capacity, Resources.multiply(
-          ReservationRequest.getCapability(),
-          ReservationRequest.getNumContainers()));
-    }
-    aggregateReservationRequest.setNumContainers((int) Math.ceil(Resources
-        .divide(resourceCalculator, clusterResource, capacity, minAlloc)));
-    aggregateReservationRequest.setCapability(minAlloc);
-
-    return addInterval(reservationInterval, aggregateReservationRequest);
-  }
-
-  /**
    * Removes a resource for the specified interval
    * 
    * @param reservationInterval the interval for which the resource is to be
    *          removed
-   * @param capacity the resource to be removed
+   * @param totCap the resource to be removed
    * @return true if removal is successful, false otherwise
    */
   public boolean removeInterval(ReservationInterval reservationInterval,
-      ReservationRequest capacity) {
-    Resource totCap =
-        Resources.multiply(capacity.getCapability(),
-            (float) capacity.getNumContainers());
+      Resource totCap) {
     if (totCap.equals(ZERO_RESOURCE)) {
       return true;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e602fa3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java
index 89c0e55..0d3c692 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java
@@ -22,7 +22,6 @@ import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.ReservationDefinition;
 import org.apache.hadoop.yarn.api.records.ReservationId;
-import org.apache.hadoop.yarn.api.records.ReservationRequest;
 import org.apache.hadoop.yarn.api.records.Resource;
 
 /**
@@ -71,7 +70,7 @@ public interface ReservationAllocation extends
    * @return the allocationRequests the map of resources requested against the
    *         time interval for which they were
    */
-  public Map<ReservationInterval, ReservationRequest> getAllocationRequests();
+  public Map<ReservationInterval, Resource> getAllocationRequests();
 
   /**
    * Return a string identifying the plan to which the reservation belongs

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e602fa3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java
new file mode 100644
index 0000000..8affae4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.util.HashMap;
+import java.util.Map;
+
+final class ReservationSystemUtil {
+  // Static helpers that turn ReservationRequest values into Resource totals.
+  private ReservationSystemUtil() {
+    // utility class: the private constructor prevents instantiation
+  }
+
+  public static Resource toResource(ReservationRequest request) {
+    Resource resource = Resources.multiply(request.getCapability(),
+        (float) request.getNumContainers());
+    return resource; // capability multiplied by the number of containers
+  }
+
+  public static Map<ReservationInterval, Resource> toResources(
+      Map<ReservationInterval, ReservationRequest> allocations) {
+    Map<ReservationInterval, Resource> resources =
+        new HashMap<ReservationInterval, Resource>();
+    for (Map.Entry<ReservationInterval, ReservationRequest> entry :
+        allocations.entrySet()) {
+      resources.put(entry.getKey(),
+          toResource(entry.getValue()));
+    }
+    return resources; // new HashMap: same keys, values run through toResource
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e602fa3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
index bfaf06b..be1d69a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
@@ -378,14 +378,15 @@ public class ReservationSystemTestUtil {
     return rr;
   }
 
-  public static Map<ReservationInterval, ReservationRequest> generateAllocation(
+  public static Map<ReservationInterval, Resource> generateAllocation(
       long startTime, long step, int[] alloc) {
-    Map<ReservationInterval, ReservationRequest> req =
-        new TreeMap<ReservationInterval, ReservationRequest>();
+    Map<ReservationInterval, Resource> req =
+        new TreeMap<ReservationInterval, Resource>();
     for (int i = 0; i < alloc.length; i++) {
       req.put(new ReservationInterval(startTime + i * step, startTime + (i + 1)
-          * step), ReservationRequest.newInstance(
-          Resource.newInstance(1024, 1), alloc[i]));
+          * step), ReservationSystemUtil.toResource(ReservationRequest
+          .newInstance(
+          Resource.newInstance(1024, 1), alloc[i])));
     }
     return req;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e602fa3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
index 61561e9..19f876d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.Map;
@@ -198,12 +197,14 @@ public class TestCapacityOverTimePolicy {
   @Test(expected = PlanningQuotaException.class)
   public void testFailAvg() throws IOException, PlanningException {
     // generate an allocation which violates the 25% average single-shot
-    Map<ReservationInterval, ReservationRequest> req =
-        new TreeMap<ReservationInterval, ReservationRequest>();
+    Map<ReservationInterval, Resource> req =
+        new TreeMap<ReservationInterval, Resource>();
     long win = timeWindow / 2 + 100;
     int cont = (int) Math.ceil(0.5 * totCont);
     req.put(new ReservationInterval(initTime, initTime + win),
-        ReservationRequest.newInstance(Resource.newInstance(1024, 1), cont));
+        ReservationSystemUtil.toResource(
+            ReservationRequest.newInstance(Resource.newInstance(1024, 1),
+                cont)));
 
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(
@@ -214,12 +215,13 @@ public class TestCapacityOverTimePolicy {
   @Test
   public void testFailAvgBySum() throws IOException, PlanningException {
     // generate an allocation which violates the 25% average by sum
-    Map<ReservationInterval, ReservationRequest> req =
-        new TreeMap<ReservationInterval, ReservationRequest>();
+    Map<ReservationInterval, Resource> req =
+        new TreeMap<ReservationInterval, Resource>();
     long win = 86400000 / 4 + 1;
     int cont = (int) Math.ceil(0.5 * totCont);
     req.put(new ReservationInterval(initTime, initTime + win),
-        ReservationRequest.newInstance(Resource.newInstance(1024, 1), cont));
+        ReservationSystemUtil.toResource(ReservationRequest.newInstance(Resource
+            .newInstance(1024, 1), cont)));
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(
             ReservationSystemTestUtil.getNewReservationId(), null, "u1",

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e602fa3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java
index b8cf6c5..de94dcd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java
@@ -516,7 +516,7 @@ public class TestGreedyReservationAgent {
                 .generateAllocation(0, step, f), res, minAlloc)));
 
     int[] f2 = { 5, 5, 5, 5, 5, 5, 5 };
-    Map<ReservationInterval, ReservationRequest> alloc = 
+    Map<ReservationInterval, Resource> alloc =
         ReservationSystemTestUtil.generateAllocation(5000, step, f2);
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e602fa3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
index 91c1962..722fb29 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
@@ -100,9 +100,11 @@ public class TestInMemoryPlan {
     ReservationDefinition rDef =
         createSimpleReservationDefinition(start, start + alloc.length,
             alloc.length, allocations.values());
+    Map<ReservationInterval, Resource> allocs =
+        ReservationSystemUtil.toResources(allocations);
     ReservationAllocation rAllocation =
         new InMemoryReservationAllocation(reservationID, rDef, user, planName,
-            start, start + alloc.length, allocations, resCalc, minAlloc);
+            start, start + alloc.length, allocs, resCalc, minAlloc);
     Assert.assertNull(plan.getReservationById(reservationID));
     try {
       plan.addReservation(rAllocation);
@@ -132,9 +134,11 @@ public class TestInMemoryPlan {
     ReservationDefinition rDef =
         createSimpleReservationDefinition(start, start + alloc.length,
             alloc.length, allocations.values());
+    Map<ReservationInterval, Resource> allocs = ReservationSystemUtil.toResources
+        (allocations);
     ReservationAllocation rAllocation =
         new InMemoryReservationAllocation(reservationID, rDef, user, planName,
-            start, start + alloc.length, allocations, resCalc, minAlloc);
+            start, start + alloc.length, allocs, resCalc, minAlloc);
     Assert.assertNull(plan.getReservationById(reservationID));
     try {
       plan.addReservation(rAllocation);
@@ -158,9 +162,11 @@ public class TestInMemoryPlan {
     ReservationDefinition rDef =
         createSimpleReservationDefinition(start, start + alloc.length,
             alloc.length, allocations.values());
+    Map<ReservationInterval, Resource> allocs = ReservationSystemUtil.toResources
+        (allocations);
     ReservationAllocation rAllocation =
         new InMemoryReservationAllocation(reservationID, rDef, user, planName,
-            start, start + alloc.length, allocations, resCalc, minAlloc);
+            start, start + alloc.length, allocs, resCalc, minAlloc);
     Assert.assertNull(plan.getReservationById(reservationID));
     try {
       plan.addReservation(rAllocation);
@@ -202,9 +208,11 @@ public class TestInMemoryPlan {
     ReservationDefinition rDef =
         createSimpleReservationDefinition(start, start + alloc.length,
             alloc.length, allocations.values());
+    Map<ReservationInterval, Resource> allocs = ReservationSystemUtil.toResources
+        (allocations);
     ReservationAllocation rAllocation =
         new InMemoryReservationAllocation(reservationID, rDef, user, planName,
-            start, start + alloc.length, allocations, resCalc, minAlloc);
+            start, start + alloc.length, allocs, resCalc, minAlloc);
     Assert.assertNull(plan.getReservationById(reservationID));
     try {
       plan.addReservation(rAllocation);
@@ -226,9 +234,12 @@ public class TestInMemoryPlan {
     rDef =
         createSimpleReservationDefinition(start, start + updatedAlloc.length,
             updatedAlloc.length, allocations.values());
+    Map<ReservationInterval, Resource> updatedAllocs =
+        ReservationSystemUtil.toResources(allocations);
     rAllocation =
         new InMemoryReservationAllocation(reservationID, rDef, user, planName,
-            start, start + updatedAlloc.length, allocations, resCalc, minAlloc);
+            start, start + updatedAlloc.length, updatedAllocs, resCalc,
+            minAlloc);
     try {
       plan.updateReservation(rAllocation);
     } catch (PlanningException e) {
@@ -260,9 +271,11 @@ public class TestInMemoryPlan {
     ReservationDefinition rDef =
         createSimpleReservationDefinition(start, start + alloc.length,
             alloc.length, allocations.values());
+    Map<ReservationInterval, Resource> allocs =
+        ReservationSystemUtil.toResources(allocations);
     ReservationAllocation rAllocation =
         new InMemoryReservationAllocation(reservationID, rDef, user, planName,
-            start, start + alloc.length, allocations, resCalc, minAlloc);
+            start, start + alloc.length, allocs, resCalc, minAlloc);
     Assert.assertNull(plan.getReservationById(reservationID));
     try {
       plan.updateReservation(rAllocation);
@@ -290,9 +303,11 @@ public class TestInMemoryPlan {
     ReservationDefinition rDef =
         createSimpleReservationDefinition(start, start + alloc.length,
             alloc.length, allocations.values());
+    Map<ReservationInterval, Resource> allocs =
+        ReservationSystemUtil.toResources(allocations);
     ReservationAllocation rAllocation =
         new InMemoryReservationAllocation(reservationID, rDef, user, planName,
-            start, start + alloc.length, allocations, resCalc, minAlloc);
+            start, start + alloc.length, allocs, resCalc, minAlloc);
     Assert.assertNull(plan.getReservationById(reservationID));
     try {
       plan.addReservation(rAllocation);
@@ -359,9 +374,11 @@ public class TestInMemoryPlan {
     ReservationDefinition rDef1 =
         createSimpleReservationDefinition(start, start + alloc1.length,
             alloc1.length, allocations1.values());
+    Map<ReservationInterval, Resource> allocs1 =
+        ReservationSystemUtil.toResources(allocations1);
     ReservationAllocation rAllocation =
         new InMemoryReservationAllocation(reservationID1, rDef1, user,
-            planName, start, start + alloc1.length, allocations1, resCalc,
+            planName, start, start + alloc1.length, allocs1, resCalc,
             minAlloc);
     Assert.assertNull(plan.getReservationById(reservationID1));
     try {
@@ -388,9 +405,11 @@ public class TestInMemoryPlan {
     ReservationDefinition rDef2 =
         createSimpleReservationDefinition(start, start + alloc2.length,
             alloc2.length, allocations2.values());
+    Map<ReservationInterval, Resource> allocs2 =
+        ReservationSystemUtil.toResources(allocations2);
     rAllocation =
         new InMemoryReservationAllocation(reservationID2, rDef2, user,
-            planName, start, start + alloc2.length, allocations2, resCalc,
+            planName, start, start + alloc2.length, allocs2, resCalc,
             minAlloc);
     Assert.assertNull(plan.getReservationById(reservationID2));
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e602fa3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryReservationAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryReservationAllocation.java
index 76f39dc..55224a9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryReservationAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryReservationAllocation.java
@@ -69,7 +69,7 @@ public class TestInMemoryReservationAllocation {
     ReservationDefinition rDef =
         createSimpleReservationDefinition(start, start + alloc.length + 1,
             alloc.length);
-    Map<ReservationInterval, ReservationRequest> allocations =
+    Map<ReservationInterval, Resource> allocations =
         generateAllocation(start, alloc, false, false);
     ReservationAllocation rAllocation =
         new InMemoryReservationAllocation(reservationID, rDef, user, planName,
@@ -91,7 +91,7 @@ public class TestInMemoryReservationAllocation {
     ReservationDefinition rDef =
         createSimpleReservationDefinition(start, start + alloc.length + 1,
             alloc.length);
-    Map<ReservationInterval, ReservationRequest> allocations =
+    Map<ReservationInterval, Resource> allocations =
         generateAllocation(start, alloc, true, false);
     ReservationAllocation rAllocation =
         new InMemoryReservationAllocation(reservationID, rDef, user, planName,
@@ -114,7 +114,7 @@ public class TestInMemoryReservationAllocation {
     ReservationDefinition rDef =
         createSimpleReservationDefinition(start, start + alloc.length + 1,
             alloc.length);
-    Map<ReservationInterval, ReservationRequest> allocations =
+    Map<ReservationInterval, Resource> allocations =
         generateAllocation(start, alloc, true, false);
     ReservationAllocation rAllocation =
         new InMemoryReservationAllocation(reservationID, rDef, user, planName,
@@ -137,8 +137,8 @@ public class TestInMemoryReservationAllocation {
     ReservationDefinition rDef =
         createSimpleReservationDefinition(start, start + alloc.length + 1,
             alloc.length);
-    Map<ReservationInterval, ReservationRequest> allocations =
-        new HashMap<ReservationInterval, ReservationRequest>();
+    Map<ReservationInterval, Resource> allocations =
+        new HashMap<ReservationInterval, Resource>();
     ReservationAllocation rAllocation =
         new InMemoryReservationAllocation(reservationID, rDef, user, planName,
             start, start + alloc.length + 1, allocations, resCalc, minAlloc);
@@ -156,11 +156,13 @@ public class TestInMemoryReservationAllocation {
     ReservationDefinition rDef =
         createSimpleReservationDefinition(start, start + alloc.length + 1,
             alloc.length);
-    Map<ReservationInterval, ReservationRequest> allocations =
-        generateAllocation(start, alloc, false, true);
+    boolean isGang = true;
+    Map<ReservationInterval, Resource> allocations =
+        generateAllocation(start, alloc, false, isGang);
     ReservationAllocation rAllocation =
         new InMemoryReservationAllocation(reservationID, rDef, user, planName,
-            start, start + alloc.length + 1, allocations, resCalc, minAlloc);
+            start, start + alloc.length + 1, allocations, resCalc, minAlloc,
+            isGang);
     doAssertions(rAllocation, reservationID, rDef, allocations, start, alloc);
     Assert.assertTrue(rAllocation.containsGangs());
     for (int i = 0; i < alloc.length; i++) {
@@ -171,7 +173,7 @@ public class TestInMemoryReservationAllocation {
 
   private void doAssertions(ReservationAllocation rAllocation,
       ReservationId reservationID, ReservationDefinition rDef,
-      Map<ReservationInterval, ReservationRequest> allocations, int start,
+      Map<ReservationInterval, Resource> allocations, int start,
       int[] alloc) {
     Assert.assertEquals(reservationID, rAllocation.getReservationId());
     Assert.assertEquals(rDef, rAllocation.getReservationDefinition());
@@ -198,10 +200,10 @@ public class TestInMemoryReservationAllocation {
     return rDef;
   }
 
-  private Map<ReservationInterval, ReservationRequest> generateAllocation(
+  private Map<ReservationInterval, Resource> generateAllocation(
       int startTime, int[] alloc, boolean isStep, boolean isGang) {
-    Map<ReservationInterval, ReservationRequest> req =
-        new HashMap<ReservationInterval, ReservationRequest>();
+    Map<ReservationInterval, Resource> req =
+        new HashMap<ReservationInterval, Resource>();
     int numContainers = 0;
     for (int i = 0; i < alloc.length; i++) {
       if (isStep) {
@@ -215,7 +217,8 @@ public class TestInMemoryReservationAllocation {
       if (isGang) {
         rr.setConcurrency(numContainers);
       }
-      req.put(new ReservationInterval(startTime + i, startTime + i + 1), rr);
+      req.put(new ReservationInterval(startTime + i, startTime + i + 1),
+          ReservationSystemUtil.toResource(rr));
     }
     return req;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e602fa3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java
index c7301c7..d0f4dc6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java
@@ -46,9 +46,9 @@ public class TestRLESparseResourceAllocation {
         new RLESparseResourceAllocation(resCalc, minAlloc);
     int[] alloc = { 10, 10, 10, 10, 10, 10 };
     int start = 100;
-    Set<Entry<ReservationInterval, ReservationRequest>> inputs =
+    Set<Entry<ReservationInterval, Resource>> inputs =
         generateAllocation(start, alloc, false).entrySet();
-    for (Entry<ReservationInterval, ReservationRequest> ip : inputs) {
+    for (Entry<ReservationInterval, Resource> ip : inputs) {
       rleSparseVector.addInterval(ip.getKey(), ip.getValue());
     }
     LOG.info(rleSparseVector.toString());
@@ -63,7 +63,7 @@ public class TestRLESparseResourceAllocation {
     }
     Assert.assertEquals(Resource.newInstance(0, 0),
         rleSparseVector.getCapacityAtTime(start + alloc.length + 2));
-    for (Entry<ReservationInterval, ReservationRequest> ip : inputs) {
+    for (Entry<ReservationInterval, Resource> ip : inputs) {
       rleSparseVector.removeInterval(ip.getKey(), ip.getValue());
     }
     LOG.info(rleSparseVector.toString());
@@ -83,9 +83,9 @@ public class TestRLESparseResourceAllocation {
         new RLESparseResourceAllocation(resCalc, minAlloc);
     int[] alloc = { 10, 10, 10, 10, 10, 10 };
     int start = 100;
-    Set<Entry<ReservationInterval, ReservationRequest>> inputs =
+    Set<Entry<ReservationInterval, Resource>> inputs =
         generateAllocation(start, alloc, true).entrySet();
-    for (Entry<ReservationInterval, ReservationRequest> ip : inputs) {
+    for (Entry<ReservationInterval, Resource> ip : inputs) {
       rleSparseVector.addInterval(ip.getKey(), ip.getValue());
     }
     LOG.info(rleSparseVector.toString());
@@ -101,8 +101,8 @@ public class TestRLESparseResourceAllocation {
     }
     Assert.assertEquals(Resource.newInstance(0, 0),
         rleSparseVector.getCapacityAtTime(start + alloc.length + 2));
-    for (Entry<ReservationInterval, ReservationRequest> ip : inputs) {
-      rleSparseVector.removeInterval(ip.getKey(), ip.getValue());
+    for (Entry<ReservationInterval, Resource> ip : inputs) {
+      rleSparseVector.removeInterval(ip.getKey(),ip.getValue());
     }
     LOG.info(rleSparseVector.toString());
     for (int i = 0; i < alloc.length; i++) {
@@ -121,9 +121,9 @@ public class TestRLESparseResourceAllocation {
         new RLESparseResourceAllocation(resCalc, minAlloc);
     int[] alloc = { 0, 5, 10, 10, 5, 0 };
     int start = 100;
-    Set<Entry<ReservationInterval, ReservationRequest>> inputs =
+    Set<Entry<ReservationInterval, Resource>> inputs =
         generateAllocation(start, alloc, true).entrySet();
-    for (Entry<ReservationInterval, ReservationRequest> ip : inputs) {
+    for (Entry<ReservationInterval, Resource> ip : inputs) {
       rleSparseVector.addInterval(ip.getKey(), ip.getValue());
     }
     LOG.info(rleSparseVector.toString());
@@ -139,7 +139,7 @@ public class TestRLESparseResourceAllocation {
     }
     Assert.assertEquals(Resource.newInstance(0, 0),
         rleSparseVector.getCapacityAtTime(start + alloc.length + 2));
-    for (Entry<ReservationInterval, ReservationRequest> ip : inputs) {
+    for (Entry<ReservationInterval, Resource> ip : inputs) {
       rleSparseVector.removeInterval(ip.getKey(), ip.getValue());
     }
     LOG.info(rleSparseVector.toString());
@@ -157,17 +157,17 @@ public class TestRLESparseResourceAllocation {
     RLESparseResourceAllocation rleSparseVector =
         new RLESparseResourceAllocation(resCalc, minAlloc);
     rleSparseVector.addInterval(new ReservationInterval(0, Long.MAX_VALUE),
-        ReservationRequest.newInstance(Resource.newInstance(0, 0), (0)));
+        Resource.newInstance(0, 0));
     LOG.info(rleSparseVector.toString());
     Assert.assertEquals(Resource.newInstance(0, 0),
         rleSparseVector.getCapacityAtTime(new Random().nextLong()));
     Assert.assertTrue(rleSparseVector.isEmpty());
   }
 
-  private Map<ReservationInterval, ReservationRequest> generateAllocation(
+  private Map<ReservationInterval, Resource> generateAllocation(
       int startTime, int[] alloc, boolean isStep) {
-    Map<ReservationInterval, ReservationRequest> req =
-        new HashMap<ReservationInterval, ReservationRequest>();
+    Map<ReservationInterval, Resource> req =
+        new HashMap<ReservationInterval, Resource>();
     int numContainers = 0;
     for (int i = 0; i < alloc.length; i++) {
       if (isStep) {
@@ -176,9 +176,8 @@ public class TestRLESparseResourceAllocation {
         numContainers = alloc[i];
       }
       req.put(new ReservationInterval(startTime + i, startTime + i + 1),
-
-      ReservationRequest.newInstance(Resource.newInstance(1024, 1),
-          (numContainers)));
+          ReservationSystemUtil.toResource(ReservationRequest.newInstance(
+              Resource.newInstance(1024, 1), (numContainers))));
     }
     return req;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e602fa3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java
index 1ca9f2e..d4a97ba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java
@@ -146,14 +146,15 @@ public class TestSimpleCapacityReplanner {
     }
   }
 
-  private Map<ReservationInterval, ReservationRequest> generateAllocation(
+  private Map<ReservationInterval, Resource> generateAllocation(
       int startTime, int[] alloc) {
-    Map<ReservationInterval, ReservationRequest> req =
-        new TreeMap<ReservationInterval, ReservationRequest>();
+    Map<ReservationInterval, Resource> req =
+        new TreeMap<ReservationInterval, Resource>();
     for (int i = 0; i < alloc.length; i++) {
       req.put(new ReservationInterval(startTime + i, startTime + i + 1),
-          ReservationRequest.newInstance(Resource.newInstance(1024, 1),
-              alloc[i]));
+          ReservationSystemUtil.toResource(
+              ReservationRequest.newInstance(Resource.newInstance(1024, 1),
+                  alloc[i])));
     }
     return req;
   }


[3/6] hadoop git commit: YARN-3888. ApplicationMaster link is broken in RM WebUI when appstate is NEW. Contributed by Bibin A Chundatt

Posted by ji...@apache.org.
YARN-3888. ApplicationMaster link is broken in RM WebUI when appstate is
NEW. Contributed by Bibin A Chundatt


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/52148767
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/52148767
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/52148767

Branch: refs/heads/YARN-1197
Commit: 52148767924baf423172d26f2c6d8a4cfc6e143f
Parents: 1a0752d
Author: Xuan <xg...@apache.org>
Authored: Thu Jul 9 21:37:33 2015 -0700
Committer: Xuan <xg...@apache.org>
Committed: Thu Jul 9 21:37:33 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                           |  3 +++
 .../yarn/server/resourcemanager/webapp/RMAppsBlock.java   | 10 ++++++----
 2 files changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/52148767/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 89b5e9f..2a9ff98 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -607,6 +607,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3892. Fixed NPE on RMStateStore#serviceStop when
     CapacityScheduler#serviceInit fails. (Bibin A Chundatt via jianhe)
 
+    YARN-3888. ApplicationMaster link is broken in RM WebUI when appstate is NEW.
+    (Bibin A Chundatt via xgong)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52148767/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppsBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppsBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppsBlock.java
index d252c30..5e80d23 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppsBlock.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppsBlock.java
@@ -131,13 +131,15 @@ public class RMAppsBlock extends AppsBlock {
 
       String trackingURL =
           app.getTrackingUrl() == null
-              || app.getTrackingUrl().equals(UNAVAILABLE) ? null : app
-            .getTrackingUrl();
+              || app.getTrackingUrl().equals(UNAVAILABLE)
+              || app.getAppState() == YarnApplicationState.NEW ? null : app
+              .getTrackingUrl();
 
       String trackingUI =
           app.getTrackingUrl() == null
-              || app.getTrackingUrl().equals(UNAVAILABLE) ? "Unassigned" : app
-            .getAppState() == YarnApplicationState.FINISHED
+              || app.getTrackingUrl().equals(UNAVAILABLE)
+              || app.getAppState() == YarnApplicationState.NEW ? "Unassigned"
+              : app.getAppState() == YarnApplicationState.FINISHED
               || app.getAppState() == YarnApplicationState.FAILED
               || app.getAppState() == YarnApplicationState.KILLED ? "History"
               : "ApplicationMaster";


[4/6] hadoop git commit: HDFS-8749. Fix findbugs warnings in BlockManager.java. Contributed by Brahma Reddy Battula.

Posted by ji...@apache.org.
HDFS-8749. Fix findbugs warnings in BlockManager.java. Contributed by Brahma Reddy Battula.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d66302ed
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d66302ed
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d66302ed

Branch: refs/heads/YARN-1197
Commit: d66302ed9b2c25b560d8319d6d755aee7cfa4d67
Parents: 5214876
Author: Akira Ajisaka <aa...@apache.org>
Authored: Fri Jul 10 15:04:06 2015 +0900
Committer: Akira Ajisaka <aa...@apache.org>
Committed: Fri Jul 10 15:04:06 2015 +0900

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt                       | 3 +++
 .../apache/hadoop/hdfs/server/blockmanagement/BlockManager.java   | 2 --
 2 files changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d66302ed/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index e26e061..5c1208d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1026,6 +1026,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8729. Fix TestFileTruncate#testTruncateWithDataNodesRestartImmediately
     which occasionally failed. (Walter Su via jing9)
 
+    HDFS-8749. Fix findbugs warnings in BlockManager.java.
+    (Brahma Reddy Battula via aajisaka)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d66302ed/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 0b60a97..7dce2a8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -3596,8 +3596,6 @@ public class BlockManager implements BlockStatsMXBean {
       String src, BlockInfo[] blocks) {
     for (BlockInfo b: blocks) {
       if (!b.isComplete()) {
-        final BlockInfoUnderConstruction uc =
-            (BlockInfoUnderConstruction)b;
         final int numNodes = b.numNodes();
         final int min = getMinStorageNum(b);
         final BlockUCState state = b.getBlockUCState();


[5/6] hadoop git commit: HDFS-2956. calling fetchdt without a --renewer argument throws NPE (Contributed by Vinayakumar B)

Posted by ji...@apache.org.
HDFS-2956. calling fetchdt without a --renewer argument throws NPE (Contributed by Vinayakumar B)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b4890803
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b4890803
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b4890803

Branch: refs/heads/YARN-1197
Commit: b48908033fcac7a4bd4313c1fd1457999fba08e1
Parents: d66302e
Author: Vinayakumar B <vi...@apache.org>
Authored: Fri Jul 10 15:47:04 2015 +0530
Committer: Vinayakumar B <vi...@apache.org>
Committed: Fri Jul 10 15:47:04 2015 +0530

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 ++
 .../ClientNamenodeProtocolTranslatorPB.java     |  2 +-
 .../hdfs/tools/TestDelegationTokenFetcher.java  | 39 ++++++++++++++++++++
 3 files changed, 43 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4890803/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 5c1208d..13b2621 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1029,6 +1029,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8749. Fix findbugs warnings in BlockManager.java.
     (Brahma Reddy Battula via aajisaka)
 
+    HDFS-2956. calling fetchdt without a --renewer argument throws NPE
+    (vinayakumarb)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4890803/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 4ec6f9e..566d54f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -929,7 +929,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
       throws IOException {
     GetDelegationTokenRequestProto req = GetDelegationTokenRequestProto
         .newBuilder()
-        .setRenewer(renewer.toString())
+        .setRenewer(renewer == null ? "" : renewer.toString())
         .build();
     try {
       GetDelegationTokenResponseProto resp = rpcProxy.getDelegationToken(null, req);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4890803/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDelegationTokenFetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDelegationTokenFetcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDelegationTokenFetcher.java
index ab3933b..80a1a6c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDelegationTokenFetcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDelegationTokenFetcher.java
@@ -18,7 +18,10 @@
 
 package org.apache.hadoop.hdfs.tools;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
@@ -28,12 +31,18 @@ import java.io.IOException;
 import java.util.Iterator;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.tools.FakeRenewer;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -105,4 +114,34 @@ public class TestDelegationTokenFetcher {
     Assert.assertFalse(p.getFileSystem(conf).exists(p));
 
   }
+
+  @Test
+  public void testDelegationTokenWithoutRenewerViaRPC() throws Exception {
+    conf.setBoolean(DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
+        .build();
+    try {
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // Should be able to fetch token without renewer.
+      LocalFileSystem localFileSystem = FileSystem.getLocal(conf);
+      Path p = new Path(f.getRoot().getAbsolutePath(), tokenFile);
+      p = localFileSystem.makeQualified(p);
+      DelegationTokenFetcher.saveDelegationToken(conf, fs, null, p);
+      Credentials creds = Credentials.readTokenStorageFile(p, conf);
+      Iterator<Token<?>> itr = creds.getAllTokens().iterator();
+      assertTrue("token not exist error", itr.hasNext());
+      assertNotNull("Token should be there without renewer", itr.next());
+      try {
+        // Without renewer renewal of token should fail.
+        DelegationTokenFetcher.renewTokens(conf, p);
+        fail("Should have failed to renew");
+      } catch (AccessControlException e) {
+        GenericTestUtils.assertExceptionContains(
+            "tried to renew a token without a renewer", e);
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }


[2/6] hadoop git commit: HADOOP-12210. Collect network usage on the node. Contributed by Robert Grandl

Posted by ji...@apache.org.
HADOOP-12210. Collect network usage on the node. Contributed by Robert Grandl


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1a0752d8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1a0752d8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1a0752d8

Branch: refs/heads/YARN-1197
Commit: 1a0752d85a15499d120b4a79af9bd740fcd1f8e0
Parents: 0e602fa
Author: Chris Douglas <cd...@apache.org>
Authored: Mon Jul 6 17:28:20 2015 -0700
Committer: Chris Douglas <cd...@apache.org>
Committed: Thu Jul 9 17:48:43 2015 -0700

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |  2 +
 .../java/org/apache/hadoop/util/SysInfo.java    | 12 +++
 .../org/apache/hadoop/util/SysInfoLinux.java    | 93 +++++++++++++++++++-
 .../org/apache/hadoop/util/SysInfoWindows.java  | 15 ++++
 .../apache/hadoop/util/TestSysInfoLinux.java    | 40 ++++++++-
 .../gridmix/DummyResourceCalculatorPlugin.java  | 19 ++++
 .../yarn/util/ResourceCalculatorPlugin.java     | 16 ++++
 7 files changed, 195 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a0752d8/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index d9a9eba..3d4f1e4 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -693,6 +693,8 @@ Release 2.8.0 - UNRELEASED
     HADOOP-12180. Move ResourceCalculatorPlugin from YARN to Common. 
     (Chris Douglas via kasha)
 
+    HADOOP-12210. Collect network usage on the node (Robert Grandl via cdouglas)
+
   OPTIMIZATIONS
 
     HADOOP-11785. Reduce the number of listStatus operation in distcp

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a0752d8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java
index ec7fb24..24b339d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java
@@ -108,4 +108,16 @@ public abstract class SysInfo {
    */
   public abstract float getCpuUsage();
 
+  /**
+   * Obtain the aggregated number of bytes read over the network.
+   * @return total number of bytes read.
+   */
+  public abstract long getNetworkBytesRead();
+
+  /**
+   * Obtain the aggregated number of bytes written to the network.
+   * @return total number of bytes written.
+   */
+  public abstract long getNetworkBytesWritten();
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a0752d8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java
index 055298d..8801985 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java
@@ -83,9 +83,22 @@ public class SysInfoLinux extends SysInfo {
                       "[ \t]*([0-9]*)[ \t]*([0-9]*)[ \t].*");
   private CpuTimeTracker cpuTimeTracker;
 
+  /**
+   * Pattern for parsing /proc/net/dev.
+   */
+  private static final String PROCFS_NETFILE = "/proc/net/dev";
+  private static final Pattern PROCFS_NETFILE_FORMAT =
+      Pattern .compile("^[ \t]*([a-zA-Z]+[0-9]*):" +
+               "[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)" +
+               "[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)" +
+               "[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)" +
+               "[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+).*");
+
+
   private String procfsMemFile;
   private String procfsCpuFile;
   private String procfsStatFile;
+  private String procfsNetFile;
   private long jiffyLengthInMillis;
 
   private long ramSize = 0;
@@ -98,6 +111,8 @@ public class SysInfoLinux extends SysInfo {
   /* number of physical cores on the system. */
   private int numCores = 0;
   private long cpuFrequency = 0L; // CPU frequency on the system (kHz)
+  private long numNetBytesRead = 0L; // aggregated bytes read from network
+  private long numNetBytesWritten = 0L; // aggregated bytes written to network
 
   private boolean readMemInfoFile = false;
   private boolean readCpuInfoFile = false;
@@ -130,7 +145,7 @@ public class SysInfoLinux extends SysInfo {
 
   public SysInfoLinux() {
     this(PROCFS_MEMFILE, PROCFS_CPUINFO, PROCFS_STAT,
-        JIFFY_LENGTH_IN_MILLIS);
+         PROCFS_NETFILE, JIFFY_LENGTH_IN_MILLIS);
   }
 
   /**
@@ -139,16 +154,19 @@ public class SysInfoLinux extends SysInfo {
    * @param procfsMemFile fake file for /proc/meminfo
    * @param procfsCpuFile fake file for /proc/cpuinfo
    * @param procfsStatFile fake file for /proc/stat
+   * @param procfsNetFile fake file for /proc/net/dev
    * @param jiffyLengthInMillis fake jiffy length value
    */
   @VisibleForTesting
   public SysInfoLinux(String procfsMemFile,
                                        String procfsCpuFile,
                                        String procfsStatFile,
+                                       String procfsNetFile,
                                        long jiffyLengthInMillis) {
     this.procfsMemFile = procfsMemFile;
     this.procfsCpuFile = procfsCpuFile;
     this.procfsStatFile = procfsStatFile;
+    this.procfsNetFile = procfsNetFile;
     this.jiffyLengthInMillis = jiffyLengthInMillis;
     this.cpuTimeTracker = new CpuTimeTracker(jiffyLengthInMillis);
   }
@@ -338,6 +356,61 @@ public class SysInfoLinux extends SysInfo {
     }
   }
 
+  /**
+   * Read /proc/net/dev file, parse and calculate amount
+   * of bytes read and written through the network.
+   */
+  private void readProcNetInfoFile() {
+
+    numNetBytesRead = 0L;
+    numNetBytesWritten = 0L;
+
+    // Read "/proc/net/dev" file
+    BufferedReader in;
+    InputStreamReader fReader;
+    try {
+      fReader = new InputStreamReader(
+          new FileInputStream(procfsNetFile), Charset.forName("UTF-8"));
+      in = new BufferedReader(fReader);
+    } catch (FileNotFoundException f) {
+      return;
+    }
+
+    Matcher mat;
+    try {
+      String str = in.readLine();
+      while (str != null) {
+        mat = PROCFS_NETFILE_FORMAT.matcher(str);
+        if (mat.find()) {
+          assert mat.groupCount() >= 16;
+
+          // ignore loopback interfaces
+          if (mat.group(1).equals("lo")) {
+            str = in.readLine();
+            continue;
+          }
+          numNetBytesRead += Long.parseLong(mat.group(2));
+          numNetBytesWritten += Long.parseLong(mat.group(10));
+        }
+        str = in.readLine();
+      }
+    } catch (IOException io) {
+      LOG.warn("Error reading the stream " + io);
+    } finally {
+      // Close the streams
+      try {
+        fReader.close();
+        try {
+          in.close();
+        } catch (IOException i) {
+          LOG.warn("Error closing the stream " + in);
+        }
+      } catch (IOException i) {
+        LOG.warn("Error closing the stream " + fReader);
+      }
+    }
+  }
+
   /** {@inheritDoc} */
   @Override
   public long getPhysicalMemorySize() {
@@ -405,6 +478,20 @@ public class SysInfoLinux extends SysInfo {
     return overallCpuUsage;
   }
 
+  /** {@inheritDoc} */
+  @Override
+  public long getNetworkBytesRead() {
+    readProcNetInfoFile();
+    return numNetBytesRead;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public long getNetworkBytesWritten() {
+    readProcNetInfoFile();
+    return numNetBytesWritten;
+  }
+
   /**
    * Test the {@link SysInfoLinux}.
    *
@@ -424,6 +511,10 @@ public class SysInfoLinux extends SysInfo {
     System.out.println("CPU frequency (kHz) : " + plugin.getCpuFrequency());
     System.out.println("Cumulative CPU time (ms) : " +
             plugin.getCumulativeCpuTime());
+    System.out.println("Total network read (bytes) : "
+            + plugin.getNetworkBytesRead());
+    System.out.println("Total network written (bytes) : "
+            + plugin.getNetworkBytesWritten());
     try {
       // Sleep so we can compute the CPU usage
       Thread.sleep(500L);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a0752d8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java
index da4c1c5..f8542a3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java
@@ -178,4 +178,19 @@ public class SysInfoWindows extends SysInfo {
     refreshIfNeeded();
     return cpuUsage;
   }
+
+  /** {@inheritDoc} */
+  @Override
+  public long getNetworkBytesRead() {
+    // TODO unimplemented
+    return 0L;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public long getNetworkBytesWritten() {
+    // TODO unimplemented
+    return 0L;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a0752d8/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestSysInfoLinux.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestSysInfoLinux.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestSysInfoLinux.java
index 73edc77..2a31f31 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestSysInfoLinux.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestSysInfoLinux.java
@@ -44,8 +44,10 @@ public class TestSysInfoLinux {
     public FakeLinuxResourceCalculatorPlugin(String procfsMemFile,
                                              String procfsCpuFile,
                                              String procfsStatFile,
+			                                       String procfsNetFile,
                                              long jiffyLengthInMillis) {
-      super(procfsMemFile, procfsCpuFile, procfsStatFile, jiffyLengthInMillis);
+      super(procfsMemFile, procfsCpuFile, procfsStatFile, procfsNetFile,
+          jiffyLengthInMillis);
     }
     @Override
     long getCurrentTime() {
@@ -61,14 +63,17 @@ public class TestSysInfoLinux {
   private static final String FAKE_MEMFILE;
   private static final String FAKE_CPUFILE;
   private static final String FAKE_STATFILE;
+  private static final String FAKE_NETFILE;
   private static final long FAKE_JIFFY_LENGTH = 10L;
   static {
     int randomNum = (new Random()).nextInt(1000000000);
     FAKE_MEMFILE = TEST_ROOT_DIR + File.separator + "MEMINFO_" + randomNum;
     FAKE_CPUFILE = TEST_ROOT_DIR + File.separator + "CPUINFO_" + randomNum;
     FAKE_STATFILE = TEST_ROOT_DIR + File.separator + "STATINFO_" + randomNum;
+    FAKE_NETFILE = TEST_ROOT_DIR + File.separator + "NETINFO_" + randomNum;
     plugin = new FakeLinuxResourceCalculatorPlugin(FAKE_MEMFILE, FAKE_CPUFILE,
                                                    FAKE_STATFILE,
+                                                   FAKE_NETFILE,
                                                    FAKE_JIFFY_LENGTH);
   }
   static final String MEMINFO_FORMAT =
@@ -141,6 +146,17 @@ public class TestSysInfoLinux {
     "procs_running 1\n" +
     "procs_blocked 0\n";
 
+  static final String NETINFO_FORMAT =
+    "Inter-|   Receive                                                |  Transmit\n"+
+    "face |bytes    packets errs drop fifo frame compressed multicast|bytes    packets"+
+    "errs drop fifo colls carrier compressed\n"+
+    "   lo: 42236310  563003    0    0    0     0          0         0 42236310  563003    " +
+    "0    0    0     0       0          0\n"+
+    " eth0: %d 3452527    0    0    0     0          0    299787 %d 1866280    0    0    " +
+    "0     0       0          0\n"+
+    " eth1: %d 3152521    0    0    0     0          0    219781 %d 1866290    0    0    " +
+    "0     0       0          0\n";
+
   /**
    * Test parsing /proc/stat and /proc/cpuinfo
    * @throws IOException
@@ -320,4 +336,26 @@ public class TestSysInfoLinux {
       IOUtils.closeQuietly(fWriter);
     }
   }
+
+  /**
+   * Test parsing /proc/net/dev
+   * @throws IOException
+   */
+  @Test
+  public void parsingProcNetFile() throws IOException {
+    long numBytesReadIntf1 = 2097172468L;
+    long numBytesWrittenIntf1 = 1355620114L;
+    long numBytesReadIntf2 = 1097172460L;
+    long numBytesWrittenIntf2 = 1055620110L;
+    File tempFile = new File(FAKE_NETFILE);
+    tempFile.deleteOnExit();
+    FileWriter fWriter = new FileWriter(FAKE_NETFILE);
+    fWriter.write(String.format(NETINFO_FORMAT,
+                            numBytesReadIntf1, numBytesWrittenIntf1,
+                            numBytesReadIntf2, numBytesWrittenIntf2));
+    fWriter.close();
+    assertEquals(plugin.getNetworkBytesRead(), numBytesReadIntf1 + numBytesReadIntf2);
+    assertEquals(plugin.getNetworkBytesWritten(), numBytesWrittenIntf1 + numBytesWrittenIntf2);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a0752d8/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/DummyResourceCalculatorPlugin.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/DummyResourceCalculatorPlugin.java b/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/DummyResourceCalculatorPlugin.java
index fd4cb83..b86303b 100644
--- a/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/DummyResourceCalculatorPlugin.java
+++ b/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/DummyResourceCalculatorPlugin.java
@@ -48,6 +48,12 @@ public class DummyResourceCalculatorPlugin extends ResourceCalculatorPlugin {
       "mapred.tasktracker.cumulativecputime.testing";
   /** CPU usage percentage for testing */
   public static final String CPU_USAGE = "mapred.tasktracker.cpuusage.testing";
+  /** cumulative number of bytes read over the network */
+  public static final String NETWORK_BYTES_READ =
+      "mapred.tasktracker.networkread.testing";
+  /** cumulative number of bytes written over the network */
+  public static final String NETWORK_BYTES_WRITTEN =
+      "mapred.tasktracker.networkwritten.testing";
   /** process cumulative CPU usage time for testing */
   public static final String PROC_CUMULATIVE_CPU_TIME =
       "mapred.tasktracker.proccumulativecputime.testing";
@@ -111,4 +117,17 @@ public class DummyResourceCalculatorPlugin extends ResourceCalculatorPlugin {
   public float getCpuUsage() {
     return getConf().getFloat(CPU_USAGE, -1);
   }
+
+  /** {@inheritDoc} */
+  @Override
+  public long getNetworkBytesRead() {
+    return getConf().getLong(NETWORK_BYTES_READ, -1);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public long getNetworkBytesWritten() {
+    return getConf().getLong(NETWORK_BYTES_WRITTEN, -1);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a0752d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java
index 5e5f1b4..21724a9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java
@@ -124,6 +124,22 @@ public class ResourceCalculatorPlugin extends Configured {
     return sys.getCpuUsage();
   }
 
+   /**
+   * Obtain the aggregated number of bytes read over the network.
+   * @return total number of bytes read.
+   */
+  public long getNetworkBytesRead() {
+    return sys.getNetworkBytesRead();
+  }
+
+  /**
+   * Obtain the aggregated number of bytes written to the network.
+   * @return total number of bytes written.
+   */
+  public long getNetworkBytesWritten() {
+    return sys.getNetworkBytesWritten();
+  }
+
   /**
    * Create the ResourceCalculatorPlugin from the class name and configure it. If
    * class name is null, this method will try and return a memory calculator


[6/6] hadoop git commit: YARN-3445. Cache runningApps in RMNode for getting running apps on given NodeId. (Junping Du via mingma)

Posted by ji...@apache.org.
YARN-3445. Cache runningApps in RMNode for getting running apps on given NodeId. (Junping Du via mingma)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/08244264
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/08244264
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/08244264

Branch: refs/heads/YARN-1197
Commit: 08244264c0583472b9c4e16591cfde72c6db62a2
Parents: b489080
Author: Ming Ma <mi...@apache.org>
Authored: Fri Jul 10 08:30:10 2015 -0700
Committer: Ming Ma <mi...@apache.org>
Committed: Fri Jul 10 08:30:10 2015 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/sls/nodemanager/NodeInfo.java   |  8 +++-
 .../yarn/sls/scheduler/RMNodeWrapper.java       |  5 +++
 hadoop-yarn-project/CHANGES.txt                 |  3 ++
 .../server/resourcemanager/rmnode/RMNode.java   |  2 +
 .../resourcemanager/rmnode/RMNodeImpl.java      | 43 ++++++++++++++++----
 .../yarn/server/resourcemanager/MockNodes.java  |  5 +++
 .../resourcemanager/TestRMNodeTransitions.java  | 36 ++++++++++++++--
 7 files changed, 91 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/08244264/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
index ee6eb7b..440779c 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
@@ -62,7 +62,8 @@ public class NodeInfo {
     private NodeState state;
     private List<ContainerId> toCleanUpContainers;
     private List<ApplicationId> toCleanUpApplications;
-    
+    private List<ApplicationId> runningApplications;
+
     public FakeRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
         Resource perNode, String rackName, String healthReport,
         int cmdPort, String hostName, NodeState state) {
@@ -77,6 +78,7 @@ public class NodeInfo {
       this.state = state;
       toCleanUpApplications = new ArrayList<ApplicationId>();
       toCleanUpContainers = new ArrayList<ContainerId>();
+      runningApplications = new ArrayList<ApplicationId>();
     }
 
     public NodeId getNodeID() {
@@ -135,6 +137,10 @@ public class NodeInfo {
       return toCleanUpApplications;
     }
 
+    public List<ApplicationId> getRunningApps() {
+      return runningApplications;
+    }
+
     public void updateNodeHeartbeatResponseForCleanup(
             NodeHeartbeatResponse response) {
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08244264/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
index b64be1b..a6633ae 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
@@ -119,6 +119,11 @@ public class RMNodeWrapper implements RMNode {
   }
 
   @Override
+  public List<ApplicationId> getRunningApps() {
+    return node.getRunningApps();
+  }
+
+  @Override
   public void updateNodeHeartbeatResponseForCleanup(
           NodeHeartbeatResponse nodeHeartbeatResponse) {
     node.updateNodeHeartbeatResponseForCleanup(nodeHeartbeatResponse);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08244264/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 2a9ff98..db000d7 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -1678,6 +1678,9 @@ Release 2.6.0 - 2014-11-18
     YARN-2811. In Fair Scheduler, reservation fulfillments shouldn't ignore max
     share (Siqi Li via Sandy Ryza)
 
+    YARN-3445. Cache runningApps in RMNode for getting running apps on given
+    NodeId. (Junping Du via mingma)
+
   IMPROVEMENTS
 
     YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08244264/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
index 95eeaf6..0386be6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
@@ -119,6 +119,8 @@ public interface RMNode {
 
   public List<ApplicationId> getAppsToCleanup();
 
+  List<ApplicationId> getRunningApps();
+
   /**
    * Update a {@link NodeHeartbeatResponse} with the list of containers and
    * applications to clean up for this node.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08244264/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index d1e6190..9bc91c7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -123,11 +123,16 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       new HashSet<ContainerId>();
 
   /* the list of applications that have finished and need to be purged */
-  private final List<ApplicationId> finishedApplications = new ArrayList<ApplicationId>();
+  private final List<ApplicationId> finishedApplications =
+      new ArrayList<ApplicationId>();
+
+  /* the list of applications that are running on this node */
+  private final List<ApplicationId> runningApplications =
+      new ArrayList<ApplicationId>();
 
   private NodeHeartbeatResponse latestNodeHeartBeatResponse = recordFactory
       .newRecordInstance(NodeHeartbeatResponse.class);
-  
+
   private static final StateMachineFactory<RMNodeImpl,
                                            NodeState,
                                            RMNodeEventType,
@@ -136,7 +141,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
                                            NodeState,
                                            RMNodeEventType,
                                            RMNodeEvent>(NodeState.NEW)
-  
+
      //Transitions from NEW state
      .addTransition(NodeState.NEW, NodeState.RUNNING, 
          RMNodeEventType.STARTED, new AddNodeTransition())
@@ -383,6 +388,16 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   }
   
   @Override
+  public List<ApplicationId> getRunningApps() {
+    this.readLock.lock();
+    try {
+      return new ArrayList<ApplicationId>(this.runningApplications);
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  @Override
   public List<ContainerId> getContainersToCleanUp() {
 
     this.readLock.lock();
@@ -519,9 +534,12 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       LOG.warn("Cannot get RMApp by appId=" + appId
           + ", just added it to finishedApplications list for cleanup");
       rmNode.finishedApplications.add(appId);
+      rmNode.runningApplications.remove(appId);
       return;
     }
 
+    // Add running applications back due to Node add or Node reconnection.
+    rmNode.runningApplications.add(appId);
     context.getDispatcher().getEventHandler()
         .handle(new RMAppRunningOnNodeEvent(appId, nodeId));
   }
@@ -707,8 +725,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
 
     @Override
     public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
-      rmNode.finishedApplications.add(((
-          RMNodeCleanAppEvent) event).getAppId());
+      ApplicationId appId = ((RMNodeCleanAppEvent) event).getAppId();
+      rmNode.finishedApplications.add(appId);
+      rmNode.runningApplications.remove(appId);
     }
   }
 
@@ -910,12 +929,22 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
             + "cleanup, no further processing");
         continue;
       }
-      if (finishedApplications.contains(containerId.getApplicationAttemptId()
-          .getApplicationId())) {
+
+      ApplicationId containerAppId =
+          containerId.getApplicationAttemptId().getApplicationId();
+
+      if (finishedApplications.contains(containerAppId)) {
         LOG.info("Container " + containerId
             + " belongs to an application that is already killed,"
             + " no further processing");
         continue;
+      } else if (!runningApplications.contains(containerAppId)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Container " + containerId
+              + " is the first container get launched for application "
+              + containerAppId);
+        }
+        runningApplications.add(containerAppId);
       }
 
       // Process running containers

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08244264/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index 2d863d1..095fe28 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -187,6 +187,11 @@ public class MockNodes {
     }
 
     @Override
+    public List<ApplicationId> getRunningApps() {
+      return null; // stub: always null, never an empty list
+    }
+
+    @Override
     public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response) {
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08244264/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
index 01f4357..ece896b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
@@ -33,6 +33,7 @@ import java.util.List;
 import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
@@ -485,9 +486,9 @@ public class TestRMNodeTransitions {
     NodeId nodeId = node.getNodeID();
 
     // Expire a container
-		ContainerId completedContainerId = BuilderUtils.newContainerId(
-				BuilderUtils.newApplicationAttemptId(
-						BuilderUtils.newApplicationId(0, 0), 0), 0);
+    ContainerId completedContainerId = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(
+            BuilderUtils.newApplicationId(0, 0), 0), 0);
     node.handle(new RMNodeCleanContainerEvent(nodeId, completedContainerId));
     Assert.assertEquals(1, node.getContainersToCleanUp().size());
 
@@ -512,6 +513,35 @@ public class TestRMNodeTransitions {
     Assert.assertEquals(finishedAppId, hbrsp.getApplicationsToCleanup().get(0));
   }
 
+  @Test(timeout=20000)
+  public void testUpdateHeartbeatResponseForAppLifeCycle() {
+    RMNodeImpl node = getRunningNode();
+    NodeId nodeId = node.getNodeID();
+
+    ApplicationId runningAppId = BuilderUtils.newApplicationId(0, 1);
+    // Build a container id under runningAppId and report it as RUNNING
+    ContainerId runningContainerId = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(
+        runningAppId, 0), 0);
+
+    ContainerStatus status = ContainerStatus.newInstance(runningContainerId,
+        ContainerState.RUNNING, "", 0);
+    List<ContainerStatus> statusList = new ArrayList<ContainerStatus>();
+    statusList.add(status);
+    NodeHealthStatus nodeHealth = NodeHealthStatus.newInstance(true,
+        "", System.currentTimeMillis());
+    node.handle(new RMNodeStatusEvent(nodeId, nodeHealth,
+        statusList, null, null));
+    // After the status event the node is expected to list one running app
+    Assert.assertEquals(1, node.getRunningApps().size());
+
+    // Clean up the same app: expect it queued for cleanup, no longer running
+    ApplicationId finishedAppId = runningAppId;
+    node.handle(new RMNodeCleanAppEvent(nodeId, finishedAppId));
+    Assert.assertEquals(1, node.getAppsToCleanup().size());
+    Assert.assertEquals(0, node.getRunningApps().size());
+  }
+
   private RMNodeImpl getRunningNode() {
     return getRunningNode(null, 0);
   }