You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2016/02/11 09:00:10 UTC

[36/50] hadoop git commit: YARN-4360. Improve GreedyReservationAgent to support "early" allocations, and performance improvements (curino via asuresh)

YARN-4360. Improve GreedyReservationAgent to support "early" allocations, and performance improvements (curino via asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5cf5c41a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5cf5c41a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5cf5c41a

Branch: refs/heads/yarn-2877
Commit: 5cf5c41a895f5ab8bf6270089f8cfdea50573a97
Parents: a429f85
Author: Arun Suresh <as...@apache.org>
Authored: Wed Feb 10 09:11:15 2016 -0800
Committer: Arun Suresh <as...@apache.org>
Committed: Wed Feb 10 09:11:15 2016 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../reservation/CapacityOverTimePolicy.java     |   9 +
 .../planning/AlignedPlannerWithGreedy.java      |   4 +-
 .../planning/GreedyReservationAgent.java        |  40 ++-
 .../reservation/planning/IterativePlanner.java  | 239 ++++++++++++------
 .../planning/StageAllocatorGreedyRLE.java       | 245 +++++++++++++++++++
 .../reservation/ReservationSystemTestUtil.java  |  19 +-
 .../planning/TestGreedyReservationAgent.java    | 120 +++++++--
 8 files changed, 565 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5cf5c41a/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 3a0e0b1..ff37e1a 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -799,6 +799,9 @@ Release 2.8.0 - UNRELEASED
 
     YARN-4662. Document some newly added metrics. (Jian He via xgong)
 
+    YARN-4360. Improve GreedyReservationAgent to support "early" allocations,
+    and performance improvements (curino via asuresh)
+
   OPTIMIZATIONS
 
     YARN-3339. TestDockerContainerExecutor should pull a single image and not

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5cf5c41a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
index 424b543..80f6c88 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
@@ -214,6 +214,15 @@ public class CapacityOverTimePolicy implements SharingPolicy {
     RLESparseResourceAllocation used =
         plan.getConsumptionForUserOverTime(user, start, end);
 
+    // add back the resources used by the old reservation, if any
+    ReservationAllocation old = plan.getReservationById(oldId);
+    if (old != null) {
+      used =
+          RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
+              Resources.clone(plan.getTotalCapacity()), used,
+              old.getResourcesOverTime(), RLEOperator.subtract, start, end);
+    }
+
     instRLEQuota =
         RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
             planTotalCapacity, instRLEQuota, used, RLEOperator.subtract, start,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5cf5c41a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java
index a389928..b23cf1e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java
@@ -58,13 +58,13 @@ public class AlignedPlannerWithGreedy implements ReservationAgent {
     // LowCostAligned planning algorithm
     ReservationAgent algAligned =
         new IterativePlanner(new StageEarliestStartByDemand(),
-            new StageAllocatorLowCostAligned(smoothnessFactor));
+            new StageAllocatorLowCostAligned(smoothnessFactor), false);
     listAlg.add(algAligned);
 
     // Greedy planning algorithm
     ReservationAgent algGreedy =
         new IterativePlanner(new StageEarliestStartByJobArrival(),
-            new StageAllocatorGreedy());
+            new StageAllocatorGreedy(), false);
     listAlg.add(algGreedy);
 
     // Set planner:

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5cf5c41a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java
index db82a66..915a834 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ReservationDefinition;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
@@ -45,9 +46,44 @@ public class GreedyReservationAgent implements ReservationAgent {
       .getLogger(GreedyReservationAgent.class);
 
   // Greedy planner
-  private final ReservationAgent planner = new IterativePlanner(
-      new StageEarliestStartByJobArrival(), new StageAllocatorGreedy());
+  private final ReservationAgent planner;
 
+  public final static String GREEDY_FAVOR_EARLY_ALLOCATION =
+      "yarn.resourcemanager.reservation-system.favor-early-allocation";
+
+  public final static boolean DEFAULT_GREEDY_FAVOR_EARLY_ALLOCATION = true;
+
+  private final boolean allocateLeft;
+
+  public GreedyReservationAgent() {
+    this(new Configuration());
+  }
+
+  public GreedyReservationAgent(Configuration yarnConfiguration) {
+
+    allocateLeft =
+        yarnConfiguration.getBoolean(GREEDY_FAVOR_EARLY_ALLOCATION,
+            DEFAULT_GREEDY_FAVOR_EARLY_ALLOCATION);
+
+    if (allocateLeft) {
+      LOG.info("Initializing the GreedyReservationAgent to favor \"early\""
+          + " (left) allocations (controlled by parameter: "
+          + GREEDY_FAVOR_EARLY_ALLOCATION + ")");
+    } else {
+      LOG.info("Initializing the GreedyReservationAgent to favor \"late\""
+          + " (right) allocations (controlled by parameter: "
+          + GREEDY_FAVOR_EARLY_ALLOCATION + ")");
+    }
+
+    planner =
+        new IterativePlanner(new StageEarliestStartByJobArrival(),
+            new StageAllocatorGreedyRLE(allocateLeft), allocateLeft);
+
+  }
+
+  public boolean isAllocateLeft(){
+    return allocateLeft;
+  }
   @Override
   public boolean createReservation(ReservationId reservationId, String user,
       Plan plan, ReservationDefinition contract) throws PlanningException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5cf5c41a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
index 77362d5..24d237a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
@@ -19,9 +19,11 @@
 package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.ListIterator;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.hadoop.yarn.api.records.ReservationDefinition;
 import org.apache.hadoop.yarn.api.records.ReservationId;
@@ -32,6 +34,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -69,11 +72,13 @@ public class IterativePlanner extends PlanningAlgorithm {
   // Phase algorithms
   private StageEarliestStart algStageEarliestStart = null;
   private StageAllocator algStageAllocator = null;
+  private final boolean allocateLeft;
 
   // Constructor
   public IterativePlanner(StageEarliestStart algEarliestStartTime,
-      StageAllocator algStageAllocator) {
+      StageAllocator algStageAllocator, boolean allocateLeft) {
 
+    this.allocateLeft = allocateLeft;
     setAlgStageEarliestStart(algEarliestStartTime);
     setAlgStageAllocator(algStageAllocator);
 
@@ -85,61 +90,49 @@ public class IterativePlanner extends PlanningAlgorithm {
       String user) throws PlanningException {
 
     // Initialize
-    initialize(plan, reservation);
-
-    // If the job has been previously reserved, logically remove its allocation
-    ReservationAllocation oldReservation =
-        plan.getReservationById(reservationId);
-    if (oldReservation != null) {
-      ignoreOldAllocation(oldReservation);
-    }
+    initialize(plan, reservationId, reservation);
 
     // Create the allocations data structure
     RLESparseResourceAllocation allocations =
         new RLESparseResourceAllocation(plan.getResourceCalculator());
 
-    // Get a reverse iterator for the set of stages
-    ListIterator<ReservationRequest> li =
-        reservation
-            .getReservationRequests()
-            .getReservationResources()
-            .listIterator(
-                reservation.getReservationRequests().getReservationResources()
-                    .size());
+    StageProvider stageProvider = new StageProvider(allocateLeft, reservation);
 
     // Current stage
     ReservationRequest currentReservationStage;
 
-    // Index, points on the current node
-    int index =
-        reservation.getReservationRequests().getReservationResources().size();
-
     // Stage deadlines
     long stageDeadline = stepRoundDown(reservation.getDeadline(), step);
     long successorStartingTime = -1;
+    long predecessorEndTime = stepRoundDown(reservation.getArrival(), step);
+    long stageArrivalTime = -1;
 
     // Iterate the stages in reverse order
-    while (li.hasPrevious()) {
+    while (stageProvider.hasNext()) {
 
       // Get current stage
-      currentReservationStage = li.previous();
-      index -= 1;
+      currentReservationStage = stageProvider.next();
 
       // Validate that the ReservationRequest respects basic constraints
       validateInputStage(plan, currentReservationStage);
 
       // Compute an adjusted earliestStart for this resource
       // (we need this to provision some space for the ORDER contracts)
-      long stageArrivalTime = reservation.getArrival();
-      if (jobType == ReservationRequestInterpreter.R_ORDER
-          || jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
-        stageArrivalTime =
-            computeEarliestStartingTime(plan, reservation, index,
-                currentReservationStage, stageDeadline);
-      }
-      stageArrivalTime = stepRoundUp(stageArrivalTime, step);
-      stageArrivalTime = Math.max(stageArrivalTime, reservation.getArrival());
 
+      if (allocateLeft) {
+        stageArrivalTime = predecessorEndTime;
+      } else {
+        stageArrivalTime = reservation.getArrival();
+        if (jobType == ReservationRequestInterpreter.R_ORDER
+            || jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
+          stageArrivalTime =
+              computeEarliestStartingTime(plan, reservation,
+                  stageProvider.getCurrentIndex(), currentReservationStage,
+                  stageDeadline);
+        }
+        stageArrivalTime = stepRoundUp(stageArrivalTime, step);
+        stageArrivalTime = Math.max(stageArrivalTime, reservation.getArrival());
+      }
       // Compute the allocation of a single stage
       Map<ReservationInterval, Resource> curAlloc =
           computeStageAllocation(plan, currentReservationStage,
@@ -155,7 +148,7 @@ public class IterativePlanner extends PlanningAlgorithm {
         }
 
         // Otherwise, the job cannot be allocated
-        return null;
+        throw new PlanningException("The request cannot be satisfied");
 
       }
 
@@ -177,33 +170,41 @@ public class IterativePlanner extends PlanningAlgorithm {
       if (jobType == ReservationRequestInterpreter.R_ORDER
           || jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
 
+        // CHECK ORDER_NO_GAP
         // Verify that there is no gap, in case the job is ORDER_NO_GAP
+        // note that the test is different left-to-right and right-to-left
         if (jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP
             && successorStartingTime != -1
-            && successorStartingTime > stageEndTime) {
-
-          return null;
-
+            && ((allocateLeft && predecessorEndTime < stageStartTime) ||
+                (!allocateLeft && (stageEndTime < successorStartingTime))
+               )
+            || (!isNonPreemptiveAllocation(curAlloc))) {
+          throw new PlanningException(
+              "The allocation found does not respect ORDER_NO_GAP");
         }
 
-        // Store the stageStartTime and set the new stageDeadline
-        successorStartingTime = stageStartTime;
-        stageDeadline = stageStartTime;
-
+        if (allocateLeft) {
+          // Store the stageEndTime as the predecessor end for the next stage
+          predecessorEndTime = stageEndTime;
+        } else {
+          // Store the stageStartTime and set the new stageDeadline
+          successorStartingTime = stageStartTime;
+          stageDeadline = stageStartTime;
+        }
       }
-
     }
 
     // If the allocation is empty, return an error
     if (allocations.isEmpty()) {
-      return null;
+      throw new PlanningException("The request cannot be satisfied");
     }
 
     return allocations;
 
   }
 
-  protected void initialize(Plan plan, ReservationDefinition reservation) {
+  protected void initialize(Plan plan, ReservationId reservationId,
+      ReservationDefinition reservation) throws PlanningException {
 
     // Get plan step & capacity
     capacity = plan.getTotalCapacity();
@@ -214,13 +215,26 @@ public class IterativePlanner extends PlanningAlgorithm {
     jobArrival = stepRoundUp(reservation.getArrival(), step);
     jobDeadline = stepRoundDown(reservation.getDeadline(), step);
 
-    // Dirty read of plan load
-    planLoads = getAllLoadsInInterval(plan, jobArrival, jobDeadline);
-
     // Initialize the plan modifications
     planModifications =
         new RLESparseResourceAllocation(plan.getResourceCalculator());
 
+    // Dirty read of plan load
+
+    // planLoads are not used by other StageAllocators... and don't deal
+    // well with huge reservation ranges
+    if (this.algStageAllocator instanceof StageAllocatorLowCostAligned) {
+      planLoads = getAllLoadsInInterval(plan, jobArrival, jobDeadline);
+      ReservationAllocation oldRes = plan.getReservationById(reservationId);
+      if (oldRes != null) {
+        planModifications =
+            RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
+                plan.getTotalCapacity(), planModifications,
+                oldRes.getResourcesOverTime(), RLEOperator.subtract,
+                jobArrival, jobDeadline);
+      }
+    }
+
   }
 
   private Map<Long, Resource> getAllLoadsInInterval(Plan plan, long startTime,
@@ -240,32 +254,6 @@ public class IterativePlanner extends PlanningAlgorithm {
 
   }
 
-  private void ignoreOldAllocation(ReservationAllocation oldReservation) {
-
-    // If there is no old reservation, return
-    if (oldReservation == null) {
-      return;
-    }
-
-    // Subtract each allocation interval from the planModifications
-    for (Entry<ReservationInterval, Resource> entry : oldReservation
-        .getAllocationRequests().entrySet()) {
-
-      // Read the entry
-      ReservationInterval interval = entry.getKey();
-      Resource resource = entry.getValue();
-
-      // Find the actual request
-      Resource negativeResource = Resources.multiply(resource, -1);
-
-      // Insert it into planModifications as a 'negative' request, to
-      // represent available resources
-      planModifications.addInterval(interval, negativeResource);
-
-    }
-
-  }
-
   private void validateInputStage(Plan plan, ReservationRequest rr)
       throws ContractValidationException {
 
@@ -291,13 +279,56 @@ public class IterativePlanner extends PlanningAlgorithm {
         rr.getCapability(), plan.getMaximumAllocation())) {
 
       throw new ContractValidationException(
-          "Individual capability requests should not exceed cluster's " +
-          "maxAlloc");
+          "Individual capability requests should not exceed cluster's "
+              + "maxAlloc");
 
     }
 
   }
 
+  private boolean isNonPreemptiveAllocation(
+      Map<ReservationInterval, Resource> curAlloc) {
+
+    // Checks whether a stage allocation is non preemptive or not.
+    // Assumption: the intervals are non-intersecting (as returned by
+    // computeStageAllocation()).
+    // For a non-preemptive allocation, only two end points appear exactly once
+
+    Set<Long> endPoints = new HashSet<Long>(2 * curAlloc.size());
+    for (Entry<ReservationInterval, Resource> entry : curAlloc.entrySet()) {
+
+      ReservationInterval interval = entry.getKey();
+      Resource resource = entry.getValue();
+
+      // Ignore intervals with no allocation
+      if (Resources.equals(resource, Resource.newInstance(0, 0))) {
+        continue;
+      }
+
+      // Get endpoints
+      Long left = interval.getStartTime();
+      Long right = interval.getEndTime();
+
+      // Add left endpoint if we haven't seen it before, remove otherwise
+      if (!endPoints.contains(left)) {
+        endPoints.add(left);
+      } else {
+        endPoints.remove(left);
+      }
+
+      // Add right endpoint if we haven't seen it before, remove otherwise
+      if (!endPoints.contains(right)) {
+        endPoints.add(right);
+      } else {
+        endPoints.remove(right);
+      }
+    }
+
+    // Non-preemptive only if endPoints is of size 2
+    return (endPoints.size() == 2);
+
+  }
+
   // Call algEarliestStartTime()
   protected long computeEarliestStartingTime(Plan plan,
       ReservationDefinition reservation, int index,
@@ -335,4 +366,60 @@ public class IterativePlanner extends PlanningAlgorithm {
 
   }
 
+  /**
+   * Helper class that provides a list of ReservationRequests and iterates
+   * forward or backward depending whether we are allocating left-to-right or
+   * right-to-left.
+   */
+  public static class StageProvider {
+
+    private final boolean allocateLeft;
+
+    private ListIterator<ReservationRequest> li;
+
+    public StageProvider(boolean allocateLeft,
+        ReservationDefinition reservation) {
+
+      this.allocateLeft = allocateLeft;
+      int startingIndex;
+      if (allocateLeft) {
+        startingIndex = 0;
+      } else {
+        startingIndex =
+            reservation.getReservationRequests().getReservationResources()
+                .size();
+      }
+      // Get an iterator positioned at the start or end of the stage list
+      li =
+          reservation.getReservationRequests().getReservationResources()
+              .listIterator(startingIndex);
+
+    }
+
+    public boolean hasNext() {
+      if (allocateLeft) {
+        return li.hasNext();
+      } else {
+        return li.hasPrevious();
+      }
+    }
+
+    public ReservationRequest next() {
+      if (allocateLeft) {
+        return li.next();
+      } else {
+        return li.previous();
+      }
+    }
+
+    public int getCurrentIndex() {
+      if (allocateLeft) {
+        return li.nextIndex() - 1;
+      } else {
+        return li.previousIndex() + 1;
+      }
+    }
+
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5cf5c41a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java
new file mode 100644
index 0000000..c5a3192
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * Computes the stage allocation according to the greedy allocation rule. The
+ * greedy rule repeatedly allocates requested containers at the leftmost or
+ * rightmost possible interval. This implementation leverages the
+ * run-length-encoding of the time-series we operate on and proceeds more
+ * quickly than the baseline.
+ */
+
+public class StageAllocatorGreedyRLE implements StageAllocator {
+
+  private final boolean allocateLeft;
+
+  public StageAllocatorGreedyRLE(boolean allocateLeft) {
+    this.allocateLeft = allocateLeft;
+  }
+
+  @Override
+  public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
+      Map<Long, Resource> planLoads,
+      RLESparseResourceAllocation planModifications, ReservationRequest rr,
+      long stageEarliestStart, long stageDeadline, String user,
+      ReservationId oldId) throws PlanningException {
+
+    // abort early if the interval is not satisfiable
+    if (stageEarliestStart + rr.getDuration() > stageDeadline) {
+      return null;
+    }
+
+    Map<ReservationInterval, Resource> allocationRequests =
+        new HashMap<ReservationInterval, Resource>();
+
+    Resource totalCapacity = plan.getTotalCapacity();
+
+    // compute the gang as a resource and get the duration
+    Resource sizeOfGang =
+        Resources.multiply(rr.getCapability(), rr.getConcurrency());
+    long dur = rr.getDuration();
+    long step = plan.getStep();
+
+    // ceil the duration to the next multiple of the plan step
+    if (dur % step != 0) {
+      dur += (step - (dur % step));
+    }
+
+    // we know for sure that this division has no remainder (part of contract
+    // with user, validated beforehand)
+    int gangsToPlace = rr.getNumContainers() / rr.getConcurrency();
+
+    // get available resources from plan
+    RLESparseResourceAllocation netRLERes =
+        plan.getAvailableResourceOverTime(user, oldId, stageEarliestStart,
+            stageDeadline);
+
+    // remove plan modifications
+    netRLERes =
+        RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
+            totalCapacity, netRLERes, planModifications, RLEOperator.subtract,
+            stageEarliestStart, stageDeadline);
+
+    // loop trying to place until we are done, or we are considering
+    // an invalid range of times
+    while (gangsToPlace > 0 && stageEarliestStart + dur <= stageDeadline) {
+
+      // as we run along we remember how many gangs we can fit, and what
+      // was the most constraining moment in time (we will restart just
+      // after that to place the next batch)
+      int maxGang = gangsToPlace;
+      long minPoint = -1;
+
+      // focus our attention to a time-range under consideration
+      NavigableMap<Long, Resource> partialMap =
+          netRLERes.getRangeOverlapping(stageEarliestStart, stageDeadline)
+              .getCumulative();
+
+      // revert the map for right-to-left allocation
+      if (!allocateLeft) {
+        partialMap = partialMap.descendingMap();
+      }
+
+      Iterator<Entry<Long, Resource>> netIt = partialMap.entrySet().iterator();
+
+      long oldT = stageDeadline;
+
+      // internal loop, tries to allocate as many gang as possible starting
+      // at a given point in time, if it fails we move to the next time
+      // interval (with outside loop)
+      while (maxGang > 0 && netIt.hasNext()) {
+
+        long t;
+        Resource curAvailRes;
+
+        Entry<Long, Resource> e = netIt.next();
+        if (allocateLeft) {
+          t = Math.max(e.getKey(), stageEarliestStart);
+          curAvailRes = e.getValue();
+        } else {
+          t = oldT;
+          oldT = e.getKey();
+          // "higher" means a lower key here: the map is descending
+          curAvailRes = partialMap.higherEntry(t).getValue();
+        }
+
+        // check exit/skip conditions
+        if (curAvailRes == null) {
+          //skip undefined regions (should not happen beside borders)
+          continue;
+        }
+        if (exitCondition(t, stageEarliestStart, stageDeadline, dur)) {
+          break;
+        }
+
+        // compute maximum number of gangs we could fit
+        int curMaxGang =
+            (int) Math.floor(Resources.divide(plan.getResourceCalculator(),
+                totalCapacity, curAvailRes, sizeOfGang));
+        curMaxGang = Math.min(gangsToPlace, curMaxGang);
+
+        // compare with previous max, and set it. also remember *where* we found
+        // the minimum (useful for next attempts)
+        if (curMaxGang <= maxGang) {
+          maxGang = curMaxGang;
+          minPoint = t;
+        }
+      }
+
+      // update data structures that retain the progress made so far
+      gangsToPlace =
+          trackProgress(planModifications, rr, stageEarliestStart,
+              stageDeadline, allocationRequests, dur, gangsToPlace, maxGang);
+
+      // reset the next range of time-intervals to deal with
+      if (allocateLeft) {
+        // set earliest start to the min of the constraining "range" or the
+        // end of this allocation
+        stageEarliestStart =
+            Math.min(partialMap.higherKey(minPoint), stageEarliestStart + dur);
+      } else {
+        // same as above moving right-to-left
+        stageDeadline =
+            Math.max(partialMap.higherKey(minPoint), stageDeadline - dur);
+      }
+    }
+
+    // if no gangs are left to place we succeed and return the allocation
+    if (gangsToPlace == 0) {
+      return allocationRequests;
+    } else {
+      // We get here only if we did not manage to satisfy this request, so we
+      // need to remove the unwanted side-effects from planModifications (needed
+      // for ANY).
+      for (Map.Entry<ReservationInterval, Resource> tempAllocation :
+          allocationRequests.entrySet()) {
+        planModifications.removeInterval(tempAllocation.getKey(),
+            tempAllocation.getValue());
+      }
+      // and return null to signal failure in this allocation
+      return null;
+    }
+
+  }
+
+  private int trackProgress(RLESparseResourceAllocation planModifications,
+      ReservationRequest rr, long stageEarliestStart, long stageDeadline,
+      Map<ReservationInterval, Resource> allocationRequests, long dur,
+      int gangsToPlace, int maxGang) {
+    // if we were able to place any gang, record this, and decrement
+    // gangsToPlace
+    if (maxGang > 0) {
+      gangsToPlace -= maxGang;
+
+      ReservationInterval reservationInt =
+          computeReservationInterval(stageEarliestStart, stageDeadline, dur);
+      Resource reservationRes =
+          Resources.multiply(rr.getCapability(), rr.getConcurrency() * maxGang);
+      // remember occupied space (plan is read-only till we find a plausible
+      // allocation for the entire request). This is needed since we might be
+      // placing other ReservationRequest within the same
+      // ReservationDefinition,
+      // and we must avoid double-counting the available resources
+      planModifications.addInterval(reservationInt, reservationRes);
+      allocationRequests.put(reservationInt, reservationRes);
+
+    }
+    return gangsToPlace;
+  }
+
+  private ReservationInterval computeReservationInterval(
+      long stageEarliestStart, long stageDeadline, long dur) {
+    ReservationInterval reservationInt;
+    if (allocateLeft) {
+      reservationInt =
+          new ReservationInterval(stageEarliestStart, stageEarliestStart + dur);
+    } else {
+      reservationInt =
+          new ReservationInterval(stageDeadline - dur, stageDeadline);
+    }
+    return reservationInt;
+  }
+
+
+  private boolean exitCondition(long t, long stageEarliestStart,
+      long stageDeadline, long dur) {
+    if (allocateLeft) {
+      return t >= stageEarliestStart + dur;
+    } else {
+      return t < stageDeadline - dur;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5cf5c41a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
index 0aedc6a..4aef7ae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
@@ -19,9 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anySetOf;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 
 import java.io.FileWriter;
 import java.io.IOException;
@@ -75,12 +73,14 @@ public class ReservationSystemTestUtil {
   public static ReservationSchedulerConfiguration createConf(
       String reservationQ, long timeWindow, float instConstraint,
       float avgConstraint) {
-    ReservationSchedulerConfiguration conf =
-        mock(ReservationSchedulerConfiguration.class);
+
+    ReservationSchedulerConfiguration realConf = new CapacitySchedulerConfiguration();
+    ReservationSchedulerConfiguration conf = spy(realConf);
     when(conf.getReservationWindow(reservationQ)).thenReturn(timeWindow);
     when(conf.getInstantaneousMaxCapacity(reservationQ))
         .thenReturn(instConstraint);
     when(conf.getAverageCapacity(reservationQ)).thenReturn(avgConstraint);
+
     return conf;
   }
 
@@ -177,10 +177,15 @@ public class ReservationSystemTestUtil {
 
   public static ReservationDefinition createSimpleReservationDefinition(
       long arrival, long deadline, long duration) {
+    return createSimpleReservationDefinition(arrival, deadline, duration, 1);
+  }
+
+  public static ReservationDefinition createSimpleReservationDefinition(
+      long arrival, long deadline, long duration, int parallelism) {
     // create a request with a single atomic ask
     ReservationRequest r =
-        ReservationRequest.newInstance(Resource.newInstance(1024, 1), 1, 1,
-            duration);
+        ReservationRequest.newInstance(Resource.newInstance(1024, 1),
+            parallelism, parallelism, duration);
     ReservationDefinition rDef = new ReservationDefinitionPBImpl();
     ReservationRequests reqs = new ReservationRequestsPBImpl();
     reqs.setReservationResources(Collections.singletonList(r));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5cf5c41a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java
index f81e7ec..b8a618b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java
@@ -26,6 +26,8 @@ import static org.mockito.Mockito.mock;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -55,8 +57,12 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 import org.mortbay.log.Log;
 
+@RunWith(Parameterized.class)
 public class TestGreedyReservationAgent {
 
   ReservationAgent agent;
@@ -66,6 +72,17 @@ public class TestGreedyReservationAgent {
   Resource maxAlloc = Resource.newInstance(1024 * 8, 8);
   Random rand = new Random();
   long step;
+  boolean allocateLeft;
+
+  public TestGreedyReservationAgent(Boolean b){
+    this.allocateLeft = b; // unboxed and stored in the allocateLeft field
+  }
+
+  @Parameters
+  public static Collection<Object[]> data() {
+    Object[][] allocateLeftValues = {{true}, {false}};
+    return Arrays.asList(allocateLeftValues);
+  }
 
   @Before
   public void setup() throws Exception {
@@ -90,7 +107,11 @@ public class TestGreedyReservationAgent {
     CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
     policy.init(reservationQ, conf);
 
-    agent = new GreedyReservationAgent();
+    // set whether the greedy agent favors early allocations
+    conf.setBoolean(GreedyReservationAgent.GREEDY_FAVOR_EARLY_ALLOCATION,
+        allocateLeft);
+
+    agent = new GreedyReservationAgent(conf);
 
     QueueMetrics queueMetrics = mock(QueueMetrics.class);
     RMContext context = ReservationSystemTestUtil.createMockRMContext();
@@ -130,13 +151,21 @@ public class TestGreedyReservationAgent {
     System.out.println(plan.toString());
     System.out.println(plan.toCumulativeString());
 
-    for (long i = 10 * step; i < 20 * step; i++) {
-      assertTrue(
-          "Agent-based allocation unexpected",
-          Resources.equals(cs.getResourcesAtTime(i),
-              Resource.newInstance(2048 * 10, 2 * 10)));
+    if(allocateLeft){
+      for (long i = 5 * step; i < 15 * step; i++) {
+        assertTrue(
+            "Agent-based allocation unexpected",
+            Resources.equals(cs.getResourcesAtTime(i),
+                Resource.newInstance(2048 * 10, 2 * 10)));
+      }
+    } else {
+      for (long i = 10 * step; i < 20 * step; i++) {
+        assertTrue(
+            "Agent-based allocation unexpected",
+            Resources.equals(cs.getResourcesAtTime(i),
+                Resource.newInstance(2048 * 10, 2 * 10)));
+      }
     }
-
   }
 
   @SuppressWarnings("javadoc")
@@ -212,18 +241,33 @@ public class TestGreedyReservationAgent {
     System.out.println(plan.toString());
     System.out.println(plan.toCumulativeString());
 
-    for (long i = 90 * step; i < 100 * step; i++) {
-      assertTrue(
-          "Agent-based allocation unexpected",
-          Resources.equals(cs.getResourcesAtTime(i),
-              Resource.newInstance(2048 * 20, 2 * 20)));
-    }
-    // RR2 is pushed out by the presence of RR
-    for (long i = 80 * step; i < 90 * step; i++) {
-      assertTrue(
-          "Agent-based allocation unexpected",
-          Resources.equals(cs2.getResourcesAtTime(i),
-              Resource.newInstance(2048 * 20, 2 * 20)));
+    if (allocateLeft) {
+      for (long i = 5 * step; i < 15 * step; i++) {
+        assertTrue(
+            "Agent-based allocation unexpected",
+            Resources.equals(cs.getResourcesAtTime(i),
+                Resource.newInstance(2048 * 20, 2 * 20)));
+      }
+      for (long i = 15 * step; i < 25 * step; i++) {
+        // RR2 is pushed out by the presence of RR
+        assertTrue(
+            "Agent-based allocation unexpected",
+            Resources.equals(cs2.getResourcesAtTime(i),
+                Resource.newInstance(2048 * 20, 2 * 20)));
+      }
+    } else {
+      for (long i = 90 * step; i < 100 * step; i++) {
+        assertTrue(
+            "Agent-based allocation unexpected",
+            Resources.equals(cs.getResourcesAtTime(i),
+                Resource.newInstance(2048 * 20, 2 * 20)));
+      }
+      for (long i = 80 * step; i < 90 * step; i++) {
+        assertTrue(
+            "Agent-based allocation unexpected",
+            Resources.equals(cs2.getResourcesAtTime(i),
+                Resource.newInstance(2048 * 20, 2 * 20)));
+      }
     }
   }
 
@@ -274,10 +318,18 @@ public class TestGreedyReservationAgent {
 
     ReservationAllocation cs = plan.getReservationById(reservationID);
 
-    assertTrue(cs.toString(), check(cs, 0 * step, 10 * step, 20, 1024, 1));
-    assertTrue(cs.toString(), check(cs, 10 * step, 30 * step, 10, 1024, 1));
-    assertTrue(cs.toString(), check(cs, 40 * step, 50 * step, 20, 1024, 1));
-    assertTrue(cs.toString(), check(cs, 50 * step, 70 * step, 10, 1024, 1));
+    if (allocateLeft) {
+      assertTrue(cs.toString(), check(cs, 0 * step, 10 * step, 20, 1024, 1));
+      assertTrue(cs.toString(), check(cs, 10 * step, 30 * step, 10, 1024, 1));
+      assertTrue(cs.toString(), check(cs, 32 * step, 42 * step, 20, 1024, 1));
+      assertTrue(cs.toString(), check(cs, 42 * step, 62 * step, 10, 1024, 1));
+
+    } else {
+      assertTrue(cs.toString(), check(cs, 0 * step, 10 * step, 20, 1024, 1));
+      assertTrue(cs.toString(), check(cs, 10 * step, 30 * step, 10, 1024, 1));
+      assertTrue(cs.toString(), check(cs, 40 * step, 50 * step, 20, 1024, 1));
+      assertTrue(cs.toString(), check(cs, 50 * step, 70 * step, 10, 1024, 1));
+    }
     System.out.println("--------AFTER ORDER ALLOCATION (queue: "
         + reservationID + ")----------");
     System.out.println(plan.toString());
@@ -466,7 +518,12 @@ public class TestGreedyReservationAgent {
 
     ReservationAllocation cs = plan.getReservationById(reservationID);
 
-    assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 20, 1024, 1));
+    if (allocateLeft) {
+      assertTrue(cs.toString(), check(cs, 100 * step, 110 * step, 5, 1024, 1));
+    } else {
+      assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 20, 1024, 1));
+    }
+
     System.out.println("--------AFTER ANY ALLOCATION (queue: " + reservationID
         + ")----------");
     System.out.println(plan.toString());
@@ -551,8 +608,13 @@ public class TestGreedyReservationAgent {
 
     ReservationAllocation cs = plan.getReservationById(reservationID);
 
-    assertTrue(cs.toString(), check(cs, 100 * step, 110 * step, 20, 1024, 1));
-    assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 25, 1024, 1));
+    if (allocateLeft) {
+      assertTrue(cs.toString(), check(cs, 100 * step, 110 * step, 25, 1024, 1));
+      assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 20, 1024, 1));
+    } else {
+      assertTrue(cs.toString(), check(cs, 100 * step, 110 * step, 20, 1024, 1));
+      assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 25, 1024, 1));
+    }
 
     System.out.println("--------AFTER ALL ALLOCATION (queue: " + reservationID
         + ")----------");
@@ -695,14 +757,18 @@ public class TestGreedyReservationAgent {
 
   public static void main(String[] arg) {
 
+    boolean left = false;
     // run a stress test with by default 1000 random jobs
     int numJobs = 1000;
     if (arg.length > 0) {
       numJobs = Integer.parseInt(arg[0]);
     }
+    if (arg.length > 1) {
+      left = Boolean.parseBoolean(arg[1]);
+    }
 
     try {
-      TestGreedyReservationAgent test = new TestGreedyReservationAgent();
+      TestGreedyReservationAgent test = new TestGreedyReservationAgent(left);
       test.setup();
       test.testStress(numJobs);
     } catch (Exception e) {