You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by am...@apache.org on 2011/11/22 12:24:35 UTC

svn commit: r1204925 - in /hadoop/common/branches/branch-0.20-security: ./ src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/

Author: amarrk
Date: Tue Nov 22 11:24:34 2011
New Revision: 1204925

URL: http://svn.apache.org/viewvc?rev=1204925&view=rev
Log:
MAPREDUCE-3008. [Gridmix] Improve cumulative CPU usage emulation for short running tasks. (amarrk)

Modified:
    hadoop/common/branches/branch-0.20-security/CHANGES.txt
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/CumulativeCpuUsageEmulatorPlugin.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageEmulatorPlugin.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageMatcher.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/TotalHeapUsageEmulatorPlugin.java
    hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestResourceUsageEmulators.java

Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1204925&r1=1204924&r2=1204925&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Tue Nov 22 11:24:34 2011
@@ -48,6 +48,9 @@ Release 0.20.206.0 - unreleased
 
   IMPROVEMENTS
 
+    MAPREDUCE-3008. [Gridmix] Improve cumulative CPU usage emulation for 
+                    short running tasks. (amarrk)
+
     MAPREDUCE-2836. Provide option to fail jobs when submitted to
     non-existent fair scheduler pools. (Ahmed Radwan via todd)
 

Modified: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java?rev=1204925&r1=1204924&r2=1204925&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java Tue Nov 22 11:24:34 2011
@@ -17,12 +17,13 @@
  */
 package org.apache.hadoop.mapred.gridmix;
 
+import static org.apache.hadoop.mapred.Task.Counter.SPILLED_RECORDS;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TaskTracker;
 import org.apache.hadoop.mapred.gridmix.emulators.resourceusage.ResourceUsageMatcher;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -72,7 +73,7 @@ class LoadJob extends GridmixJob {
           job.setNumReduceTasks(jobdesc.getNumberReduces());
           job.setMapOutputKeyClass(GridmixKey.class);
           job.setMapOutputValueClass(GridmixRecord.class);
-          job.setSortComparatorClass(GridmixKey.Comparator.class);
+          job.setSortComparatorClass(LoadSortComparator.class);
           job.setGroupingComparatorClass(SpecGroupingComparator.class);
           job.setInputFormatClass(LoadInputFormat.class);
           job.setOutputFormatClass(RawBytesOutputFormat.class);
@@ -95,17 +96,84 @@ class LoadJob extends GridmixJob {
   }
   
   /**
+   * Key comparator that also drives resource-usage emulation: once the
+   * matcher thread is located, every compare() triggers a matching round.
+   */
+  public static class LoadSortComparator extends GridmixKey.Comparator {
+    private ResourceUsageMatcherRunner matcher = null;
+    private boolean isConfigured = false;
+    
+    public LoadSortComparator() {
+      super();
+    }
+    
+    @Override
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      configure();
+      int ret = super.compare(b1, s1, l1, b2, s2, l2);
+      if (matcher != null) {
+        try {
+          matcher.match(); // match the resource usage now
+        } catch (Exception e) {} // NOTE(review): swallows everything, incl. interrupts
+      }
+      return ret;
+    }
+    
+    // Locates the ResourceUsageMatcherRunner thread in the current thread
+    // group (recursing into subgroups) and caches it; until it is found,
+    // every call re-enumerates the group's threads.
+    //TODO The sorter is instantiated twice: (1) sort/spill in the map phase,
+    //     (2) merge in the sort phase. The handle is only needed in (2).
+    private void configure() {
+      if (!isConfigured) {
+        ThreadGroup group = Thread.currentThread().getThreadGroup();
+        Thread[] threads = new Thread[group.activeCount() * 2]; // x2 presumably as headroom: activeCount() is an estimate
+        group.enumerate(threads, true);
+        for (Thread t : threads) {
+          if (t != null && (t instanceof ResourceUsageMatcherRunner)) {
+            this.matcher = (ResourceUsageMatcherRunner) t;
+            isConfigured = true;
+            break;
+          }
+        }
+      }
+    }
+  }
+  
+  /**
    * This is a progress based resource usage matcher.
    */
   @SuppressWarnings("unchecked")
