You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2014/07/26 21:53:36 UTC

svn commit: r1613720 - in /hadoop/common/branches/branch-1: ./ src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/ src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/

Author: kasha
Date: Sat Jul 26 19:53:36 2014
New Revision: 1613720

URL: http://svn.apache.org/r1613720
Log:
MAPREDUCE-5966. MR1 FairScheduler use of custom weight adjuster is not thread safe for comparisons. (Anubhav Dhoot via kasha)

Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
    hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java
    hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java
    hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Schedulable.java
    hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/FakeSchedulable.java
    hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1613720&r1=1613719&r2=1613720&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Sat Jul 26 19:53:36 2014
@@ -230,6 +230,9 @@ Release 1.3.0 - unreleased
     MAPREDUCE-5979. FairScheduler: zero weight can cause sort failures. 
     (Anubhav Dhoot via kasha)
 
+    MAPREDUCE-5966. MR1 FairScheduler use of custom weight adjuster is not 
+    thread safe for comparisons. (Anubhav Dhoot via kasha)
+
 Release 1.2.2 - unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=1613720&r1=1613719&r2=1613720&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Sat Jul 26 19:53:36 2014
@@ -701,11 +701,15 @@ public class FairScheduler extends TaskS
       
       updateRunnability(); // Set job runnability based on user/pool limits 
       
-      // Update demands of jobs and pools
+      // Update demands and weights of jobs and pools
       for (Pool pool: poolMgr.getPools()) {
         pool.getMapSchedulable().updateDemand();
         pool.getReduceSchedulable().updateDemand();
+
+        pool.getMapSchedulable().updateWeight();
+        pool.getReduceSchedulable().updateWeight();
       }
+
       
       // Compute fair shares based on updated demands
       List<PoolSchedulable> mapScheds = getPoolSchedulables(TaskType.MAP);

Modified: hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java?rev=1613720&r1=1613719&r2=1613720&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java (original)
+++ hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java Sat Jul 26 19:53:36 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapred;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.mapred.FairScheduler.JobInfo;
 import org.apache.hadoop.mapreduce.TaskType;
@@ -29,6 +30,7 @@ public class JobSchedulable extends Sche
   protected JobInProgress job;
   protected TaskType taskType;
   private int demand = 0;
+  private double weight = 1.0;
 
   public JobSchedulable(FairScheduler scheduler, JobInProgress job, 
       TaskType taskType) {
@@ -133,9 +135,14 @@ public class JobSchedulable extends Sche
   
   @Override
   public double getWeight() {
-    return scheduler.getJobWeight(job, taskType);
+    return weight;
   }
-  
+
+  @Override
+  public void updateWeight() {
+    weight = scheduler.getJobWeight(job, taskType);
+  }
+
   @Override
   public int getMinShare() {
     return 0;
@@ -181,7 +188,6 @@ public class JobSchedulable extends Sche
     }
   }
 
-  
   @Override
   protected String getMetricsContextName() {
     return "jobs";

Modified: hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java?rev=1613720&r1=1613719&r2=1613720&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java (original)
+++ hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java Sat Jul 26 19:53:36 2014
@@ -95,7 +95,18 @@ public class PoolSchedulable extends Sch
           + "; maxTasks is " + maxTasks);
     }
   }
-  
+
+  /**
+   * Ask every job in this pool to refresh its cached weight. The pool's
+   * own weight is not cached here; it is computed on the fly.
+   */
+  @Override
+  public void updateWeight() {
+    for (JobSchedulable sched: jobScheds) {
+      sched.updateWeight();
+    }
+  }
+
   /**
    * Distribute the pool's fair share among its jobs
    */
@@ -160,6 +171,9 @@ public class PoolSchedulable extends Sch
     } else {
       throw new RuntimeException("Unsupported pool scheduling mode " + mode);
     }
