Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2009/10/18 21:27:15 UTC

svn commit: r826503 - in /hadoop/mapreduce/trunk: CHANGES.txt src/tools/org/apache/hadoop/tools/rumen/CDFRandomGenerator.java src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java

Author: cdouglas
Date: Sun Oct 18 19:27:15 2009
New Revision: 826503

URL: http://svn.apache.org/viewvc?rev=826503&view=rev
Log:
MAPREDUCE-931. Use built-in interpolation classes for making up task
runtimes in Rumen. Contributed by Dick King
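
For readers not familiar with Rumen internals: the interpolation in question is piecewise-linear over the (relative rank, datum) points of a LoggedDiscreteCDF. A uniform random rank in [0, 1] is drawn, the two CDF points bracketing it are located, and the made-up runtime is interpolated between their values. The sketch below illustrates only that idea; it is not the committed code, and the names PiecewiseLinearCdfSketch, rankings, and values are illustrative stand-ins for what CDFPiecewiseLinearRandomGenerator encapsulates.

// Illustrative sketch only (not the committed Rumen code): piecewise-linear
// interpolation over a discrete CDF, the technique the built-in
// CDFPiecewiseLinearRandomGenerator provides.
import java.util.Random;

class PiecewiseLinearCdfSketch {
  // rankings are ascending cumulative probabilities; rankings[0] == 0.0 and
  // rankings[rankings.length - 1] == 1.0, paired with the CDF minimum/maximum.
  private final double[] rankings;
  private final long[] values;
  private final Random random;

  PiecewiseLinearCdfSketch(double[] rankings, long[] values, long seed) {
    this.rankings = rankings;
    this.values = values;
    this.random = new Random(seed);
  }

  // Draw a uniform rank and interpolate linearly between the two CDF points
  // that bracket it; the result is a made-up but plausible task runtime.
  long randomValue() {
    double r = random.nextDouble();
    for (int i = 1; i < rankings.length; ++i) {
      if (r <= rankings[i]) {
        double r1 = rankings[i - 1];
        double r2 = rankings[i];
        long v1 = values[i - 1];
        long v2 = values[i];
        return v1 + (long) ((r - r1) / (r2 - r1) * (v2 - v1));
      }
    }
    return values[values.length - 1];
  }
}

For example, new PiecewiseLinearCdfSketch(new double[] { 0.0, 0.5, 1.0 }, new long[] { 1000L, 4000L, 9000L }, 42L).randomValue() yields a runtime between 1000 and 9000.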

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/CDFRandomGenerator.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=826503&r1=826502&r2=826503&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Sun Oct 18 19:27:15 2009
@@ -443,6 +443,9 @@
 
     MAPREDUCE-1063. Document gridmix benchmark. (cdouglas)
 
+    MAPREDUCE-931. Use built-in interpolation classes for making up task
+    runtimes in Rumen. (Dick King via cdouglas)
+
   BUG FIXES
 
     MAPREDUCE-878. Rename fair scheduler design doc to 

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/CDFRandomGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/CDFRandomGenerator.java?rev=826503&r1=826502&r2=826503&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/CDFRandomGenerator.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/CDFRandomGenerator.java Sun Oct 18 19:27:15 2009
@@ -39,16 +39,17 @@
   final Random random;
 
   CDFRandomGenerator(LoggedDiscreteCDF cdf) {
-    random = new Random();
-    rankings = new double[(int) cdf.getNumberValues() + 2];
-    values = new long[(int) cdf.getNumberValues() + 2];
-    initializeTables(cdf);
+    this(cdf, new Random());
   }
 
   CDFRandomGenerator(LoggedDiscreteCDF cdf, long seed) {
-    random = new Random(seed);
-    rankings = new double[(int) cdf.getNumberValues() + 2];
-    values = new long[(int) cdf.getNumberValues() + 2];
+    this(cdf, new Random(seed));
+  }
+
+  private CDFRandomGenerator(LoggedDiscreteCDF cdf, Random random) {
+    this.random = random;
+    rankings = new double[cdf.getRankings().size() + 2];
+    values = new long[cdf.getRankings().size() + 2];
     initializeTables(cdf);
   }
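
The refactor above replaces two copies of the construction logic with a single private constructor that receives the Random directly, and sizes the lookup tables from the CDF's ranking list rather than its declared value count. A minimal sketch of the same delegation pattern, assuming hypothetical stand-in names (GeneratorSketch for CDFRandomGenerator, CdfData for LoggedDiscreteCDF):

import java.util.List;
import java.util.Random;

// Hypothetical stand-in for the LoggedDiscreteCDF accessor used above.
interface CdfData {
  List<?> getRankings();
}

// Sketch of the constructor-delegation pattern: the two entry points only
// choose the Random; one private constructor does the table allocation.
class GeneratorSketch {
  final double[] rankings;
  final long[] values;
  final Random random;

  GeneratorSketch(CdfData cdf) {
    this(cdf, new Random());       // unseeded: non-reproducible draws
  }

  GeneratorSketch(CdfData cdf, long seed) {
    this(cdf, new Random(seed));   // seeded: reproducible draws
  }

  private GeneratorSketch(CdfData cdf, Random random) {
    this.random = random;
    // The +2 presumably reserves slots for the CDF minimum (rank 0.0) and
    // maximum (rank 1.0) entries added when the tables are initialized.
    rankings = new double[cdf.getRankings().size() + 2];
    values = new long[cdf.getRankings().size() + 2];
  }
}

One practical effect of the delegation is that the table-sizing decision now lives in exactly one place.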
 

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java?rev=826503&r1=826502&r2=826503&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java Sun Oct 18 19:27:15 2009
@@ -56,6 +56,12 @@
   private final ClusterStory cluster;
   private JobConf jobConf;
 
+  private long seed;
+  private boolean hasRandomSeed = false;
+
+  private Map<LoggedDiscreteCDF, CDFRandomGenerator> interpolatorMap =
+      new HashMap<LoggedDiscreteCDF, CDFRandomGenerator>();
+
   // TODO: Fix ZombieJob to initialize this correctly from observed data
   double rackLocalOverNodeLocal = 1.5;
   double rackRemoteOverNodeLocal = 3.0;
@@ -80,6 +86,8 @@
     this.job = job;
     this.cluster = cluster;
     random = new Random(seed);
+    this.seed = seed;
+    hasRandomSeed = true;
   }
 
   /**
@@ -173,20 +181,23 @@
       }
 
       int avgHostPerSplit;
-      if (splitsList.size() == 0) avgHostPerSplit = 3;
-      else {
+      if (splitsList.size() == 0) {
+        avgHostPerSplit = 3;
+      } else {
         avgHostPerSplit = totalHosts / splitsList.size();
-        if (avgHostPerSplit == 0) avgHostPerSplit = 3;
+        if (avgHostPerSplit == 0) {
+          avgHostPerSplit = 3;
+        }
       }
-      
+
       for (int i = splitsList.size(); i < totalMaps; i++) {
         if (cluster == null) {
           splitsList.add(new FileSplit(emptyPath, 0, 0, new String[0]));
         } else {
           MachineNode[] mNodes = cluster.getRandomMachines(avgHostPerSplit);
           String[] hosts = new String[mNodes.length];
-          for (int j=0; j<hosts.length; ++j) {
-            hosts[j]=mNodes[j].getName();
+          for (int j = 0; j < hosts.length; ++j) {
+            hosts[j] = mNodes[j].getName();
           }
           // TODO set size of a split to 0 now.
           splitsList.add(new FileSplit(emptyPath, 0, 0, hosts));
@@ -286,9 +297,10 @@
         taskId.getId(), attemptId.getId());
   }
 
-
   private LoggedTask sanitizeLoggedTask(LoggedTask task) {
-    if (task == null) return null;
+    if (task == null) {
+      return null;
+    }
     if (task.getTaskType() == null) {
       LOG.warn("Task " + task.getTaskID() + " has nulll TaskType");
       return null;
@@ -299,17 +311,19 @@
     }
     return task;
   }
-  
+
   private LoggedTaskAttempt sanitizeLoggedTaskAttempt(LoggedTaskAttempt attempt) {
-    if (attempt == null) return null;
+    if (attempt == null) {
+      return null;
+    }
     if (attempt.getResult() == null) {
       LOG.warn("TaskAttempt " + attempt.getResult() + " has nulll Result");
       return null;
     }
-    
+
     return attempt;
   }
-  
+
   /**
    * Build task mapping and task attempt mapping, to be later used to find
    * information of a particular {@link TaskID} or {@link TaskAttemptID}.
@@ -512,9 +526,13 @@
   private int getLocality(LoggedTask loggedTask, LoggedTaskAttempt loggedAttempt) {
     int distance = cluster.getMaximumDistance();
     String rackHostName = loggedAttempt.getHostName();
-    if (rackHostName == null) return distance;
+    if (rackHostName == null) {
+      return distance;
+    }
     MachineNode mn = getMachineNode(rackHostName);
-    if (mn == null) return distance;
+    if (mn == null) {
+      return distance;
+    }
     List<LoggedLocation> locations = loggedTask.getPreferredLocations();
     if (locations != null) {
       for (LoggedLocation location : locations) {
@@ -696,40 +714,33 @@
             "state is neither SUCCEEDED nor FAILED: " + state);
       }
       return reduceTime;
-    } catch (IllegalArgumentException e) {
-      if (e.getMessage().startsWith("no value to use to make up runtime")) {
-        return 0;
-      }
-      throw e;
+    } catch (NoValueToMakeUpRuntime e) {
+      return 0;
     }
   }
 
   private long makeUpMapRuntime(State state, int locality) {
     long runtime;
     // make up runtime
-    if (state == State.SUCCEEDED) {
-      /**
-       * MapCDFs is a ArrayList of 4 possible groups: distance=0, 1, 2, and
-       * the last group is "distance cannot be determined". All pig jobs would
-       * have only the 4th group, and pig tasks usually do not have any
-       * locality, so this group should count as "distance=2". However,
-       * setup/cleanup tasks are also counted in the 4th group. These tasks do
-       * not make sense.
-       */
-      runtime = makeUpRuntime(job.getSuccessfulMapAttemptCDFs().get(locality));
-      if (runtime < 0) {
-        runtime = makeUpRuntime(job.getSuccessfulMapAttemptCDFs());
-      }
-    } else if (state == State.FAILED) {
-      runtime = makeUpRuntime(job.getFailedMapAttemptCDFs().get(locality));
-      if (runtime < 0) {
-        runtime = makeUpRuntime(job.getFailedMapAttemptCDFs());
-      }  
+    if (state == State.SUCCEEDED || state == State.FAILED) {
+      List<LoggedDiscreteCDF> cdfList =
+          state == State.SUCCEEDED ? job.getSuccessfulMapAttemptCDFs() : job
+              .getFailedMapAttemptCDFs();
+      // XXX MapCDFs is a ArrayList of 4 possible groups: distance=0, 1, 2, and
+      // the last group is "distance cannot be determined". All pig jobs
+      // would have only the 4th group, and pig tasks usually do not have
+      // any locality, so this group should count as "distance=2".
+      // However, setup/cleanup tasks are also counted in the 4th group.
+      // These tasks do not make sense.
+      try {
+        runtime = makeUpRuntime(cdfList.get(locality));
+      } catch (NoValueToMakeUpRuntime e) {
+        runtime = makeUpRuntime(cdfList);
+      }
     } else {
       throw new IllegalArgumentException(
           "state is neither SUCCEEDED nor FAILED: " + state);
     }
-    
     return runtime;
   }
 
@@ -741,7 +752,7 @@
    *          A list of CDFs for the distribution of runtime for the 1st, 2nd,
    *          ... map attempts for the job.
    */
-  private long makeUpRuntime(ArrayList<LoggedDiscreteCDF> mapAttemptCDFs) {
+  private long makeUpRuntime(List<LoggedDiscreteCDF> mapAttemptCDFs) {
     int total = 0;
     for (LoggedDiscreteCDF cdf : mapAttemptCDFs) {
       total += cdf.getNumberValues();
@@ -763,39 +774,72 @@
     throw new IllegalStateException("not possible to get here");
   }
 
-  // return -1 if we fail to makeup runtime with available info.
   private long makeUpRuntime(LoggedDiscreteCDF loggedDiscreteCDF) {
-    ArrayList<LoggedSingleRelativeRanking> rankings = new ArrayList<LoggedSingleRelativeRanking>(
-        loggedDiscreteCDF.getRankings());
-    if (loggedDiscreteCDF.getNumberValues() == 0) {
-      return -1;
+    /*
+     * We need this odd-looking code because if a seed exists we need to ensure
+     * that only one interpolator is generated per LoggedDiscreteCDF, but if no
+     * seed exists then the potentially lengthy process of making an
+     * interpolator can happen outside the lock. makeUpRuntimeCore only locks
+     * around the two hash map accesses.
+     */
+    if (hasRandomSeed) {
+      synchronized (interpolatorMap) {
+        return makeUpRuntimeCore(loggedDiscreteCDF);
+      }
     }
 
-    LoggedSingleRelativeRanking ranking = new LoggedSingleRelativeRanking();
-    ranking.setDatum(loggedDiscreteCDF.getMaximum());
-    ranking.setRelativeRanking(1.0);
-    rankings.add(ranking);
-
-    ranking = new LoggedSingleRelativeRanking();
-    ranking.setDatum(loggedDiscreteCDF.getMinimum());
-    ranking.setRelativeRanking(0.0);
-    rankings.add(0, ranking);
-
-    double r = random.nextDouble();
-    LoggedSingleRelativeRanking prevRanking = rankings.get(0);
-    for (LoggedSingleRelativeRanking ranking2 : rankings) {
-      double r2 = ranking2.getRelativeRanking();
-      if (r < r2) {
-        double r1 = prevRanking.getRelativeRanking();
-        double f1 = prevRanking.getDatum();
-        double f2 = ranking2.getDatum();
-        double runtime = (r - r1) / (r2 - r1) * (f2 - f1) + f1;
-        return (long) runtime;
+    return makeUpRuntimeCore(loggedDiscreteCDF);
+  }
+
+  private long makeUpRuntimeCore(LoggedDiscreteCDF loggedDiscreteCDF) {
+    CDFRandomGenerator interpolator;
+
+    synchronized (interpolatorMap) {
+      interpolator = interpolatorMap.get(loggedDiscreteCDF);
+    }
+
+    if (interpolator == null) {
+      if (loggedDiscreteCDF.getNumberValues() == 0) {
+        throw new NoValueToMakeUpRuntime("no value to use to make up runtime");
+      }
+
+      interpolator =
+          hasRandomSeed ? new CDFPiecewiseLinearRandomGenerator(
+              loggedDiscreteCDF, ++seed)
+              : new CDFPiecewiseLinearRandomGenerator(loggedDiscreteCDF);
+
+      /*
+       * It doesn't matter if we compute and store an interpolator twice because
+       * the two instances will be semantically identical and stateless, unless
+       * we're seeded, in which case we're not stateless but this code will be
+       * called synchronizedly.
+       */
+      synchronized (interpolatorMap) {
+        interpolatorMap.put(loggedDiscreteCDF, interpolator);
       }
-      prevRanking = ranking2;
     }
 
-    return rankings.get(rankings.size() - 1).getDatum();
+    return interpolator.randomValue();
+  }
+
+  static private class NoValueToMakeUpRuntime extends IllegalArgumentException {
+    static final long serialVersionUID = 1L;
+
+    NoValueToMakeUpRuntime() {
+      super();
+    }
+
+    NoValueToMakeUpRuntime(String detailMessage) {
+      super(detailMessage);
+    }
+
+    NoValueToMakeUpRuntime(String detailMessage, Throwable cause) {
+      super(detailMessage, cause);
+    }
+
+    NoValueToMakeUpRuntime(Throwable cause) {
+      super(cause);
+    }
   }
 
   private State makeUpState(int taskAttemptNumber, double[] numAttempts) {