You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by am...@apache.org on 2011/06/14 09:44:17 UTC

svn commit: r1135396 - in /hadoop/common/trunk/mapreduce: ./ src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/ src/contrib/gridmix/src/java/org/apache/hadoop/mapred/g...

Author: amarrk
Date: Tue Jun 14 07:44:16 2011
New Revision: 1135396

URL: http://svn.apache.org/viewvc?rev=1135396&view=rev
Log:
MAPREDUCE-2106. [Gridmix] Cumulative CPU usage emulation in Gridmix. (amarrk)

Added:
    hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Progressive.java
    hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/
    hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/
    hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/CumulativeCpuUsageEmulatorPlugin.java
    hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageEmulatorPlugin.java
    hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageMatcher.java
    hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestResourceUsageEmulators.java
Modified:
    hadoop/common/trunk/mapreduce/CHANGES.txt
    hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java
    hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java
    hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java
    hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadSplit.java
    hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java
    hadoop/common/trunk/mapreduce/src/docs/src/documentation/content/xdocs/gridmix.xml

Modified: hadoop/common/trunk/mapreduce/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/CHANGES.txt?rev=1135396&r1=1135395&r2=1135396&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/CHANGES.txt (original)
+++ hadoop/common/trunk/mapreduce/CHANGES.txt Tue Jun 14 07:44:16 2011
@@ -11,6 +11,9 @@ Trunk (unreleased changes)
 
   NEW FEATURES
 
+    MAPREDUCE-2106. [Gridmix] Cumulative CPU usage emulation in Gridmix. 
+    (amarrk)
+
     MAPREDUCE-2543. [Gridmix] High-Ram feature emulation in Gridmix. (amarrk)
 
     MAPREDUCE-2408. [Gridmix] Compression emulation in Gridmix. (amarrk)