+    for (JobSchedulable sched: jobScheds) {
+      sched.updateWeight();
+    }
     Collections.sort(jobScheds, comparator);
     for (JobSchedulable sched: jobScheds) {
       Task task = sched.assignTask(tts, currentTime, visited);

Modified: hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Schedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Schedulable.java?rev=1613720&r1=1613719&r2=1613720&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Schedulable.java (original)
+++ hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Schedulable.java Sat Jul 26 19:53:36 2014
@@ -95,7 +95,10 @@ abstract class Schedulable {
   
   /** Refresh the Schedulable's demand and those of its children if any. */
   public abstract void updateDemand();
-  
+
+  /** Refresh the Schedulable's weight and those of its children if any. */
+  public abstract void updateWeight();
+
   /** 
    * Distribute the fair share assigned to this Schedulable among its 
    * children (used in pools where the internal scheduler is fair sharing). 

Modified: hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/FakeSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/FakeSchedulable.java?rev=1613720&r1=1613719&r2=1613720&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/FakeSchedulable.java (original)
+++ hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/FakeSchedulable.java Sat Jul 26 19:53:36 2014
@@ -109,6 +109,9 @@ public class FakeSchedulable extends Sch
   public void updateDemand() {}
 
   @Override
+  public void updateWeight() {}
+
+  @Override
   public TaskType getTaskType() {
     return TaskType.MAP;
   }

Modified: hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=1613720&r1=1613719&r2=1613720&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Sat Jul 26 19:53:36 2014
@@ -31,13 +31,14 @@ import java.util.IdentityHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.LinkedBlockingDeque;
 
 import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.mapred.FairScheduler.JobInfo;
 import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException;
 import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
@@ -3092,8 +3093,6 @@ public class TestFairScheduler extends T
     assertNull(scheduler.assignTasks(tracker("tt2")));
   }
 
-
-
   class TestJobSchedulableSort extends JobSchedulable {
 
     private final double testFairShare;
@@ -3154,10 +3153,6 @@ public class TestFairScheduler extends T
 
   public void testFairShareComparator()
   {
-    List<TestJobSchedulableSort> jobs = new ArrayList<TestJobSchedulableSort>();
-    final int iterations = 100;
-    int jobCount = 100;
-
     Comparator<Schedulable> comparator = new
         SchedulingAlgorithms.FairShareComparator();
 
@@ -3186,8 +3181,54 @@ public class TestFairScheduler extends T
     // s3 has a higher running task to weight ratio (infinity)
     assertTrue(comparator.compare(s1, s3) < 0);
   }
-  
-  
+
+  /**
+   * Verifies that sorting JobSchedulables with a custom weight adjuster,
+   * which returns a different value on every call, does not break the
+   * sort. If weights changed during the sort, JDK 7's TimSort could throw
+   * "Comparison method violates its general contract!".
+   */
+  public void testJobSchedulableSortingWithCustomWeightAdjuster() throws
+      IOException, InterruptedException {
+    final int iterations = 100, jobCount = 100, racks = 100, nodesPerRack = 2;
+    final int totalTaskTrackers = nodesPerRack * racks;
+
+    setUpCluster(racks, nodesPerRack, true);
+
+    scheduler.weightAdjuster = new WeightAdjuster() {
+      Random r = new Random();
+
+      @Override
+      public double adjustWeight(JobInProgress job, TaskType taskType, double
+          curWeight) {
+        return curWeight * r.nextInt(100);
+      }
+    };
+
+    for (int j = 0; j < jobCount; j++) {
+      advanceTime(100);
+      submitJob(JobStatus.RUNNING, 2 * iterations, iterations);
+      scheduler.updateMetrics();
+    }
+
+    final LinkedBlockingDeque<Task> tasks =
+        new LinkedBlockingDeque<Task>();
+
+    final String taskTrackerNamePrefix = "tt";
+
+    Random r1 = new Random();
+    for (int i = 0; i < iterations; i++) {
+
+      int randomTaskTrackerId = r1.nextInt(totalTaskTrackers) + 1;
+      String taskTrackerName = taskTrackerNamePrefix + randomTaskTrackerId;
+      List<Task> assignedTasks = scheduler.assignTasks(tracker
+          (taskTrackerName));
+      if (assignedTasks != null) {
+        tasks.addAll(assignedTasks);
+      }
+    }
+  }
+
   /**
    * Ask scheduler to update metrics and then verify that they're all
    * correctly published to the metrics context