-  static class ResourceUsageMatcherRunner extends Thread {
+  static class ResourceUsageMatcherRunner extends Thread 
+  implements Progressive {
     private final ResourceUsageMatcher matcher;
-    private final Progressive progress;
+    private final BoostingProgress progress;
     private final long sleepTime;
     private static final String SLEEP_CONFIG = 
       "gridmix.emulators.resource-usage.sleep-duration";
     private static final long DEFAULT_SLEEP_TIME = 100; // 100ms
     
+    /**
+     * Task-context progress plus a settable boost, capped at 1.
+     */
+    private static class BoostingProgress implements Progressive {
+      private float boostValue = 0f; // added to the context's progress
+      TaskInputOutputContext context; // supplies the unboosted progress
+      
+      BoostingProgress(TaskInputOutputContext context) {
+        this.context = context;
+      }
+      
+      void setBoostValue(float boostValue) {
+        this.boostValue = boostValue;
+      }
+      
+      @Override
+      public float getProgress() {
+        return Math.min(1f, context.getProgress() + boostValue);
+      }
+    }
+    
     ResourceUsageMatcherRunner(final TaskInputOutputContext context, 
                                ResourceUsageMetrics metrics) {
       Configuration conf = context.getConfiguration();
@@ -119,19 +187,14 @@ class LoadJob extends GridmixJob {
       
       // set the other parameters
       this.sleepTime = conf.getLong(SLEEP_CONFIG, DEFAULT_SLEEP_TIME);
-      progress = new Progressive() {
-        @Override
-        public float getProgress() {
-          return context.getProgress();
-        }
-      };
+      progress = new BoostingProgress(context);
       
       // instantiate a resource-usage-matcher
       matcher = new ResourceUsageMatcher();
       matcher.configure(conf, plugin, metrics, progress);
     }
     
-    protected void match() throws Exception {
+    protected void match() throws IOException, InterruptedException {
       // match the resource usage
       matcher.matchResourceUsage();
     }
@@ -158,21 +221,34 @@ class LoadJob extends GridmixJob {
                  + " thread! Exiting.", e);
       }
     }
+
+    @Override
+    public float getProgress() {
+      return matcher.getProgress(); // emulation progress, not the task's progress
+    }
+    
+    // Boost the progress fed to the matcher so emulation cycles run sooner.
+    void boost(float value) {
+      progress.setBoostValue(value);
+    }
   }
   
   // Makes sure that the TaskTracker doesn't kill the map/reduce tasks while
   // they are emulating
   private static class StatusReporter extends Thread {
     private TaskInputOutputContext context;
-    StatusReporter(TaskInputOutputContext context) {
+    private final Progressive progress;
+
+    StatusReporter(TaskInputOutputContext context, Progressive progress) {
       this.context = context;
+      this.progress = progress;
     }
 
     @Override
     public void run() {
       LOG.info("Status reporter thread started.");
       try {
-        while (context.getProgress() < 1) {
+        while (!isInterrupted() && progress.getProgress() < 1) {
           // report progress
           context.progress();
 
@@ -275,7 +351,7 @@ class LoadJob extends GridmixJob {
                       split.getMapResourceUsageMetrics());
       
       // start the status reporter thread
-      reporter = new StatusReporter(ctxt);
+      reporter = new StatusReporter(ctxt, matcher);
       reporter.start();
     }
 
@@ -322,6 +398,17 @@ class LoadJob extends GridmixJob {
         }
       }
       
+      // check if the thread will get a chance to run or not
+      //  check if there will be a sort&spill->merge phase or not
+      //  check if the final sort&spill->merge phase is gonna happen or not
+      if (context.getNumReduceTasks() > 0 
+          && context.getCounter(SPILLED_RECORDS).getValue() == 0) {
+        LOG.info("Boosting the map phase progress.");
+        // add the sort phase progress to the map phase and emulate
+        matcher.boost(0.33f);
+        matcher.match();
+      }
+      
       // start the matcher thread since the map phase ends here
       matcher.start();
     }
@@ -390,7 +477,7 @@ class LoadJob extends GridmixJob {
       matcher = new ResourceUsageMatcherRunner(context, metrics);
       
       // start the status reporter thread
-      reporter = new StatusReporter(context);
+      reporter = new StatusReporter(context, matcher);
       reporter.start();
     }
     @Override
@@ -524,8 +611,12 @@ class LoadJob extends GridmixJob {
         specRecords[j] = info.getOutputRecords();
         metrics[j] = info.getResourceUsageMetrics();
         if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("SPEC(%d) %d -> %d %d %d", id(), i,
-              i + j * maps, info.getOutputRecords(), info.getOutputBytes()));
+          LOG.debug(String.format("SPEC(%d) %d -> %d %d %d %d %d %d %d", id(), 
+              i, i + j * maps, info.getOutputRecords(), info.getOutputBytes(), 
+              info.getResourceUsageMetrics().getCumulativeCpuUsage(),
+              info.getResourceUsageMetrics().getPhysicalMemoryUsage(),
+              info.getResourceUsageMetrics().getVirtualMemoryUsage(),
+              info.getResourceUsageMetrics().getHeapUsage()));
         }
       }
       final TaskInfo info = jobdesc.getTaskInfo(TaskType.MAP, i);

Modified: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/CumulativeCpuUsageEmulatorPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/CumulativeCpuUsageEmulatorPlugin.java?rev=1204925&r1=1204924&r2=1204925&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/CumulativeCpuUsageEmulatorPlugin.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/CumulativeCpuUsageEmulatorPlugin.java Tue Nov 22 11:24:34 2011
@@ -67,7 +67,7 @@ implements ResourceUsageEmulatorPlugin {
   private float emulationInterval; // emulation interval
   private long targetCpuUsage = 0;
   private float lastSeenProgress = 0;
-  private long lastSeenCpuUsageCpuUsage = 0;
+  private long lastSeenCpuUsage = 0;
   
   // Configuration parameters
   public static final String CPU_EMULATION_PROGRESS_INTERVAL = 
@@ -229,6 +229,26 @@ implements ResourceUsageEmulatorPlugin {
     return progress * progress * progress * progress;
   }
   
+  /**
+   * Returns the cumulative CPU time reported by {@code monitor}.
+   */
+  private synchronized long getCurrentCPUUsage() {
+    return monitor.getProcResourceValues().getCumulativeCpuTime();
+  }
+  
+  /**
+   * Returns the fraction of the target cumulative CPU usage consumed so far,
+   * capped at 1. Without a positive target there is nothing to emulate, so
+   * 1 is returned rather than NaN, Infinity or a negative ratio.
+   */
+  @Override
+  public float getProgress() {
+    if (targetCpuUsage <= 0) {
+      return 1f;
+    }
+    return Math.min(1f, ((float)getCurrentCPUUsage())/targetCpuUsage);
+  }
+  
   @Override
   //TODO Multi-threading for speedup?
   public void emulate() throws IOException, InterruptedException {
@@ -249,10 +258,9 @@ implements ResourceUsageEmulatorPlugin {
         //   Note that (Cc-Cl)/(Pc-Pl) is termed as 'rate' in the following 
         //   section
         
-        long currentCpuUsage = 
-          monitor.getProcResourceValues().getCumulativeCpuTime();
+        long currentCpuUsage = getCurrentCPUUsage();
         // estimate the cpu usage rate
-        float rate = (currentCpuUsage - lastSeenCpuUsageCpuUsage)
+        float rate = (currentCpuUsage - lastSeenCpuUsage)
                      / (currentProgress - lastSeenProgress);
         long projectedUsage = 
           currentCpuUsage + (long)((1 - currentProgress) * rate);
@@ -264,8 +272,7 @@ implements ResourceUsageEmulatorPlugin {
             (long)(targetCpuUsage 
                    * getWeightForProgressInterval(currentProgress));
           
-          while (monitor.getProcResourceValues().getCumulativeCpuTime() 
-                 < currentWeighedTarget) {
+          while (getCurrentCPUUsage() < currentWeighedTarget) {
             emulatorCore.compute();
             // sleep for 100ms
             try {
@@ -281,8 +288,7 @@ implements ResourceUsageEmulatorPlugin {
         // set the last seen progress
         lastSeenProgress = progress.getProgress();
         // set the last seen usage
-        lastSeenCpuUsageCpuUsage = 
-          monitor.getProcResourceValues().getCumulativeCpuTime();
+        lastSeenCpuUsage = getCurrentCPUUsage();
       }
     }
   }
@@ -310,6 +316,6 @@ implements ResourceUsageEmulatorPlugin {
     
     // initialize the states
     lastSeenProgress = 0;
-    lastSeenCpuUsageCpuUsage = 0;
+    lastSeenCpuUsage = 0;
   }
 }
\ No newline at end of file

Modified: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageEmulatorPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageEmulatorPlugin.java?rev=1204925&r1=1204924&r2=1204925&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageEmulatorPlugin.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageEmulatorPlugin.java Tue Nov 22 11:24:34 2011
@@ -42,7 +42,7 @@ import org.apache.hadoop.conf.Configurat
  * For configuring GridMix to load and and use a resource usage emulator, 
  * see {@link ResourceUsageMatcher}. 
  */
-public interface ResourceUsageEmulatorPlugin {
+public interface ResourceUsageEmulatorPlugin extends Progressive {
   /**
    * Initialize the plugin. This might involve
    *   - initializing the variables

Modified: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageMatcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageMatcher.java?rev=1204925&r1=1204924&r2=1204925&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageMatcher.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageMatcher.java Tue Nov 22 11:24:34 2011
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.mapred.gridmix.emulators.resourceusage;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -35,7 +36,7 @@ import org.apache.hadoop.util.Reflection
  * <p>Note that the order in which the emulators are invoked is same as the 
  * order in which they are configured.
  */
-public class ResourceUsageMatcher {
+public class ResourceUsageMatcher implements Progressive {
   /**
    * Configuration key to set resource usage emulators.
    */
@@ -71,10 +72,31 @@ public class ResourceUsageMatcher {
     }
   }
   
-  public void matchResourceUsage() throws Exception {
+  public void matchResourceUsage() throws IOException, InterruptedException {
     for (ResourceUsageEmulatorPlugin emulator : emulationPlugins) {
       // match the resource usage
       emulator.emulate();
     }
   }
+  
+  /**
+   * Returns the mean progress of the configured emulators, or 1 if there are none.
+   */
+  @Override
+  public float getProgress() {
+    if (emulationPlugins.size() > 0) {
+      // return the average progress
+      float progress = 0f;
+      for (ResourceUsageEmulatorPlugin emulator : emulationPlugins) {
+        // every emulator counts equally (plain, unweighted mean)
+        progress += emulator.getProgress();
+      }
+
+      return progress / emulationPlugins.size();
+    }
+    
+    // if no emulators are configured then return 1
+    return 1f;
+    
+  }
 }

Modified: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/TotalHeapUsageEmulatorPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/TotalHeapUsageEmulatorPlugin.java?rev=1204925&r1=1204924&r2=1204925&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/TotalHeapUsageEmulatorPlugin.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/TotalHeapUsageEmulatorPlugin.java Tue Nov 22 11:24:34 2011
@@ -187,6 +187,19 @@ implements ResourceUsageEmulatorPlugin {
   }
   
+  /**
+   * Returns the ratio of the current total heap usage (in MB) to the target,
+   * capped at 1. Returns 1 when emulation is disabled or there is no positive
+   * target, so callers never see NaN, Infinity or a negative ratio.
+   */
   @Override
+  public float getProgress() {
+    if (!enabled || targetHeapUsageInMB <= 0) {
+      return 1f;
+    }
+    return Math.min(1f, ((float)getTotalHeapUsageInMB())/targetHeapUsageInMB);
+  }
+  
+  @Override
   public void emulate() throws IOException, InterruptedException {
     if (enabled) {
       float currentProgress = progress.getProgress();

Modified: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestResourceUsageEmulators.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestResourceUsageEmulators.java?rev=1204925&r1=1204924&r2=1204925&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestResourceUsageEmulators.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestResourceUsageEmulators.java Tue Nov 22 11:24:34 2011
@@ -134,6 +134,14 @@ public class TestResourceUsageEmulators 
              ? fs.getFileStatus(testPath).getModificationTime() 
              : 0;
     }
+    
+    @Override
+    public float getProgress() { // 1 once touchPath exists, else 0
+      try {
+        return fs.exists(touchPath) ? 1.0f : 0f;
+      } catch (IOException ioe) {} // NOTE(review): I/O errors are reported as 0 progress
+      return 0f;
+    }
   }
   
   /**