Modified: hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java?rev=1135396&r1=1135395&r2=1135396&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java (original)
+++ hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java Tue Jun 14 07:44:16 2011
@@ -25,6 +25,7 @@ import org.apache.hadoop.io.DataInputBuf
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
 
 class GridmixKey extends GridmixRecord {
   static final byte REDUCE_SPEC = 0;
@@ -115,6 +116,22 @@ class GridmixKey extends GridmixRecord {
     setSize(origSize);
   }
 
+  /**
+   * Get the {@link ResourceUsageMetrics} stored in the key.
+   */
+  public ResourceUsageMetrics getReduceResourceUsageMetrics() {
+    assert REDUCE_SPEC == getType();
+    return spec.metrics;
+  }
+  
+  /**
+   * Store the {@link ResourceUsageMetrics} in the key.
+   */
+  public void setReduceResourceUsageMetrics(ResourceUsageMetrics metrics) {
+    assert REDUCE_SPEC == getType();
+    spec.setResourceUsageSpecification(metrics);
+  }
+  
   public byte getType() {
     return type;
   }
@@ -195,18 +212,35 @@ class GridmixKey extends GridmixRecord {
     long rec_in;
     long rec_out;
     long bytes_out;
+    private ResourceUsageMetrics metrics = null;
+    private int sizeOfResourceUsageMetrics = 0;
     public Spec() { }
 
     public void set(Spec other) {
       rec_in = other.rec_in;
       bytes_out = other.bytes_out;
       rec_out = other.rec_out;
+      setResourceUsageSpecification(other.metrics);
     }
 
+    /**
+     * Sets the {@link ResourceUsageMetrics} for this {@link Spec}.
+     */
+    public void setResourceUsageSpecification(ResourceUsageMetrics metrics) {
+      this.metrics = metrics;
+      if (metrics != null) {
+        this.sizeOfResourceUsageMetrics = metrics.size();
+      } else {
+        this.sizeOfResourceUsageMetrics = 0;
+      }
+    }
+    
     public int getSize() {
       return WritableUtils.getVIntSize(rec_in) +
              WritableUtils.getVIntSize(rec_out) +
-             WritableUtils.getVIntSize(bytes_out);
+             WritableUtils.getVIntSize(bytes_out) +
+             WritableUtils.getVIntSize(sizeOfResourceUsageMetrics) +
+             sizeOfResourceUsageMetrics;
     }
 
     @Override
@@ -214,6 +248,11 @@ class GridmixKey extends GridmixRecord {
       rec_in = WritableUtils.readVLong(in);
       rec_out = WritableUtils.readVLong(in);
       bytes_out = WritableUtils.readVLong(in);
+      sizeOfResourceUsageMetrics =  WritableUtils.readVInt(in);
+      if (sizeOfResourceUsageMetrics > 0) {
+        metrics = new ResourceUsageMetrics();
+        metrics.readFields(in);
+      }
     }
 
     @Override
@@ -221,6 +260,10 @@ class GridmixKey extends GridmixRecord {
       WritableUtils.writeVLong(out, rec_in);
       WritableUtils.writeVLong(out, rec_out);
       WritableUtils.writeVLong(out, bytes_out);
+      WritableUtils.writeVInt(out, sizeOfResourceUsageMetrics);
+      if (sizeOfResourceUsageMetrics > 0) {
+        metrics.write(out);
+      }
     }
   }
 

Modified: hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java?rev=1135396&r1=1135395&r2=1135396&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java (original)
+++ hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java Tue Jun 14 07:44:16 2011
@@ -28,6 +28,7 @@ import org.apache.hadoop.mapreduce.TaskT
 import org.apache.hadoop.tools.rumen.JobStory;
 import org.apache.hadoop.tools.rumen.JobStoryProducer;
 import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
+import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
 import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
 import org.apache.hadoop.tools.rumen.TaskInfo;
 import org.apache.hadoop.tools.rumen.ZombieJobProducer;
@@ -111,7 +112,7 @@ abstract class JobFactory<T> implements 
     public MinTaskInfo(TaskInfo info) {
       super(info.getInputBytes(), info.getInputRecords(),
             info.getOutputBytes(), info.getOutputRecords(),
-            info.getTaskMemory());
+            info.getTaskMemory(), info.getResourceUsageMetrics());
     }
     public long getInputBytes() {
       return Math.max(0, super.getInputBytes());

Modified: hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java?rev=1135396&r1=1135395&r2=1135396&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java (original)
+++ hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java Tue Jun 14 07:44:16 2011
@@ -22,6 +22,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.gridmix.emulators.resourceusage.ResourceUsageMatcher;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
@@ -31,10 +32,14 @@ import org.apache.hadoop.mapreduce.Mappe
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
 import org.apache.hadoop.tools.rumen.TaskInfo;
 
 import java.io.IOException;
@@ -88,6 +93,101 @@ class LoadJob extends GridmixJob {
     return true;
   }
   
+  /**
+   * This is a progress based resource usage matcher.
+   */
+  @SuppressWarnings("unchecked")
+  static class ResourceUsageMatcherRunner extends Thread {
+    private final ResourceUsageMatcher matcher;
+    private final Progressive progress;
+    private final long sleepTime;
+    private static final String SLEEP_CONFIG = 
+      "gridmix.emulators.resource-usage.sleep-duration";
+    private static final long DEFAULT_SLEEP_TIME = 100; // 100ms
+    
+    ResourceUsageMatcherRunner(final TaskInputOutputContext context, 
+                               ResourceUsageMetrics metrics) {
+      Configuration conf = context.getConfiguration();
+      
+      // set the resource calculator plugin
+      Class<? extends ResourceCalculatorPlugin> clazz =
+        conf.getClass(TTConfig.TT_RESOURCE_CALCULATOR_PLUGIN,
+                      null, ResourceCalculatorPlugin.class);
+      ResourceCalculatorPlugin plugin = 
+        ResourceCalculatorPlugin.getResourceCalculatorPlugin(clazz, conf);
+      
+      // set the other parameters
+      this.sleepTime = conf.getLong(SLEEP_CONFIG, DEFAULT_SLEEP_TIME);
+      progress = new Progressive() {
+        @Override
+        public float getProgress() {
+          return context.getProgress();
+        }
+      };
+      
+      // instantiate a resource-usage-matcher
+      matcher = new ResourceUsageMatcher();
+      matcher.configure(conf, plugin, metrics, progress);
+    }
+    
+    protected void match() throws Exception {
+      // match the resource usage
+      matcher.matchResourceUsage();
+    }
+    
+    @Override
+    public void run() {
+      LOG.info("Resource usage matcher thread started.");
+      try {
+        while (progress.getProgress() < 1) {
+          // match
+          match();
+          
+          // sleep for some time
+          try {
+            Thread.sleep(sleepTime);
+          } catch (Exception e) {}
+        }
+        
+        // match for progress = 1
+        match();
+        LOG.info("Resource usage emulation complete! Matcher exiting");
+      } catch (Exception e) {
+        LOG.info("Exception while running the resource-usage-emulation matcher"
+                 + " thread! Exiting.", e);
+      }
+    }
+  }
+  
+  // Makes sure that the TaskTracker doesn't kill the map/reduce tasks while
+  // they are emulating
+  private static class StatusReporter extends Thread {
+    private TaskAttemptContext context;
+    StatusReporter(TaskAttemptContext context) {
+      this.context = context;
+    }
+    
+    @Override
+    public void run() {
+      LOG.info("Status reporter thread started.");
+      try {
+        while (context.getProgress() < 1) {
+          // report progress
+          context.progress();
+
+          // sleep for some time
+          try {
+            Thread.sleep(100); // sleep for 100ms
+          } catch (Exception e) {}
+        }
+        
+        LOG.info("Status reporter thread exiting");
+      } catch (Exception e) {
+        LOG.info("Exception while running the status reporter thread!", e);
+      }
+    }
+  }
+  
   public static class LoadMapper
   extends Mapper<NullWritable, GridmixRecord, GridmixKey, GridmixRecord> {
 
@@ -100,6 +200,9 @@ class LoadJob extends GridmixJob {
     private final GridmixKey key = new GridmixKey();
     private final GridmixRecord val = new GridmixRecord();
 
+    private ResourceUsageMatcherRunner matcher = null;
+    private StatusReporter reporter = null;
+    
     @Override
     protected void setup(Context ctxt) 
     throws IOException, InterruptedException {
@@ -133,6 +236,8 @@ class LoadJob extends GridmixJob {
           if (i == id) {
             spec.bytes_out = split.getReduceBytes(idx);
             spec.rec_out = split.getReduceRecords(idx);
+            spec.setResourceUsageSpecification(
+                   split.getReduceResourceUsageMetrics(idx));
             ++idx;
             id += maps;
           }
@@ -167,6 +272,13 @@ class LoadJob extends GridmixJob {
         : splitRecords;
       ratio = totalRecords / (1.0 * inputRecords);
       acc = 0.0;
+      
+      matcher = new ResourceUsageMatcherRunner(ctxt, 
+                      split.getMapResourceUsageMetrics());
+      
+      // start the status reporter thread
+      reporter = new StatusReporter(ctxt);
+      reporter.start();
     }
 
     @Override
@@ -184,6 +296,13 @@ class LoadJob extends GridmixJob {
         }
         context.write(key, val);
         acc -= 1.0;
+        
+        // match inline
+        try {
+          matcher.match();
+        } catch (Exception e) {
+          LOG.debug("Error in resource usage emulation! Message: ", e);
+        }
       }
     }
 
@@ -195,8 +314,18 @@ class LoadJob extends GridmixJob {
         while (factory.next(key, val)) {
           context.write(key, val);
           key.setSeed(r.nextLong());
+          
+          // match inline
+          try {
+            matcher.match();
+          } catch (Exception e) {
+            LOG.debug("Error in resource usage emulation! Message: ", e);
+          }
         }
       }
+      
+      // start the matcher thread since the map phase ends here
+      matcher.start();
     }
   }
 
@@ -210,6 +339,9 @@ class LoadJob extends GridmixJob {
     private double ratio;
     private RecordFactory factory;
 
+    private ResourceUsageMatcherRunner matcher = null;
+    private StatusReporter reporter = null;
+    
     @Override
     protected void setup(Context context)
     throws IOException, InterruptedException {
@@ -220,11 +352,15 @@ class LoadJob extends GridmixJob {
       long outBytes = 0L;
       long outRecords = 0L;
       long inRecords = 0L;
+      ResourceUsageMetrics metrics = new ResourceUsageMetrics();
       for (GridmixRecord ignored : context.getValues()) {
         final GridmixKey spec = context.getCurrentKey();
         inRecords += spec.getReduceInputRecords();
         outBytes += spec.getReduceOutputBytes();
         outRecords += spec.getReduceOutputRecords();
+        if (spec.getReduceResourceUsageMetrics() != null) {
+          metrics = spec.getReduceResourceUsageMetrics();
+        }
       }
       if (0 == outRecords && inRecords > 0) {
         LOG.info("Spec output bytes w/o records. Using input record count");
@@ -252,6 +388,12 @@ class LoadJob extends GridmixJob {
                              context.getConfiguration(), 5*1024);
       ratio = outRecords / (1.0 * inRecords);
       acc = 0.0;
+      
+      matcher = new ResourceUsageMatcherRunner(context, metrics);
+      
+      // start the status reporter thread
+      reporter = new StatusReporter(context);
+      reporter.start();
     }
     @Override
     protected void reduce(GridmixKey key, Iterable<GridmixRecord> values,
@@ -262,6 +404,13 @@ class LoadJob extends GridmixJob {
         while (acc >= 1.0 && factory.next(null, val)) {
           context.write(NullWritable.get(), val);
           acc -= 1.0;
+          
+          // match inline
+          try {
+            matcher.match();
+          } catch (Exception e) {
+            LOG.debug("Error in resource usage emulation! Message: ", e);
+          }
         }
       }
     }
@@ -272,6 +421,13 @@ class LoadJob extends GridmixJob {
       while (factory.next(null, val)) {
         context.write(NullWritable.get(), val);
         val.setSeed(r.nextLong());
+        
+        // match inline
+        try {
+          matcher.match();
+        } catch (Exception e) {
+          LOG.debug("Error in resource usage emulation! Message: ", e);
+        }
       }
     }
   }
@@ -364,11 +520,13 @@ class LoadJob extends GridmixJob {
       final int nSpec = reds / maps + ((reds % maps) > i ? 1 : 0);
       final long[] specBytes = new long[nSpec];
       final long[] specRecords = new long[nSpec];
+      final ResourceUsageMetrics[] metrics = new ResourceUsageMetrics[nSpec];
       for (int j = 0; j < nSpec; ++j) {
         final TaskInfo info =
           jobdesc.getTaskInfo(TaskType.REDUCE, i + j * maps);
         specBytes[j] = info.getOutputBytes();
         specRecords[j] = info.getOutputRecords();
+        metrics[j] = info.getResourceUsageMetrics();
         if (LOG.isDebugEnabled()) {
           LOG.debug(String.format("SPEC(%d) %d -> %d %d %d", id(), i,
                     i + j * maps, info.getOutputRecords(), 
@@ -381,7 +539,8 @@ class LoadJob extends GridmixJob {
                       maps, i, info.getInputBytes(), info.getInputRecords(),
                       info.getOutputBytes(), info.getOutputRecords(),
                       reduceByteRatio, reduceRecordRatio, specBytes, 
-                      specRecords));
+                      specRecords, info.getResourceUsageMetrics(),
+                      metrics));
     }
     pushDescription(id(), splits);
   }

Modified: hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadSplit.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadSplit.java?rev=1135396&r1=1135395&r2=1135396&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadSplit.java (original)
+++ hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadSplit.java Tue Jun 14 07:44:16 2011
@@ -23,6 +23,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
 
 class LoadSplit extends CombineFileSplit {
   private int id;
@@ -40,6 +41,9 @@ class LoadSplit extends CombineFileSplit
   private long[] reduceOutputBytes = new long[0];
   private long[] reduceOutputRecords = new long[0];
 
+  private ResourceUsageMetrics mapMetrics;
+  private ResourceUsageMetrics[] reduceMetrics;
+
   LoadSplit() {
     super();
   }
@@ -47,7 +51,9 @@ class LoadSplit extends CombineFileSplit
   public LoadSplit(CombineFileSplit cfsplit, int maps, int id, long inputBytes, 
                    long inputRecords, long outputBytes, long outputRecords, 
                    double[] reduceBytes, double[] reduceRecords, 
-                   long[] reduceOutputBytes, long[] reduceOutputRecords)
+                   long[] reduceOutputBytes, long[] reduceOutputRecords,
+                   ResourceUsageMetrics metrics,
+                   ResourceUsageMetrics[] rMetrics)
   throws IOException {
     super(cfsplit);
     this.id = id;
@@ -61,6 +67,8 @@ class LoadSplit extends CombineFileSplit
     nSpec = reduceOutputBytes.length;
     this.reduceOutputBytes = reduceOutputBytes;
     this.reduceOutputRecords = reduceOutputRecords;
+    this.mapMetrics = metrics;
+    this.reduceMetrics = rMetrics;
   }
 
   public int getId() {
@@ -98,6 +106,15 @@ class LoadSplit extends CombineFileSplit
   public long getReduceRecords(int i) {
     return reduceOutputRecords[i];
   }
+  
+  public ResourceUsageMetrics getMapResourceUsageMetrics() {
+    return mapMetrics;
+  }
+  
+  public ResourceUsageMetrics getReduceResourceUsageMetrics(int i) {
+    return reduceMetrics[i];
+  }
+  
   @Override
   public void write(DataOutput out) throws IOException {
     super.write(out);
@@ -117,6 +134,12 @@ class LoadSplit extends CombineFileSplit
       WritableUtils.writeVLong(out, reduceOutputBytes[i]);
       WritableUtils.writeVLong(out, reduceOutputRecords[i]);
     }
+    mapMetrics.write(out);
+    int numReduceMetrics = (reduceMetrics == null) ? 0 : reduceMetrics.length;
+    WritableUtils.writeVInt(out, numReduceMetrics);
+    for (int i = 0; i < numReduceMetrics; ++i) {
+      reduceMetrics[i].write(out);
+    }
   }
   @Override
   public void readFields(DataInput in) throws IOException {
@@ -145,5 +168,13 @@ class LoadSplit extends CombineFileSplit
       reduceOutputBytes[i] = WritableUtils.readVLong(in);
       reduceOutputRecords[i] = WritableUtils.readVLong(in);
     }
+    mapMetrics = new ResourceUsageMetrics();
+    mapMetrics.readFields(in);
+    int numReduceMetrics = WritableUtils.readVInt(in);
+    reduceMetrics = new ResourceUsageMetrics[numReduceMetrics];
+    for (int i = 0; i < numReduceMetrics; ++i) {
+      reduceMetrics[i] = new ResourceUsageMetrics();
+      reduceMetrics[i].readFields(in);
+    }
   }
 }

Added: hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Progressive.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Progressive.java?rev=1135396&view=auto
==============================================================================
--- hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Progressive.java (added)
+++ hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Progressive.java Tue Jun 14 07:44:16 2011
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+/**
+ * Used to track progress of tasks.
+ */
+public interface Progressive {
+  public float getProgress();
+}
\ No newline at end of file

Added: hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/CumulativeCpuUsageEmulatorPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/CumulativeCpuUsageEmulatorPlugin.java?rev=1135396&view=auto
==============================================================================
--- hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/CumulativeCpuUsageEmulatorPlugin.java (added)
+++ hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/CumulativeCpuUsageEmulatorPlugin.java Tue Jun 14 07:44:16 2011
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix.emulators.resourceusage;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.gridmix.Progressive;
+import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
+import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
+
+/**
+ * <p>A {@link ResourceUsageEmulatorPlugin} that emulates the cumulative CPU 
+ * usage by performing certain CPU intensive operations. Performing such CPU 
+ * intensive operations essentially uses up some CPU. Every 
+ * {@link ResourceUsageEmulatorPlugin} is configured with a feedback module i.e 
+ * a {@link ResourceCalculatorPlugin}, to monitor the resource usage.</p>
+ * 
+ * <p>{@link CumulativeCpuUsageEmulatorPlugin} emulates the CPU usage in steps. 
+ * The frequency of emulation can be configured via 
+ * {@link #CPU_EMULATION_FREQUENCY}.
+ * CPU usage values are matched via emulation only on the interval boundaries.
+ * </p>
+ *  
+ * {@link CumulativeCpuUsageEmulatorPlugin} is a wrapper program for managing 
+ * the CPU usage emulation feature. It internally uses an emulation algorithm 
+ * (called the core and described by {@link CpuUsageEmulatorCore}) for 
+ * performing the actual emulation. Multiple calls to this core engine should 
+ * use up some amount of CPU.<br>
+ * 
+ * <p>{@link CumulativeCpuUsageEmulatorPlugin} provides a calibration feature 
+ * via {@link #initialize(Configuration, ResourceUsageMetrics, 
+ *                        ResourceCalculatorPlugin, Progressive)} to calibrate 
+ *  the plugin and its core for the underlying hardware. As a result of 
+ *  calibration, every call to the emulation engine's core should roughly use up
+ *  1% of the total usage value to be emulated. This makes sure that the 
+ *  underlying hardware is profiled before use and that the plugin doesn't 
+ *  accidentally overuse the CPU. With 1% as the unit emulation target value for 
+ *  the core engine, there will be roughly 100 calls to the engine resulting in 
+ *  roughly 100 calls to the feedback (resource usage monitor) module. 
+ *  Excessive usage of the feedback module is discouraged as 
+ *  it might result in excess CPU usage, resulting in no real CPU emulation.
+ *  </p>
+ */
+public class CumulativeCpuUsageEmulatorPlugin 
+implements ResourceUsageEmulatorPlugin {
+  protected CpuUsageEmulatorCore emulatorCore;
+  private ResourceCalculatorPlugin monitor;
+  private Progressive progress;
+  private boolean enabled = true;
+  private float emulationInterval; // emulation interval
+  private long targetCpuUsage = 0;
+  private float lastSeenProgress = 0;
+  private long lastSeenCpuUsageCpuUsage = 0;
+  
+  // Configuration parameters
+  public static final String CPU_EMULATION_FREQUENCY = 
+    "gridmix.emulators.resource-usage.cpu.frequency";
+  private static final float DEFAULT_EMULATION_FREQUENCY = 0.1F; // 10 times
+
+  /**
+   * This is the core CPU usage emulation algorithm. This is the core engine
+   * which actually performs some CPU intensive operations to consume some
+   * amount of CPU. Multiple calls of {@link #compute()} should help the 
+   * plugin emulate the desired level of CPU usage. This core engine can be
+   * calibrated using the {@link #calibrate(ResourceCalculatorPlugin, long)}
+   * API to suit the underlying hardware better. It also can be used to optimize
+   * the emulation cycle.
+   */
+  public interface CpuUsageEmulatorCore {
+    /**
+     * Performs some computation to use up some CPU.
+     */
+    public void compute();
+    
+    /**
+     * Allows the core to calibrate itself.
+     */
+    public void calibrate(ResourceCalculatorPlugin monitor, 
+                          long totalCpuUsage);
+  }
+  
+  /**
+   * This is the core engine to emulate the CPU usage. The only responsibility 
+   * of this class is to perform certain math intensive operations to make sure 
+   * that some desired value of CPU is used.
+   */
+  public static class DefaultCpuUsageEmulator implements CpuUsageEmulatorCore {
+    // number of times to loop for performing the basic unit computation
+    private int numIterations;
+    private final Random random;
+    
+    /**
+     * This is to fool the JVM and make it think that we need the value 
+     * stored in the unit computation i.e {@link #compute()}. This will prevent
+     * the JVM from optimizing the code.
+     */
+    protected double returnValue;
+    
+    /**
+     * Initializes the {@link DefaultCpuUsageEmulator} with default values. 
+     * Note that the {@link DefaultCpuUsageEmulator} should be calibrated 
+     * (see {@link #calibrate(ResourceCalculatorPlugin, long)}) when initialized
+     * using this constructor.
+     */
+    public DefaultCpuUsageEmulator() {
+      this(-1);
+    }
+    
+    DefaultCpuUsageEmulator(int numIterations) {
+      this.numIterations = numIterations;
+      random = new Random();
+    }
+    
+    /**
+     * This will consume some desired level of CPU. This API will try to use up
+     * 'X' percent of the target cumulative CPU usage. Currently X is set to 
+     * 1% once the core has been calibrated.
+     */
+    public void compute() {
+      for (int i = 0; i < numIterations; ++i) {
+        performUnitComputation();
+      }
+    }
+    
+    // Perform unit computation. The complete CPU emulation will be based on 
+    // multiple invocations to this unit computation module.
+    protected void performUnitComputation() {
+      //TODO can this be configurable too. Users/emulators should be able to 
+      // pick and choose what MATH operations to run.
+      // Example :
+      //           BASIC : ADD, SUB, MUL, DIV
+      //           ADV   : SQRT, SIN, COSIN..
+      //           COMPO : (BASIC/ADV)*
+      // Also define input generator. For now we can use the random number 
+      // generator. Later this can be changed to accept multiple sources.
+      
+      int randomData = random.nextInt();
+      int randomDataCube = randomData * randomData * randomData;
+      double randomDataCubeRoot = Math.cbrt(randomData);
+      returnValue = Math.log(Math.tan(randomDataCubeRoot 
+                                      * Math.exp(randomDataCube)) 
+                             * Math.sqrt(randomData));
+    }
+    
+    /**
+     * Calibrates the algorithm such that a single invocation of 
+     * {@link #compute()} emulates roughly 1% of <code>totalCpuUsage</code>. The
+     * unit computation cost is measured via the monitor's cumulative CPU time.
+     */
+    public void calibrate(ResourceCalculatorPlugin monitor, 
+                          long totalCpuUsage) {
+      long initTime = monitor.getProcResourceValues().getCumulativeCpuTime();
+      
+      long defaultLoopSize = 0; // unit computations run during calibration
+      long finalTime = initTime;
+      
+      //TODO Make this configurable
+      while (finalTime - initTime < 100) { // 100 ms
+        ++defaultLoopSize;
+        performUnitComputation(); //perform unit computation
+        finalTime = monitor.getProcResourceValues().getCumulativeCpuTime();
+      }
+      
+      long referenceRuntime = finalTime - initTime;
+      
+      // time for one loop = (final-time - init-time) / total-loops
+      float timePerLoop = ((float)referenceRuntime) / defaultLoopSize;
+      
+      // compute the 1% of the total CPU usage desired
+      //TODO Make this configurable
+      long onePercent = totalCpuUsage / 100;
+      
+      // num-iterations for 1% = (total-desired-usage / 100) / time-for-one-loop
+      numIterations = Math.max(1, (int)((float)onePercent/timePerLoop)); // >= 1
+      
+      System.out.println("Calibration done. Basic computation runtime : " 
+          + timePerLoop + " milliseconds. Optimal number of iterations (1%): " 
+          + numIterations);
+    }
+  }
+  
+  public CumulativeCpuUsageEmulatorPlugin() {
+    this(new DefaultCpuUsageEmulator()); // calibrated later in initialize()
+  }
+  
+  /**
+   * For testing. Lets the caller supply the CPU usage emulation core.
+   */
+  public CumulativeCpuUsageEmulatorPlugin(CpuUsageEmulatorCore core) {
+    emulatorCore = core;
+  }
+  
+  // Note that this weighing function uses only the current progress. In future,
+  // this might depend on progress, emulation-interval and expected target.
+  private float getWeightForProgressInterval(float progress) {
+    // we want a steep growth function (here progress^4) that gives less weight
+    // on lower progress boundaries but high (exact emulation) near progress 
+    // value of 1.
+    // so here is what the current growth function looks like
+    //    progress    weight
+    //      0.1       0.0001
+    //      0.2       0.0016
+    //      0.3       0.0081
+    //      0.4       0.0256
+    //      0.5       0.0625
+    //      0.6       0.1296
+    //      0.7       0.2401
+    //      0.8       0.4096
+    //      0.9       0.6561
+    //      1.0       1.000
+    
+    return progress * progress * progress * progress;
+  }
+  
+  /**
+   * If the projected usage falls short of the target, burns CPU until the 
+   * monitor reports the target weighted by the current progress.
+   */
+  @Override
+  //TODO Multi-threading for speedup?
+  public void emulate() throws IOException, InterruptedException {
+    if (enabled) {
+      float currentProgress = progress.getProgress();
+      if (lastSeenProgress < currentProgress 
+          && ((currentProgress - lastSeenProgress) >= emulationInterval
+              || currentProgress == 1)) {
+        // Estimate the final cpu usage. Consider the following
+        //     Cl/Cc/Cp : Last/Current/Projected Cpu usage
+        //     Pl/Pc/Pp : Last/Current/Projected progress
+        //   Then
+        //     (Cp-Cc)/(Pp-Pc) = (Cc-Cl)/(Pc-Pl)
+        //   Solving this for Cp with Pp = 1, we get
+        //     Cp = Cc + (1-Pc)*(Cc-Cl)/(Pc-Pl)
+        //   Note that (Cc-Cl)/(Pc-Pl) is termed as 'rate' in the following 
+        //   section
+        long currentCpuUsage = 
+          monitor.getProcResourceValues().getCumulativeCpuTime();
+        // estimate the cpu usage rate
+        float rate = (currentCpuUsage - lastSeenCpuUsageCpuUsage)
+                     / (currentProgress - lastSeenProgress);
+        long projectedUsage = 
+          currentCpuUsage + (long)((1 - currentProgress) * rate);
+        
+        if (projectedUsage < targetCpuUsage) {
+          // determine the correction factor between the current usage and the
+          // expected usage and add some weight to the target
+          long currentWeighedTarget = 
+            (long)(targetCpuUsage 
+                   * getWeightForProgressInterval(currentProgress));
+          
+          while (monitor.getProcResourceValues().getCumulativeCpuTime() 
+                 < currentWeighedTarget) {
+            emulatorCore.compute();
+            try {
+              Thread.sleep(100); // sleep for 100ms
+            } catch (InterruptedException ie) {
+              Thread.currentThread().interrupt(); // keep interrupt status
+              String message = 
+                "CumulativeCpuUsageEmulatorPlugin got interrupted. Exiting.";
+              throw new RuntimeException(message, ie);
+            }
+          }
+        }
+        
+        // remember where this round ended, for the next rate estimate
+        lastSeenProgress = progress.getProgress();
+        lastSeenCpuUsageCpuUsage = 
+          monitor.getProcResourceValues().getCumulativeCpuTime();
+      }
+    }
+  }
+
+  @Override
+  public void initialize(Configuration conf, ResourceUsageMetrics metrics,
+                         ResourceCalculatorPlugin monitor,
+                         Progressive progress) {
+    // emulation is switched on only for a positive target CPU usage
+    targetCpuUsage = metrics.getCumulativeCpuUsage();
+    enabled = targetCpuUsage > 0;
+    if (!enabled) {
+      return;
+    }
+    
+    // hold on to the feedback sources used by emulate()
+    this.progress = progress;
+    this.monitor = monitor;
+    
+    emulationInterval =  conf.getFloat(CPU_EMULATION_FREQUENCY, 
+                                       DEFAULT_EMULATION_FREQUENCY);
+    
+    // size the core's compute() call against the target
+    emulatorCore.calibrate(monitor, targetCpuUsage);
+    
+    // start the book-keeping from a clean slate
+    lastSeenProgress = 0;
+    lastSeenCpuUsageCpuUsage = 0;
+  }
+}
\ No newline at end of file

Added: hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageEmulatorPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageEmulatorPlugin.java?rev=1135396&view=auto
==============================================================================
--- hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageEmulatorPlugin.java (added)
+++ hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageEmulatorPlugin.java Tue Jun 14 07:44:16 2011
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix.emulators.resourceusage;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.gridmix.Progressive;
+import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
+import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * <p>Each resource to be emulated should have a corresponding implementation 
+ * class that implements {@link ResourceUsageEmulatorPlugin}.</p>
+ * <br><br>
+ * <p>{@link ResourceUsageEmulatorPlugin} will be configured using the 
+ * {@link #initialize(Configuration, ResourceUsageMetrics, 
+ *                    ResourceCalculatorPlugin, Progressive)} call.
+ * Every 
+ * {@link ResourceUsageEmulatorPlugin} is also configured with a feedback module
+ * i.e. a {@link ResourceCalculatorPlugin}, to monitor the current resource 
+ * usage. {@link ResourceUsageMetrics} decides the final resource usage value to
+ * emulate. {@link Progressive} keeps track of the task's progress.</p>
+ * 
+ * <br><br>
+ * 
+ * For configuring GridMix to load and use a resource usage emulator, 
+ * see {@link ResourceUsageMatcher}. 
+ */
+public interface ResourceUsageEmulatorPlugin {
+  /**
+   * Initialize the plugin. This might involve
+   *   - initializing the variables
+   *   - calibrating the plugin
+   */
+  void initialize(Configuration conf, ResourceUsageMetrics metrics, 
+                  ResourceCalculatorPlugin monitor,
+                  Progressive progress);
+
+  /**
+   * Emulate the resource usage to match the usage target. The plugin can use
+   * the {@link ResourceCalculatorPlugin} passed to <code>initialize</code> 
+   * to query for the current resource usage.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  void emulate() throws IOException, InterruptedException;
+}

Added: hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageMatcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageMatcher.java?rev=1135396&view=auto
==============================================================================
--- hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageMatcher.java (added)
+++ hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageMatcher.java Tue Jun 14 07:44:16 2011
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix.emulators.resourceusage;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.gridmix.Progressive;
+import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
+import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * <p>This is the driver class for managing all the resource usage emulators.
+ * {@link ResourceUsageMatcher} expects a comma separated list of 
+ * {@link ResourceUsageEmulatorPlugin} implementations specified using 
+ * {@link #RESOURCE_USAGE_EMULATION_PLUGINS} as the configuration parameter.</p>
+ * 
+ * <p>Note that the order in which the emulators are invoked is same as the 
+ * order in which they are configured.</p>
+ */
+public class ResourceUsageMatcher {
+  /**
+   * Configuration key to set resource usage emulators.
+   */
+  public static final String RESOURCE_USAGE_EMULATION_PLUGINS =
+    "gridmix.emulators.resource-usage.plugins";
+  
+  private List<ResourceUsageEmulatorPlugin> emulationPlugins = 
+    new ArrayList<ResourceUsageEmulatorPlugin>();
+  
+  /**
+   * Configure the {@link ResourceUsageMatcher} to load the configured plugins
+   * and initialize them. Plugins loaded by an earlier call are discarded.
+   */
+  @SuppressWarnings("unchecked")
+  public void configure(Configuration conf, ResourceCalculatorPlugin monitor, 
+                        ResourceUsageMetrics metrics, Progressive progress) {
+    emulationPlugins.clear(); // a re-configure must not stack up plugins
+    // no default class: the plugin interface itself cannot be instantiated
+    Class[] plugins = conf.getClasses(RESOURCE_USAGE_EMULATION_PLUGINS);
+    if (plugins == null || plugins.length == 0) {
+      System.out.println("No resource usage emulator plugins configured.");
+    } else {
+      for (Class<? extends ResourceUsageEmulatorPlugin> plugin : plugins) {
+        if (plugin != null) {
+          emulationPlugins.add(ReflectionUtils.newInstance(plugin, conf));
+        }
+      }
+    }
+
+    // initialize the emulators once all the configured emulator plugins are
+    // loaded
+    for (ResourceUsageEmulatorPlugin emulator : emulationPlugins) {
+      emulator.initialize(conf, metrics, monitor, progress);
+    }
+  }
+  
+  /** Invokes every loaded emulator once, in the configured order. */
+  public void matchResourceUsage() throws Exception {
+    for (ResourceUsageEmulatorPlugin emulator : emulationPlugins) {
+      emulator.emulate();
+    }
+  }
+}
\ No newline at end of file

Modified: hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java?rev=1135396&r1=1135395&r2=1135396&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java (original)
+++ hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java Tue Jun 14 07:44:16 2011
@@ -176,7 +176,8 @@ public class TestGridmixRecord {
       a.setReduceOutputBytes(out_bytes);
       final int min = WritableUtils.getVIntSize(in_rec)
                     + WritableUtils.getVIntSize(out_rec)
-                    + WritableUtils.getVIntSize(out_bytes);
+                    + WritableUtils.getVIntSize(out_bytes)
+                    + WritableUtils.getVIntSize(0);
       assertEquals(min + 2, a.fixedBytes()); // meta + vint min
       final int size = r.nextInt(1024) + a.fixedBytes() + 1;
       setSerialize(a, r.nextLong(), size, out);
@@ -207,7 +208,7 @@ public class TestGridmixRecord {
 
   @Test
   public void testKeySpec() throws Exception {
-    final int min = 5;
+    final int min = 6;
     final int max = 300;
     final GridmixKey a = new GridmixKey(GridmixKey.REDUCE_SPEC, 1, 0L);
     final GridmixKey b = new GridmixKey(GridmixKey.REDUCE_SPEC, 1, 0L);

Added: hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestResourceUsageEmulators.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestResourceUsageEmulators.java?rev=1135396&view=auto
==============================================================================
--- hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestResourceUsageEmulators.java (added)
+++ hadoop/common/trunk/mapreduce/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestResourceUsageEmulators.java Tue Jun 14 07:44:16 2011
@@ -0,0 +1,613 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
+import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
+import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin.ProcResourceValues;
+import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
+import org.apache.hadoop.mapred.DummyResourceCalculatorPlugin;
+import org.apache.hadoop.mapred.gridmix.LoadJob.ResourceUsageMatcherRunner;
+import org.apache.hadoop.mapred.gridmix.emulators.resourceusage.CumulativeCpuUsageEmulatorPlugin;
+import org.apache.hadoop.mapred.gridmix.emulators.resourceusage.ResourceUsageEmulatorPlugin;
+import org.apache.hadoop.mapred.gridmix.emulators.resourceusage.ResourceUsageMatcher;
+import org.apache.hadoop.mapred.gridmix.emulators.resourceusage.CumulativeCpuUsageEmulatorPlugin.DefaultCpuUsageEmulator;
+
+/**
+ * Test Gridmix's resource emulator framework and supported plugins.
+ */
+public class TestResourceUsageEmulators {
+  /**
+   * A {@link ResourceUsageEmulatorPlugin} implementation for testing purpose.
+   * It creates marker files ('test.init' and 'test') in the test directory.
+   */
+  static class TestResourceUsageEmulatorPlugin 
+  implements ResourceUsageEmulatorPlugin {
+    static final Path rootTempDir =
+        new Path(System.getProperty("test.build.data", "/tmp"));
+    static final Path tempDir = 
+      new Path(rootTempDir, "TestResourceUsageEmulatorPlugin");
+    static final String DEFAULT_IDENTIFIER = "test";
+    
+    private Path touchPath = null;
+    private FileSystem fs = null;
+    
+    @Override
+    public void emulate() throws IOException, InterruptedException {
+      // add some time between 2 calls to emulate()
+      try {
+        Thread.sleep(1000); // sleep for 1s
+      } catch (Exception e){} // ignored: the sleep only spaces out timestamps
+      
+      try {
+        fs.delete(touchPath, false); // delete the touch file
+        //TODO Search for a better touch utility
+        fs.create(touchPath).close(); // recreate it
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+    
+    protected String getIdentifier() {
+      return DEFAULT_IDENTIFIER;
+    }
+    
+    private static Path getFilePath(String id) {
+      return new Path(tempDir, id);
+    }
+    
+    private static Path getInitFilePath(String id) {
+      return new Path(tempDir, id + ".init");
+    }
+    
+    @Override
+    public void initialize(Configuration conf, ResourceUsageMetrics metrics,
+        ResourceCalculatorPlugin monitor, Progressive progress) {
+      // add some time between 2 calls to initialize()
+      try {
+        Thread.sleep(1000); // sleep for 1s
+      } catch (Exception e){} // ignored: the sleep only spaces out timestamps
+      
+      try {
+        fs = FileSystem.getLocal(conf);
+        
+        Path initPath = getInitFilePath(getIdentifier());
+        fs.delete(initPath, false); // delete the old file
+        fs.create(initPath).close(); // create a new one
+        
+        touchPath = getFilePath(getIdentifier());
+        fs.delete(touchPath, false);
+      } catch (Exception e) {
+        // ignored: a missing init file makes testInitialization() return 0
+      } finally {
+        if (fs != null) {
+          try {
+            fs.deleteOnExit(tempDir);
+          } catch (IOException ioe){}
+        }
+      }
+    }
+    
+    // returns the init file's modification time, or 0 if it does not exist
+    static long testInitialization(String id, Configuration conf) 
+    throws IOException {
+      Path testPath = getInitFilePath(id);
+      FileSystem fs = FileSystem.getLocal(conf);
+      return fs.exists(testPath) 
+             ? fs.getFileStatus(testPath).getModificationTime() 
+             : 0;
+    }
+    
+    // returns the touch file's modification time, or 0 if it does not exist
+    static long testEmulation(String id, Configuration conf) 
+    throws IOException {
+      Path testPath = getFilePath(id);
+      FileSystem fs = FileSystem.getLocal(conf);
+      return fs.exists(testPath) 
+             ? fs.getFileStatus(testPath).getModificationTime() 
+             : 0;
+    }
+  }
+  
+  /**
+   * Test implementation of {@link ResourceUsageEmulatorPlugin} which uses
+   * 'others' as the identifier of its files in the test directory.
+   */
+  static class TestOthers extends TestResourceUsageEmulatorPlugin {
+    static final String ID = "others";
+    
+    @Override
+    protected String getIdentifier() {
+      return ID;
+    }
+  }
+  
+  /**
+   * Test implementation of {@link ResourceUsageEmulatorPlugin} which uses
+   * 'cpu' as the identifier of its files in the test directory.
+   */
+  static class TestCpu extends TestResourceUsageEmulatorPlugin {
+    static final String ID = "cpu";
+    
+    @Override
+    protected String getIdentifier() {
+      return ID;
+    }
+  }
+  
+  /**
+   * Test {@link ResourceUsageMatcher}: plugin loading, then plugin ordering.
+   */
+  @Test
+  public void testResourceUsageMatcher() throws Exception {
+    ResourceUsageMatcher matcher = new ResourceUsageMatcher();
+    Configuration conf = new Configuration();
+    conf.setClass(ResourceUsageMatcher.RESOURCE_USAGE_EMULATION_PLUGINS, 
+                  TestResourceUsageEmulatorPlugin.class, 
+                  ResourceUsageEmulatorPlugin.class);
+    long currentTime = System.currentTimeMillis();
+    
+    matcher.configure(conf, null, null, null);
+    
+    matcher.matchResourceUsage();
+    
+    String id = TestResourceUsageEmulatorPlugin.DEFAULT_IDENTIFIER;
+    long result = 
+      TestResourceUsageEmulatorPlugin.testInitialization(id, conf);
+    assertTrue("Resource usage matcher failed to initialize the configured"
+               + " plugin", result > currentTime);
+    result = TestResourceUsageEmulatorPlugin.testEmulation(id, conf);
+    assertTrue("Resource usage matcher failed to load and emulate the"
+               + " configured plugin", result > currentTime);
+    
+    // test plugin order to first emulate cpu and then others
+    conf.setStrings(ResourceUsageMatcher.RESOURCE_USAGE_EMULATION_PLUGINS, 
+                    TestCpu.class.getName() + "," + TestOthers.class.getName());
+    
+    matcher.configure(conf, null, null, null);
+
+    // test the initialization order
+    long time1 = 
+           TestResourceUsageEmulatorPlugin.testInitialization(TestCpu.ID, conf);
+    long time2 = 
+           TestResourceUsageEmulatorPlugin.testInitialization(TestOthers.ID, 
+                                                              conf);
+    assertTrue("Resource usage matcher failed to initialize the configured"
+               + " plugins in order", time1 < time2);
+    
+    matcher.matchResourceUsage();
+
+    // test the emulation order: the cpu usage emulator plugin is configured
+    // 1st and then the others plugin, so its touch file must be older.
+    time1 = 
+      TestResourceUsageEmulatorPlugin.testEmulation(TestCpu.ID, conf);
+    time2 = 
+      TestResourceUsageEmulatorPlugin.testEmulation(TestOthers.ID, 
+                                                    conf);
+    assertTrue("Resource usage matcher failed to load the configured plugins", 
+               time1 < time2);
+  }
+  
+  /**
+   * Fakes the cumulative usage using {@link FakeCpuUsageEmulatorCore}.
+   */
+  static class FakeResourceUsageMonitor extends DummyResourceCalculatorPlugin {
+    private FakeCpuUsageEmulatorCore core;
+    
+    public FakeResourceUsageMonitor(FakeCpuUsageEmulatorCore core) {
+      this.core = core;
+    }
+    
+    /**
+     * A dummy CPU usage monitor. Every call to 
+     * {@link ResourceCalculatorPlugin#getCumulativeCpuTime()} will return the 
+     * value of {@link FakeCpuUsageEmulatorCore#getCpuUsage()}.
+     */
+    @Override
+    public long getCumulativeCpuTime() {
+      return core.getCpuUsage();
+    }
+
+    /**
+     * Returns a {@link ProcResourceValues} with cumulative cpu usage  
+     * computed using {@link #getCumulativeCpuTime()}.
+     */
+    @Override
+    public ProcResourceValues getProcResourceValues() {
+      long usageValue = getCumulativeCpuTime();
+      return new ProcResourceValues(usageValue, -1, -1);
+    }
+  }
+  
+  /**
+   * A settable {@link Progressive} for tests. 
+   * {@link Progressive#getProgress()} reports whatever value was last handed
+   * to {@link FakeProgressive#setProgress(float)}; it starts out at zero.
+   */
+  static class FakeProgressive implements Progressive {
+    private float latest = 0F;
+    
+    void setProgress(float progress) {
+      latest = progress;
+    }
+    
+    @Override
+    public float getProgress() {
+      return latest;
+    }
+  }
+  
+  /**
+   * A dummy reporter for {@link LoadJob.ResourceUsageMatcherRunner}.
+   */
+  private static class DummyReporter extends StatusReporter {
+    private Progressive progress;
+    
+    DummyReporter(Progressive progress) {
+      this.progress = progress;
+    }
+    
+    @Override
+    public org.apache.hadoop.mapreduce.Counter getCounter(Enum<?> name) {
+      return null; // this dummy keeps no counters
+    }
+    
+    @Override
+    public org.apache.hadoop.mapreduce.Counter getCounter(String group,
+                                                          String name) {
+      return null; // this dummy keeps no counters
+    }
+    
+    @Override
+    public void progress() {
+    }
+    
+    @Override
+    public float getProgress() {
+      return progress.getProgress(); // delegates to the wrapped Progressive
+    }
+    
+    @Override
+    public void setStatus(String status) {
+    }
+  }
+  
+  // Extends ResourceUsageMatcherRunner so that a test can drive it directly.
+  @SuppressWarnings("unchecked")
+  private static class FakeResourceUsageMatcherRunner 
+  extends ResourceUsageMatcherRunner {
+    FakeResourceUsageMatcherRunner(TaskInputOutputContext context, 
+                                   ResourceUsageMetrics metrics) {
+      super(context, metrics);
+    }
+    
+    // runs the inherited match() once
+    void test() throws Exception {
+      super.match();
+    }
+  }
+  
+  /**
+   * Test {@link LoadJob.ResourceUsageMatcherRunner}.
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testResourceUsageMatcherRunner() throws Exception {
+    Configuration conf = new Configuration();
+    FakeProgressive progress = new FakeProgressive();
+    
+    // set the resource calculator plugin
+    conf.setClass(TTConfig.TT_RESOURCE_CALCULATOR_PLUGIN,
+                  DummyResourceCalculatorPlugin.class, 
+                  ResourceCalculatorPlugin.class);
+    // set the resources
+    // set the resource implementation class
+    conf.setClass(ResourceUsageMatcher.RESOURCE_USAGE_EMULATION_PLUGINS, 
+                  TestResourceUsageEmulatorPlugin.class, 
+                  ResourceUsageEmulatorPlugin.class);
+    
+    long currentTime = System.currentTimeMillis();
+    
+    // initialize the matcher class
+    TaskAttemptID id = new TaskAttemptID("test", 1, TaskType.MAP, 1, 1);
+    StatusReporter reporter = new DummyReporter(progress);
+    TaskInputOutputContext context = 
+      new MapContextImpl(conf, id, null, null, null, reporter, null);
+    FakeResourceUsageMatcherRunner matcher = 
+      new FakeResourceUsageMatcherRunner(context, null);
+    
+    // check if the matcher initialized the plugin
+    String identifier = TestResourceUsageEmulatorPlugin.DEFAULT_IDENTIFIER;
+    long initTime = 
+      TestResourceUsageEmulatorPlugin.testInitialization(identifier, conf);
+    assertTrue("ResourceUsageMatcherRunner failed to initialize the"
+               + " configured plugin", initTime > currentTime);
+    
+    // check the progress
+    assertEquals("Progress mismatch in ResourceUsageMatcherRunner", 
+                 0, progress.getProgress(), 0D);
+    
+    // advance the progress, call match() and check that emulation happened
+    progress.setProgress(0.01f);
+    currentTime = System.currentTimeMillis();
+    matcher.test();
+    long emulateTime = 
+      TestResourceUsageEmulatorPlugin.testEmulation(identifier, conf);
+    assertTrue("ResourceUsageMatcherRunner failed to load and emulate"
+               + " the configured plugin", emulateTime > currentTime);
+  }
+  
+  /**
+   * Checks the calibration of {@link CumulativeCpuUsageEmulatorPlugin}'s core
+   * CPU usage emulation engine against a fake monitor.
+   */
+  @Test
+  public void testCpuUsageEmulator() throws IOException {
+    final long desiredUsage = 100000L; // 100 secs
+    final int usagePerUnit = 50;
+    
+    // a fake core whose usage is reported through a fake monitor
+    FakeCpuUsageEmulatorCore core = new FakeCpuUsageEmulatorCore();
+    core.setUnitUsage(usagePerUnit);
+    FakeResourceUsageMonitor usageMonitor = new FakeResourceUsageMonitor(core);
+    
+    // calibration keeps computing until 100ms of usage is reported
+    core.calibrate(usageMonitor, desiredUsage);
+    
+    // the monitor and the core must agree on the 100ms consumed
+    assertEquals("Fake calibration failed", 
+                 100, usageMonitor.getCumulativeCpuTime());
+    assertEquals("Fake calibration failed", 
+                 100, core.getCpuUsage());
+    
+    // 100ms at 50 per unit computation means exactly 2 unit computations
+    assertEquals("Fake calibration failed", 
+                 2, core.getNumCalls());
+  }
+  
+  /**
+   * A fake emulator core: each unit computation adds a fixed usage amount.
+   */
+  private static class FakeCpuUsageEmulatorCore 
+  extends DefaultCpuUsageEmulator {
+    private int unitUsage = 1;
+    private int cpuUsage = 0;
+    private int numCalls = 0;
+    
+    void setUnitUsage(int unitUsage) {
+      this.unitUsage = unitUsage;
+    }
+    
+    @Override
+    protected void performUnitComputation() {
+      cpuUsage += unitUsage;
+      ++numCalls;
+    }
+    
+    int getCpuUsage() {
+      return this.cpuUsage;
+    }
+    
+    int getNumCalls() {
+      return this.numCalls;
+    }
+    
+    void reset() {
+      cpuUsage = 0;
+      numCalls = 0;
+    }
+  }
+  
+  // Builds metrics whose cpu, memory and heap usages all equal the target
+  private static ResourceUsageMetrics createMetrics(long target) {
+    ResourceUsageMetrics usage = new ResourceUsageMetrics();
+    usage.setHeapUsage(target);
+    usage.setPhysicalMemoryUsage(target);
+    usage.setVirtualMemoryUsage(target);
+    usage.setCumulativeCpuUsage(target);
+    return usage;
+  }
+  
+  /**
+   * Test {@link CumulativeCpuUsageEmulatorPlugin}.
+   */
+  @Test
+  public void testCumulativeCpuUsageEmulatorPlugin() throws Exception {
+    Configuration conf = new Configuration();
+    long targetCpuUsage = 1000L;
+    int unitCpuUsage = 50;
+    
+    // fake progress indicator
+    FakeProgressive fakeProgress = new FakeProgressive();
+    
+    // fake cpu usage generator
+    FakeCpuUsageEmulatorCore fakeCore = new FakeCpuUsageEmulatorCore();
+    fakeCore.setUnitUsage(unitCpuUsage);
+    
+    // a cumulative cpu usage emulator with fake core
+    CumulativeCpuUsageEmulatorPlugin cpuPlugin = 
+      new CumulativeCpuUsageEmulatorPlugin(fakeCore);
+    
+    // test with invalid or missing resource usage value
+    ResourceUsageMetrics invalidUsage = createMetrics(0);
+    cpuPlugin.initialize(conf, invalidUsage, null, null);
+    
+    // test if disabled cpu emulation plugin's emulate() call is a no-operation
+    // this will test if the emulation plugin is disabled or not
+    int numCallsPre = fakeCore.getNumCalls();
+    long cpuUsagePre = fakeCore.getCpuUsage();
+    cpuPlugin.emulate();
+    int numCallsPost = fakeCore.getNumCalls();
+    long cpuUsagePost = fakeCore.getCpuUsage();
+    
+    //  test if no calls are made to the cpu usage emulator core
+    assertEquals("Disabled cumulative CPU usage emulation plugin works!", 
+                 numCallsPre, numCallsPost);
+    
+    //  test if the cpu usage reported by the emulator core is unchanged
+    assertEquals("Disabled cumulative CPU usage emulation plugin works!", 
+                 cpuUsagePre, cpuUsagePost);
+    
+    // test with valid resource usage value
+    ResourceUsageMetrics metrics = createMetrics(targetCpuUsage);
+    
+    // fake monitor
+    ResourceCalculatorPlugin monitor = new FakeResourceUsageMonitor(fakeCore);
+    
+    // test with default emulation interval
+    testEmulationAccuracy(conf, fakeCore, monitor, metrics, cpuPlugin, 
+                          targetCpuUsage, targetCpuUsage / unitCpuUsage);
+    
+    // test with custom value for emulation interval of 20%
+    conf.setFloat(CumulativeCpuUsageEmulatorPlugin.CPU_EMULATION_FREQUENCY,
+                  0.2F);
+    testEmulationAccuracy(conf, fakeCore, monitor, metrics, cpuPlugin, 
+                          targetCpuUsage, targetCpuUsage / unitCpuUsage);
+    
+    // test if emulation interval boundary is respected (unit usage = 1)
+    //  test the case where the current progress is less than threshold
+    fakeProgress = new FakeProgressive(); // initialize
+    fakeCore.reset();
+    fakeCore.setUnitUsage(1);
+    conf.setFloat(CumulativeCpuUsageEmulatorPlugin.CPU_EMULATION_FREQUENCY,
+                  0.25F);
+    cpuPlugin.initialize(conf, metrics, monitor, fakeProgress);
+    // take a snapshot after the initialization
+    long initCpuUsage = monitor.getCumulativeCpuTime();
+    long initNumCalls = fakeCore.getNumCalls();
+    // test with 0 progress
+    testEmulationBoundary(0F, fakeCore, fakeProgress, cpuPlugin, initCpuUsage, 
+                          initNumCalls, "[no-op, 0 progress]");
+    // test with 24% progress
+    testEmulationBoundary(0.24F, fakeCore, fakeProgress, cpuPlugin, 
+                          initCpuUsage, initNumCalls, "[no-op, 24% progress]");
+    // test with 25% progress
+    //  target = 1000ms, target emulation at 25% = 250ms, 
+    //  weighted target = 1000 * 0.25^4 (we are using progress^4 as the weight)
+    //                 ~ 4
+    //  but current usage = init-usage = 100, hence expected = 100
+    testEmulationBoundary(0.25F, fakeCore, fakeProgress, cpuPlugin, 
+                          initCpuUsage, initNumCalls, "[op, 25% progress]");
+    
+    // test with 80% progress
+    //  target = 1000ms, target emulation at 80% = 800ms, 
+    //  weighted target = 1000 * 0.80^4 (we are using progress^4 as the weight)
+    //                 ~ 410
+    //  current-usage = init-usage = 100, hence expected-usage = 410
+    testEmulationBoundary(0.80F, fakeCore, fakeProgress, cpuPlugin, 410, 410, 
+                          "[op, 80% progress]");
+    
+    // now test if the final call with 100% progress ramps up the CPU usage
+    testEmulationBoundary(1F, fakeCore, fakeProgress, cpuPlugin, targetCpuUsage,
+                          targetCpuUsage, "[op, 100% progress]");
+    
+    // test if emulation interval boundary is respected (unit usage = 50)
+    //  test the case where the current progress is less than threshold
+    fakeProgress = new FakeProgressive(); // initialize
+    fakeCore.reset();
+    fakeCore.setUnitUsage(unitCpuUsage);
+    conf.setFloat(CumulativeCpuUsageEmulatorPlugin.CPU_EMULATION_FREQUENCY,
+                  0.40F);
+    cpuPlugin.initialize(conf, metrics, monitor, fakeProgress);
+    // take a snapshot after the initialization
+    initCpuUsage = monitor.getCumulativeCpuTime();
+    initNumCalls = fakeCore.getNumCalls();
+    // test with 0 progress
+    testEmulationBoundary(0F, fakeCore, fakeProgress, cpuPlugin, initCpuUsage, 
+                          initNumCalls, "[no-op, 0 progress]");
+    // test with 39% progress
+    testEmulationBoundary(0.39F, fakeCore, fakeProgress, cpuPlugin, 
+                          initCpuUsage, initNumCalls, "[no-op, 39% progress]");
+    // test with 40% progress
+    //  target = 1000ms, target emulation at 40% = 400ms, 
+    //  weighted target = 1000 * 0.40^4 (we are using progress^4 as the weight)
+    //                 ~ 26
+    // current-usage = init-usage = 100, hence expected-usage = 100
+    testEmulationBoundary(0.40F, fakeCore, fakeProgress, cpuPlugin, 
+                          initCpuUsage, initNumCalls, "[op, 40% progress]");
+    
+    // test with 90% progress
+    //  target = 1000ms, target emulation at 90% = 900ms, 
+    //  weighted target = 1000 * 0.90^4 (we are using progress^4 as the weight)
+    //                 ~ 657
+    //  current-usage = init-usage = 100, hence expected-usage = 657 but 
+    //  the fake-core increases in steps of 50, hence final target = 700
+    testEmulationBoundary(0.90F, fakeCore, fakeProgress, cpuPlugin, 700, 
+                          700 / unitCpuUsage, "[op, 90% progress]");
+    
+    // now test if the final call with 100% progress ramps up the CPU usage
+    testEmulationBoundary(1F, fakeCore, fakeProgress, cpuPlugin, targetCpuUsage,
+                          targetCpuUsage / unitCpuUsage, "[op, 100% progress]");
+  }
+  
+  // test whether the CPU usage emulator achieves the desired target using
+  // desired calls to the underlying core engine.
+  private static void testEmulationAccuracy(Configuration conf, 
+                        FakeCpuUsageEmulatorCore fakeCore,
+                        ResourceCalculatorPlugin monitor,
+                        ResourceUsageMetrics metrics,
+                        CumulativeCpuUsageEmulatorPlugin cpuPlugin,
+                        long expectedTotalCpuUsage, long expectedTotalNumCalls) 
+  throws Exception {
+    FakeProgressive fakeProgress = new FakeProgressive();
+    fakeCore.reset();
+    cpuPlugin.initialize(conf, metrics, monitor, fakeProgress);
+    int numLoops = 0;
+    while (fakeProgress.getProgress() < 1) {
+      ++numLoops;
+      float progress = (float)numLoops / 100;
+      fakeProgress.setProgress(progress);
+      cpuPlugin.emulate();
+    }
+    
+    // test if the resource plugin shows the expected invocations
+    assertEquals("Cumulative cpu usage emulator plugin failed (num calls)!", 
+                 expectedTotalNumCalls, fakeCore.getNumCalls(), 0L);
+    // test if the resource plugin shows the expected usage
+    assertEquals("Cumulative cpu usage emulator plugin failed (total usage)!", 
+                 expectedTotalCpuUsage, fakeCore.getCpuUsage(), 0L);
+  }
+  
+  // tests if the CPU usage emulation plugin emulates only at the expected
+  // progress gaps
+  private static void testEmulationBoundary(float progress, 
+      FakeCpuUsageEmulatorCore fakeCore, FakeProgressive fakeProgress, 
+      CumulativeCpuUsageEmulatorPlugin cpuPlugin, long expectedTotalCpuUsage, 
+      long expectedTotalNumCalls, String info) throws Exception {
+    fakeProgress.setProgress(progress);
+    cpuPlugin.emulate();
+    
+    assertEquals("Emulation interval test for cpu usage failed " + info + "!", 
+                 expectedTotalCpuUsage, fakeCore.getCpuUsage(), 0L);
+    assertEquals("Emulation interval test for num calls failed " + info + "!", 
+                 expectedTotalNumCalls, fakeCore.getNumCalls(), 0L);
+  }
+}
\ No newline at end of file

Modified: hadoop/common/trunk/mapreduce/src/docs/src/documentation/content/xdocs/gridmix.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/docs/src/documentation/content/xdocs/gridmix.xml?rev=1135396&r1=1135395&r2=1135396&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/docs/src/documentation/content/xdocs/gridmix.xml (original)
+++ hadoop/common/trunk/mapreduce/src/docs/src/documentation/content/xdocs/gridmix.xml Tue Jun 14 07:44:16 2011
@@ -663,8 +663,53 @@ hadoop jar &lt;gridmix-jar&gt; org.apach
      </ul>
      <p>High-Ram feature emulation can be disabled by setting  
         <code>gridmix.highram-emulation.enable</code> to
-        <code>false</code>. By default High-Ram feature emulation is enabled.
-        Note that this feature works only for jobs of type <em>LOADJOB</em>.
+        <code>false</code>.
+     </p>
+    </section>
+    
+    <section id="resource-usage-emulation">
+      <title>Emulating resource usages</title>
+      <p>Usages of resources like CPU, physical memory, virtual memory, JVM heap
+         etc. are recorded by MapReduce using its task counters. This information
+         is used by GridMix to emulate the resource usages in the simulated 
+         tasks. Emulating resource usages will help GridMix exert similar load 
+         on the test cluster as seen in the actual cluster.
+      </p>
+      <p>MapReduce tasks use up resources during their entire lifetime. GridMix
+         also tries to mimic this behavior by spanning resource usage emulation
+         across the entire lifetime of the simulated task. Each resource to be
+         emulated should have an <em>emulator</em> associated with it.
+         Each such <em>emulator</em> should implement the 
+         <code>org.apache.hadoop.mapred.gridmix.emulators.resourceusage
+         .ResourceUsageEmulatorPlugin</code> interface. Resource 
+         <em>emulators</em> in GridMix are <em>plugins</em> that can be 
+         configured (plugged in or out) before every run. GridMix users can 
+         configure multiple emulator <em>plugins</em> by passing a comma 
+         separated list of <em>emulators</em> as a value for the 
+         <code>gridmix.emulators.resource-usage.plugins</code> parameter. 
+      </p>
+      <p>List of <em>emulators</em> shipped with GridMix:
+      </p>
+     <ul>
+       <li>Cumulative CPU usage <em>emulator</em>: 
+           GridMix uses the cumulative CPU usage value published by Rumen 
+           and makes sure that the total cumulative CPU usage of the simulated 
+           task is close to the value published by Rumen. GridMix can be 
+           configured to emulate cumulative CPU usage by adding 
+           <code>org.apache.hadoop.mapred.gridmix.emulators.resourceusage
+           .CumulativeCpuUsageEmulatorPlugin</code> to the list of emulator 
+           <em>plugins</em> configured for the 
+           <code>gridmix.emulators.resource-usage.plugins</code> parameter.
+           CPU usage emulator is designed in such a way that
+           it only emulates at specific progress boundaries of the task. This 
+           interval can be configured using 
+           <code>gridmix.emulators.resource-usage.cpu.frequency</code>. The 
+           default value for this parameter is <code>0.1</code> i.e. 
+           <code>10%</code>.
+       </li>
+     </ul>
+     <p>Note that GridMix will emulate resource usages only for jobs of type 
+        <em>LOADJOB</em>.
      </p>
     </section>
     
@@ -677,10 +722,6 @@ hadoop jar &lt;gridmix-jar&gt; org.apach
       the following characteristics of job load are not currently captured in
       job traces and cannot be accurately reproduced in GridMix:</p>
       <ul>
-	<li><em>CPU Usage</em> - We have no data for per-task CPU usage, so we
-	cannot even attempt an approximation. GridMix tasks are never
-	CPU-bound independent of I/O, though this surely happens in
-	practice.</li>
 	<li><em>Filesystem Properties</em> - No attempt is made to match block
 	sizes, namespace hierarchies, or any property of input, intermediate
 	or output data other than the bytes/records consumed and emitted from