Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2009/12/11 20:33:02 UTC

svn commit: r889778 [2/2] - in /hadoop/mapreduce/trunk: ./ src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/ src/tools/org/apache/hadoop/tools/rumen/

Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java?rev=889778&r1=889777&r2=889778&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java Fri Dec 11 19:32:58 2009
@@ -26,9 +26,16 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.tools.rumen.JobStory;
 import org.apache.hadoop.tools.rumen.JobStoryProducer;
+import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
 import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants;
+import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
+import org.apache.hadoop.tools.rumen.TaskInfo;
 import org.apache.hadoop.tools.rumen.ZombieJobProducer;
 
 import org.apache.commons.logging.Log;
@@ -47,6 +54,7 @@
   public static final Log LOG = LogFactory.getLog(JobFactory.class);
 
   private final Path scratch;
+  private final float rateFactor;
   private final Configuration conf;
   private final ReaderThread rThread;
   private final AtomicInteger sequence;
@@ -83,6 +91,7 @@
       Path scratch, Configuration conf, CountDownLatch startFlag) {
     sequence = new AtomicInteger(0);
     this.scratch = scratch;
+    this.rateFactor = conf.getFloat(Gridmix.GRIDMIX_SUB_MUL, 1.0f);
     this.jobProducer = jobProducer;
     this.conf = new Configuration(conf);
     this.submitter = submitter;
@@ -90,6 +99,61 @@
     this.rThread = new ReaderThread();
   }
 
+  static class MinTaskInfo extends TaskInfo {
+    public MinTaskInfo(TaskInfo info) {
+      super(info.getInputBytes(), info.getInputRecords(),
+            info.getOutputBytes(), info.getOutputRecords(),
+            info.getTaskMemory());
+    }
+    public long getInputBytes() {
+      return Math.max(0, super.getInputBytes());
+    }
+    public int getInputRecords() {
+      return Math.max(0, super.getInputRecords());
+    }
+    public long getOutputBytes() {
+      return Math.max(0, super.getOutputBytes());
+    }
+    public int getOutputRecords() {
+      return Math.max(0, super.getOutputRecords());
+    }
+    public long getTaskMemory() {
+      return Math.max(0, super.getTaskMemory());
+    }
+  }
+
+  static class FilterJobStory implements JobStory {
+
+    protected final JobStory job;
+
+    public FilterJobStory(JobStory job) {
+      this.job = job;
+    }
+    public JobConf getJobConf() { return job.getJobConf(); }
+    public String getName() { return job.getName(); }
+    public JobID getJobID() { return job.getJobID(); }
+    public String getUser() { return job.getUser(); }
+    public long getSubmissionTime() { return job.getSubmissionTime(); }
+    public InputSplit[] getInputSplits() { return job.getInputSplits(); }
+    public int getNumberMaps() { return job.getNumberMaps(); }
+    public int getNumberReduces() { return job.getNumberReduces(); }
+    public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
+      return job.getTaskInfo(taskType, taskNumber);
+    }
+    public TaskAttemptInfo getTaskAttemptInfo(TaskType taskType, int taskNumber,
+        int taskAttemptNumber) {
+      return job.getTaskAttemptInfo(taskType, taskNumber, taskAttemptNumber);
+    }
+    public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(
+        int taskNumber, int taskAttemptNumber, int locality) {
+      return job.getMapTaskAttemptInfoAdjusted(
+          taskNumber, taskAttemptNumber, locality);
+    }
+    public Values getOutcome() {
+      return job.getOutcome();
+    }
+  }
+
   /**
    * Worker thread responsible for reading descriptions, assigning sequence
    * numbers, and normalizing time.
@@ -107,7 +171,12 @@
       } while (job != null
           && (job.getOutcome() != Pre21JobHistoryConstants.Values.SUCCESS ||
               job.getSubmissionTime() < 0));
-      return job;
+      return null == job ? null : new FilterJobStory(job) {
+          @Override
+          public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
+            return new MinTaskInfo(this.job.getTaskInfo(taskType, taskNumber));
+          }
+        };
     }
 
     @Override
@@ -133,11 +202,12 @@
             }
             final long current = job.getSubmissionTime();
             if (current < last) {
-              throw new IOException(
-                  "JobStories are not ordered by submission time.");
+              LOG.warn("Job " + job.getJobID() + " out of order");
+              continue;
             }
             last = current;
-            submitter.add(new GridmixJob(conf, initTime + (current - first),
+            submitter.add(new GridmixJob(conf, initTime +
+                  Math.round(rateFactor * (current - first)),
                 job, scratch, sequence.getAndIncrement()));
           } catch (IOException e) {
             JobFactory.this.error = e;
@@ -179,8 +249,8 @@
   /**
    * Wait for the reader thread to exhaust the job trace.
    */
-  public void join() throws InterruptedException {
-    rThread.join();
+  public void join(long millis) throws InterruptedException {
+    rThread.join(millis);
   }
 
   /**
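
The JobFactory changes above do three things: MinTaskInfo clamps negative byte/record counts read from a trace to zero, the gap between job submissions is scaled by the Gridmix.GRIDMIX_SUB_MUL factor (read into rateFactor), and an out-of-order JobStory is now skipped with a warning instead of aborting the whole run. A minimal standalone sketch of the clamping and the interval scaling (class and method names here are illustrative, not part of the patch):

    // Illustrative only: mirrors the arithmetic used by MinTaskInfo and
    // ReaderThread.run(), without the rumen/Gridmix types.
    public class TraceReplaySketch {

      // MinTaskInfo idea: a corrupt trace may report negative sizes; never
      // hand a negative value to the synthetic job.
      static long clampNonNegative(long traceValue) {
        return Math.max(0, traceValue);
      }

      // Submission-time computation: initTime + round(rateFactor * (current - first)).
      static long scheduledSubmission(long initTime, long first, long current,
                                      float rateFactor) {
        return initTime + Math.round(rateFactor * (current - first));
      }

      public static void main(String[] args) {
        System.out.println(clampNonNegative(-42L));                        // 0
        // A job that arrived 10s after the first one, replayed at rateFactor
        // 0.5, is scheduled 5s after the start of the run.
        System.out.println(scheduledSubmission(1000L, 0L, 10000L, 0.5f));  // 6000
      }
    }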

Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java?rev=889778&r1=889777&r2=889778&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java Fri Dec 11 19:32:58 2009
@@ -47,7 +47,8 @@
   private final MonitorThread mThread;
   private final BlockingQueue<Job> runningJobs;
   private final long pollDelayMillis;
-  private volatile boolean graceful = false;
+  private boolean graceful = false;
+  private boolean shutdown = false;
 
   /**
    * Create a JobMonitor with a default polling interval of 5s.
@@ -59,7 +60,7 @@
   /**
    * Create a JobMonitor that sleeps for the specified duration after
    * polling a still-running job.
-   * @param pollDelaySec Delay after polling a running job
+   * @param pollDelay Delay after polling a running job
    * @param unit Time unit for pollDelaySec (rounded to milliseconds)
    */
   public JobMonitor(int pollDelay, TimeUnit unit) {
@@ -80,14 +81,14 @@
    * Temporary hook for recording job success.
    */
   protected void onSuccess(Job job) {
-    LOG.info(job.getJobName() + " succeeded");
+    LOG.info(job.getJobName() + " (" + job.getID() + ")" + " success");
   }
 
   /**
    * Temporary hook for recording job failure.
    */
   protected void onFailure(Job job) {
-    LOG.info(job.getJobName() + " failed");
+    LOG.info(job.getJobName() + " (" + job.getID() + ")" + " failure");
   }
 
   /**
@@ -128,20 +129,30 @@
 
     @Override
     public void run() {
-      boolean interrupted = false;
+      boolean graceful;
+      boolean shutdown;
       while (true) {
         try {
           synchronized (mJobs) {
+            graceful = JobMonitor.this.graceful;
+            shutdown = JobMonitor.this.shutdown;
             runningJobs.drainTo(mJobs);
           }
 
-          final boolean graceful = JobMonitor.this.graceful;
           // shutdown conditions; either shutdown requested and all jobs
           // have completed or abort requested and there are recently
-          // submitted jobs not yet accounted for
-          if (interrupted && ((!graceful && runningJobs.isEmpty()) ||
-                               (graceful && mJobs.isEmpty()))) {
-            break;
+          // submitted jobs not in the monitored set
+          if (shutdown) {
+            if (!graceful) {
+              while (!runningJobs.isEmpty()) {
+                synchronized (mJobs) {
+                  runningJobs.drainTo(mJobs);
+                }
+              }
+              break;
+            } else if (mJobs.isEmpty()) {
+              break;
+            }
           }
           while (!mJobs.isEmpty()) {
             Job job;
@@ -161,14 +172,16 @@
                 // reset it here
                 Thread.currentThread().interrupt();
               } else {
-                LOG.warn("Lost job " + job.getJobName(), e);
+                LOG.warn("Lost job " + (null == job.getJobName()
+                     ? "<unknown>" : job.getJobName()), e);
                 continue;
               }
             }
             synchronized (mJobs) {
               if (!mJobs.offer(job)) {
-                LOG.error("Lost job " + job.getJobName()); // should never
-                                                           // happen
+                LOG.error("Lost job " + (null == job.getJobName()
+                     ? "<unknown>" : job.getJobName())); // should never
+                                                         // happen
               }
             }
             break;
@@ -176,7 +189,7 @@
           try {
             TimeUnit.MILLISECONDS.sleep(pollDelayMillis);
           } catch (InterruptedException e) {
-            interrupted = true;
+            shutdown = true;
             continue;
           }
         } catch (Throwable e) {
@@ -198,8 +211,8 @@
    * called. Note that, since submission may be sporatic, this will hang
    * if no form of shutdown has been requested.
    */
-  public void join() throws InterruptedException {
-    mThread.join();
+  public void join(long millis) throws InterruptedException {
+    mThread.join(millis);
   }
 
   /**
@@ -207,7 +220,10 @@
    * Upstream submitter is assumed dead.
    */
   public void abort() {
-    graceful = false;
+    synchronized (mJobs) {
+      graceful = false;
+      shutdown = true;
+    }
     mThread.interrupt();
   }
 
@@ -216,7 +232,10 @@
    * Upstream submitter is assumed dead.
    */
   public void shutdown() {
-    graceful = true;
+    synchronized (mJobs) {
+      graceful = true;
+      shutdown = true;
+    }
     mThread.interrupt();
   }
 }
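
The monitor now keeps two flags, graceful and shutdown, and both are written and read under the mJobs lock, so the MonitorThread always observes a flag flip together with the jobs that were queued before it. A simplified sketch of that pattern (stand-in types, not the real JobMonitor):

    import java.util.ArrayDeque;
    import java.util.Queue;

    // Illustrative only: snapshot the termination flags and the queue state
    // under one lock, then decide outside the critical section.
    class MonitorSketch {
      private final Queue<String> jobs = new ArrayDeque<>();
      private boolean graceful = false;
      private boolean shutdown = false;

      void shutdownGracefully() {               // cf. JobMonitor.shutdown()
        synchronized (jobs) { graceful = true; shutdown = true; }
      }

      void abort() {                            // cf. JobMonitor.abort()
        synchronized (jobs) { graceful = false; shutdown = true; }
      }

      void monitorLoop() throws InterruptedException {
        while (true) {
          boolean g, s, empty;
          synchronized (jobs) {                 // consistent snapshot
            g = graceful;
            s = shutdown;
            empty = jobs.isEmpty();
            // newly submitted jobs would be drained into the monitored set here
          }
          if (s && (!g || empty)) {
            break;                              // abort, or graceful and nothing left
          }
          Thread.sleep(100);                    // poll delay
        }
      }

      public static void main(String[] args) throws InterruptedException {
        MonitorSketch m = new MonitorSketch();
        m.abort();
        m.monitorLoop();                        // exits on the first pass
        System.out.println("monitor exited");
      }
    }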

Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java?rev=889778&r1=889777&r2=889778&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java Fri Dec 11 19:32:58 2009
@@ -18,6 +18,7 @@
 package org.apache.hadoop.mapred.gridmix;
 
 import java.io.IOException;
+import java.nio.channels.ClosedByInterruptException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
@@ -96,6 +97,10 @@
               " (" + job.getJob().getID() + ")");
         } catch (IOException e) {
           LOG.warn("Failed to submit " + job.getJob().getJobName(), e);
+          if (e.getCause() instanceof ClosedByInterruptException) {
+            throw new InterruptedException("Failed to submit " +
+                job.getJob().getJobName());
+          }
         } catch (ClassNotFoundException e) {
           LOG.warn("Failed to submit " + job.getJob().getJobName(), e);
         }
@@ -144,11 +149,11 @@
    * Continue running until all queued jobs have been submitted to the
    * cluster.
    */
-  public void join() throws InterruptedException {
+  public void join(long millis) throws InterruptedException {
     if (!shutdown) {
       throw new IllegalStateException("Cannot wait for active submit thread");
     }
-    sched.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+    sched.awaitTermination(millis, TimeUnit.MILLISECONDS);
   }
 
   /**
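
The new catch above recognizes a thread interrupt that surfaced through NIO: when the submitting thread is interrupted during a channel write, the failure arrives as an IOException whose cause is ClosedByInterruptException, and it is rethrown as InterruptedException so the worker exits rather than treating it as an ordinary failed submission. A small self-contained sketch of that translation (names are illustrative):

    import java.io.IOException;
    import java.nio.channels.ClosedByInterruptException;

    class SubmitSketch {
      // Stand-in for a job submission that may fail with an IOException.
      interface Submission { void submit() throws IOException; }

      static void trySubmit(String name, Submission s) throws InterruptedException {
        try {
          s.submit();
        } catch (IOException e) {
          System.err.println("Failed to submit " + name + ": " + e);
          if (e.getCause() instanceof ClosedByInterruptException) {
            // The IOException was really an interrupt; stop the worker.
            throw new InterruptedException("Failed to submit " + name);
          }
          // any other IOException: log and move on to the next job
        }
      }

      public static void main(String[] args) {
        try {
          trySubmit("demo-job", () -> {
            throw new IOException("write failed", new ClosedByInterruptException());
          });
        } catch (InterruptedException ie) {
          System.out.println("submitter stopped: " + ie.getMessage());
        }
      }
    }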

Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReadRecordFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReadRecordFactory.java?rev=889778&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReadRecordFactory.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReadRecordFactory.java Fri Dec 11 19:32:58 2009
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
+
+/**
+ * For every record consumed, read key + val bytes from the stream provided.
+ */
+class ReadRecordFactory extends RecordFactory {
+
+  /**
+   * Size of internal, scratch buffer to read from internal stream.
+   */
+  public static final String GRIDMIX_READ_BUF_SIZE = "gridmix.read.buffer.size";
+
+  private final byte[] buf;
+  private final InputStream src;
+  private final RecordFactory factory;
+
+  /**
+   * @param targetBytes Expected byte count.
+   * @param targetRecords Expected record count.
+   * @param src Stream to read bytes.
+   * @param conf Used to establish read buffer size. @see #GRIDMIX_READ_BUF_SIZE
+   */
+  public ReadRecordFactory(long targetBytes, long targetRecords,
+      InputStream src, Configuration conf) {
+    this(new AvgRecordFactory(targetBytes, targetRecords, conf), src, conf);
+  }
+
+  /**
+   * @param factory Factory to draw record sizes.
+   * @param src Stream to read bytes.
+   * @param conf Used to establish read buffer size. @see #GRIDMIX_READ_BUF_SIZE
+   */
+  public ReadRecordFactory(RecordFactory factory, InputStream src,
+      Configuration conf) {
+    this.src = src;
+    this.factory = factory;
+    buf = new byte[conf.getInt(GRIDMIX_READ_BUF_SIZE, 64 * 1024)];
+  }
+
+  @Override
+  public boolean next(GridmixKey key, GridmixRecord val) throws IOException {
+    if (!factory.next(key, val)) {
+      return false;
+    }
+    for (int len = (null == key ? 0 : key.getSize()) + val.getSize();
+         len > 0; len -= buf.length) {
+      IOUtils.readFully(src, buf, 0, Math.min(buf.length, len));
+    }
+    return true;
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return factory.getProgress();
+  }
+
+  @Override
+  public void close() throws IOException {
+    IOUtils.cleanup(null, src);
+    factory.close();
+  }
+}
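
ReadRecordFactory.next() above draws the next record size from the wrapped factory and then consumes exactly key-size plus value-size bytes from the supplied stream, a buffer's worth at a time. The loop reduces to the following pattern, shown here against a plain InputStream (helper names are illustrative):

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    class ConsumeSketch {
      // Read exactly len bytes from src, in chunks of at most buf.length,
      // discarding the data (the factory only needs the bytes to be pulled
      // through the stream).
      static void consume(InputStream src, byte[] buf, int len) throws IOException {
        for (int remaining = len; remaining > 0; remaining -= buf.length) {
          int want = Math.min(buf.length, remaining);
          int off = 0;
          while (off < want) {                  // same contract as IOUtils.readFully
            int n = src.read(buf, off, want - off);
            if (n < 0) {
              throw new IOException("premature EOF");
            }
            off += n;
          }
        }
      }

      public static void main(String[] args) throws IOException {
        consume(new ByteArrayInputStream(new byte[10000]), new byte[4096], 10000);
        System.out.println("consumed 10000 bytes in 4 KiB chunks");
      }
    }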

Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RecordFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RecordFactory.java?rev=889778&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RecordFactory.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RecordFactory.java Fri Dec 11 19:32:58 2009
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Interface for producing records as inputs and outputs to tasks.
+ */
+abstract class RecordFactory implements Closeable {
+
+  /**
+   * Transform the given record or perform some operation.
+   * @return true if the record should be emitted.
+   */
+  public abstract boolean next(GridmixKey key, GridmixRecord val)
+    throws IOException;
+
+  /**
+   * Estimate of exhausted record capacity.
+   */
+  public abstract float getProgress() throws IOException;
+
+}

Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java?rev=889778&r1=889777&r2=889778&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java Fri Dec 11 19:32:58 2009
@@ -19,6 +19,7 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -27,7 +28,6 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.TaskType;
@@ -79,21 +79,58 @@
     public void close() { }
   }
 
+  static double[] getDistr(Random r, double mindist, int size) {
+    assert 0.0 <= mindist && mindist <= 1.0;
+    final double min = mindist / size;
+    final double rem = 1.0 - min * size;
+    final double[] tmp = new double[size];
+    for (int i = 0; i < tmp.length - 1; ++i) {
+      tmp[i] = r.nextDouble() * rem;
+    }
+    tmp[tmp.length - 1] = rem;
+    Arrays.sort(tmp);
+
+    final double[] ret = new double[size];
+    ret[0] = tmp[0] + min;
+    for (int i = 1; i < size; ++i) {
+      ret[i] = tmp[i] - tmp[i-1] + min;
+    }
+    return ret;
+  }
+
   /**
    * Generate random task data for a synthetic job.
    */
   static class MockJob implements JobStory {
 
-    public static final String MIN_BYTES_IN =   "gridmix.test.min.bytes.in";
-    public static final String VAR_BYTES_IN =   "gridmix.test.var.bytes.in";
-    public static final String MIN_BYTES_OUT =  "gridmix.test.min.bytes.out";
-    public static final String VAR_BYTES_OUT =  "gridmix.test.var.bytes.out";
-
-    public static final String MIN_REC_SIZE =   "gridmix.test.min.rec.bytes";
-    public static final String VAR_REC_SIZE =   "gridmix.test.var.rec.bytes";
-
-    public static final String MAX_MAPS =       "gridmix.test.max.maps";
-    public static final String MAX_REDS =       "gridmix.test.max.reduces";
+    static final int MIN_REC = 1 << 14;
+    static final int MIN_BYTES = 1 << 20;
+    static final int VAR_REC = 1 << 14;
+    static final int VAR_BYTES = 4 << 20;
+    static final int MAX_MAP = 5;
+    static final int MAX_RED = 3;
+
+    static void initDist(Random r, double min, int[] recs, long[] bytes,
+        long tot_recs, long tot_bytes) {
+      final double[] recs_dist = getDistr(r, min, recs.length);
+      final double[] bytes_dist = getDistr(r, min, recs.length);
+      long totalbytes = 0L;
+      int totalrecs = 0;
+      for (int i = 0; i < recs.length; ++i) {
+        recs[i] = (int) Math.round(tot_recs * recs_dist[i]);
+        bytes[i] = Math.round(tot_bytes * bytes_dist[i]);
+        totalrecs += recs[i];
+        totalbytes += bytes[i];
+      }
+      // Add/remove excess
+      recs[0] += totalrecs - tot_recs;
+      bytes[0] += totalbytes - tot_bytes;
+      if (LOG.isInfoEnabled()) {
+        LOG.info("DIST: " + Arrays.toString(recs) + " " +
+            tot_recs + "/" + totalrecs + " " +
+            Arrays.toString(bytes) + " " + tot_bytes + "/" + totalbytes);
+      }
+    }
 
     private static final AtomicInteger seq = new AtomicInteger(0);
     // set timestamps in the past
@@ -101,97 +138,65 @@
       new AtomicLong(System.currentTimeMillis() -
         TimeUnit.MILLISECONDS.convert(60, TimeUnit.DAYS));
 
+    private final int id;
     private final String name;
     private final int[] m_recsIn, m_recsOut, r_recsIn, r_recsOut;
     private final long[] m_bytesIn, m_bytesOut, r_bytesIn, r_bytesOut;
     private final long submitTime;
 
-    public MockJob() {
-      this(new Configuration(false));
-    }
-
     public MockJob(Configuration conf) {
-      this(conf.getInt(MIN_BYTES_IN, 1 << 20),
-           conf.getInt(VAR_BYTES_IN, 5 << 20),
-           conf.getInt(MIN_BYTES_OUT, 1 << 20),
-           conf.getInt(VAR_BYTES_OUT, 5 << 20),
-           conf.getInt(MIN_REC_SIZE , 100),
-           conf.getInt(VAR_REC_SIZE , 1 << 15),
-           conf.getInt(MAX_MAPS, 5),
-           conf.getInt(MAX_REDS, 3));
-    }
-
-    public MockJob(int min_bytes_in, int var_bytes_in,
-                   int min_bytes_out, int var_bytes_out,
-                   int min_rec_size, int var_rec_size,
-                   int max_maps, int max_reds) {
       final Random r = new Random();
-      name = String.format("MOCKJOB%05d", seq.getAndIncrement());
+      final long seed = r.nextLong();
+      r.setSeed(seed);
+      id = seq.getAndIncrement();
+      name = String.format("MOCKJOB%05d", id);
+      LOG.info(name + " (" + seed + ")");
       submitTime = timestamp.addAndGet(TimeUnit.MILLISECONDS.convert(
             r.nextInt(10), TimeUnit.SECONDS));
-      int iMapBTotal = 0, oMapBTotal = 0, iRedBTotal = 0, oRedBTotal = 0;
-      int iMapRTotal = 0, oMapRTotal = 0, iRedRTotal = 0, oRedRTotal = 0;
-
-      final int iAvgMapRec = r.nextInt(var_rec_size) + min_rec_size;
-      final int oAvgMapRec = r.nextInt(var_rec_size) + min_rec_size;
-
-      // MAP
-      m_bytesIn = new long[r.nextInt(max_maps) + 1];
-      m_bytesOut = new long[m_bytesIn.length];
-      m_recsIn = new int[m_bytesIn.length];
-      m_recsOut = new int[m_bytesIn.length];
-      for (int i = 0; i < m_bytesIn.length; ++i) {
-        m_bytesIn[i] = r.nextInt(var_bytes_in) + min_bytes_in;
-        iMapBTotal += m_bytesIn[i];
-        m_recsIn[i] = (int)(m_bytesIn[i] / iAvgMapRec);
-        iMapRTotal += m_recsIn[i];
-
-        m_bytesOut[i] = r.nextInt(var_bytes_out) + min_bytes_out;
-        oMapBTotal += m_bytesOut[i];
-        m_recsOut[i] = (int)(m_bytesOut[i] / oAvgMapRec);
-        oMapRTotal += m_recsOut[i];
-      }
-
-      // REDUCE
-      r_bytesIn = new long[r.nextInt(max_reds) + 1];
-      r_bytesOut = new long[r_bytesIn.length];
-      r_recsIn = new int[r_bytesIn.length];
-      r_recsOut = new int[r_bytesIn.length];
-      iRedBTotal = oMapBTotal;
-      iRedRTotal = oMapRTotal;
-      for (int j = 0; iRedBTotal > 0; ++j) {
-        int i = j % r_bytesIn.length;
-        final int inc = r.nextInt(var_bytes_out) + min_bytes_out;
-        r_bytesIn[i] += inc;
-        iRedBTotal -= inc;
-        if (iRedBTotal < 0) {
-          r_bytesIn[i] += iRedBTotal;
-          iRedBTotal = 0;
-        }
-        iRedRTotal += r_recsIn[i];
-        r_recsIn[i] = (int)(r_bytesIn[i] / oAvgMapRec);
-        iRedRTotal -= r_recsIn[i];
-        if (iRedRTotal < 0) {
-          r_recsIn[i] += iRedRTotal;
-          iRedRTotal = 0;
-        }
 
-        r_bytesOut[i] = r.nextInt(var_bytes_in) + min_bytes_in;
-        oRedBTotal += r_bytesOut[i];
-        r_recsOut[i] = (int)(r_bytesOut[i] / iAvgMapRec);
-        oRedRTotal += r_recsOut[i];
-      }
-      r_recsIn[0] += iRedRTotal;
+      m_recsIn = new int[r.nextInt(MAX_MAP) + 1];
+      m_bytesIn = new long[m_recsIn.length];
+      m_recsOut = new int[m_recsIn.length];
+      m_bytesOut = new long[m_recsIn.length];
+
+      r_recsIn = new int[r.nextInt(MAX_RED) + 1];
+      r_bytesIn = new long[r_recsIn.length];
+      r_recsOut = new int[r_recsIn.length];
+      r_bytesOut = new long[r_recsIn.length];
+
+      // map input
+      final long map_recs = r.nextInt(VAR_REC) + MIN_REC;
+      final long map_bytes = r.nextInt(VAR_BYTES) + MIN_BYTES;
+      initDist(r, 0.5, m_recsIn, m_bytesIn, map_recs, map_bytes);
+
+      // shuffle
+      final long shuffle_recs = r.nextInt(VAR_REC) + MIN_REC;
+      final long shuffle_bytes = r.nextInt(VAR_BYTES) + MIN_BYTES;
+      initDist(r, 0.4, m_recsOut, m_bytesOut, shuffle_recs, shuffle_bytes);
+      initDist(r, 0.8, r_recsIn, r_bytesIn, shuffle_recs, shuffle_bytes);
+
+      // reduce output
+      final long red_recs = r.nextInt(VAR_REC) + MIN_REC;
+      final long red_bytes = r.nextInt(VAR_BYTES) + MIN_BYTES;
+      initDist(r, 0.4, r_recsOut, r_bytesOut, red_recs, red_bytes);
 
       if (LOG.isDebugEnabled()) {
-        iRedRTotal = 0;
-        iRedBTotal = 0;
-        for (int i = 0; i < r_bytesIn.length; ++i) {
+        int iMapBTotal = 0, oMapBTotal = 0, iRedBTotal = 0, oRedBTotal = 0;
+        int iMapRTotal = 0, oMapRTotal = 0, iRedRTotal = 0, oRedRTotal = 0;
+        for (int i = 0; i < m_recsIn.length; ++i) {
+          iMapRTotal += m_recsIn[i];
+          iMapBTotal += m_bytesIn[i];
+          oMapRTotal += m_recsOut[i];
+          oMapBTotal += m_bytesOut[i];
+        }
+        for (int i = 0; i < r_recsIn.length; ++i) {
           iRedRTotal += r_recsIn[i];
           iRedBTotal += r_bytesIn[i];
+          oRedRTotal += r_recsOut[i];
+          oRedBTotal += r_bytesOut[i];
         }
         LOG.debug(String.format("%s: M (%03d) %6d/%10d -> %6d/%10d" +
-                             " R (%03d) %6d/%10d -> %6d/%10d @%d", name,
+                                   " R (%03d) %6d/%10d -> %6d/%10d @%d", name,
             m_bytesIn.length, iMapRTotal, iMapBTotal, oMapRTotal, oMapBTotal,
             r_bytesIn.length, iRedRTotal, iRedBTotal, oRedRTotal, oRedBTotal,
             submitTime));
@@ -210,7 +215,7 @@
 
     @Override
     public JobID getJobID() {
-      return null;
+      return new JobID("job_mock_" + name, id);
     }
 
     @Override
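
getDistr() above builds a random distribution of `size` shares by dropping size - 1 random cut points into [0, rem), taking the gaps between consecutive cuts, and adding a floor of mindist/size to each, so every share is at least mindist/size and the shares sum to 1. initDist() then multiplies those shares by the target record and byte totals and folds any rounding excess back into the first slot. A standalone check of the construction (only the class name and main are new; the method body follows the patch, minus the assert):

    import java.util.Arrays;
    import java.util.Random;

    class DistrSketch {
      // Same construction as DebugJobFactory.getDistr().
      static double[] getDistr(Random r, double mindist, int size) {
        final double min = mindist / size;
        final double rem = 1.0 - min * size;
        final double[] tmp = new double[size];
        for (int i = 0; i < tmp.length - 1; ++i) {
          tmp[i] = r.nextDouble() * rem;        // random cut points in [0, rem)
        }
        tmp[tmp.length - 1] = rem;
        Arrays.sort(tmp);
        final double[] ret = new double[size];
        ret[0] = tmp[0] + min;                  // gap before the first cut, plus floor
        for (int i = 1; i < size; ++i) {
          ret[i] = tmp[i] - tmp[i - 1] + min;   // gap between cuts, plus floor
        }
        return ret;
      }

      public static void main(String[] args) {
        final double[] d = getDistr(new Random(42L), 0.5, 5);
        System.out.println(Arrays.toString(d));
        System.out.println("sum = " + Arrays.stream(d).sum());  // ~1.0
        // every share is at least 0.5 / 5 = 0.1
      }
    }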

Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestFilePool.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestFilePool.java?rev=889778&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestFilePool.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestFilePool.java Fri Dec 11 19:32:58 2009
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Random;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+
+public class TestFilePool {
+
+  static final Log LOG = LogFactory.getLog(TestFileQueue.class);
+  static final int NFILES = 26;
+  static final Path base = getBaseDir();
+
+  static Path getBaseDir() {
+    try {
+      final Configuration conf = new Configuration();
+      final FileSystem fs = FileSystem.getLocal(conf).getRaw();
+      return new Path(System.getProperty("test.build.data", "/tmp"),
+          "testFilePool").makeQualified(fs);
+    } catch (IOException e) {
+      fail();
+    }
+    return null;
+  }
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    final Configuration conf = new Configuration();
+    final FileSystem fs = FileSystem.getLocal(conf).getRaw();
+    fs.delete(base, true);
+    final Random r = new Random();
+    final long seed = r.nextLong();
+    r.setSeed(seed);
+    LOG.info("seed: " + seed);
+    fs.mkdirs(base);
+    for (int i = 0; i < NFILES; ++i) {
+      Path file = base;
+      for (double d = 0.6; d > 0.0; d *= 0.8) {
+        if (r.nextDouble() < d) {
+          file = new Path(base, Integer.toString(r.nextInt(3)));
+          continue;
+        }
+        break;
+      }
+      OutputStream out = null;
+      try {
+        out = fs.create(new Path(file, "" + (char)('A' + i)));
+        final byte[] b = new byte[1024];
+        Arrays.fill(b, (byte)('A' + i));
+        for (int len = ((i % 13) + 1) * 1024; len > 0; len -= 1024) {
+          out.write(b);
+        }
+      } finally {
+        if (out != null) {
+          out.close();
+        }
+      }
+    }
+  }
+
+  @AfterClass
+  public static void cleanup() throws IOException {
+    final Configuration conf = new Configuration();
+    final FileSystem fs = FileSystem.getLocal(conf).getRaw();
+    fs.delete(base, true);
+  }
+
+  @Test
+  public void testUnsuitable() throws Exception {
+    try {
+      final Configuration conf = new Configuration();
+      // all files 13k or less
+      conf.setLong(FilePool.GRIDMIX_MIN_FILE, 14 * 1024);
+      final FilePool pool = new FilePool(conf, base);
+      pool.refresh();
+    } catch (IOException e) {
+      return;
+    }
+    fail();
+  }
+
+  @Test
+  public void testPool() throws Exception {
+    final Random r = new Random();
+    final Configuration conf = new Configuration();
+    conf.setLong(FilePool.GRIDMIX_MIN_FILE, 3 * 1024);
+    final FilePool pool = new FilePool(conf, base);
+    pool.refresh();
+    final ArrayList<FileStatus> files = new ArrayList<FileStatus>();
+
+    // ensure 1k, 2k files excluded
+    final int expectedPoolSize = (NFILES / 2 * (NFILES / 2 + 1) - 6) * 1024;
+    assertEquals(expectedPoolSize, pool.getInputFiles(Long.MAX_VALUE, files));
+    assertEquals(NFILES - 4, files.size());
+
+    // exact match
+    files.clear();
+    assertEquals(expectedPoolSize, pool.getInputFiles(expectedPoolSize, files));
+
+    // match random within 12k
+    files.clear();
+    final long rand = r.nextInt(expectedPoolSize);
+    assertTrue("Missed: " + rand,
+        (NFILES / 2) * 1024 > rand - pool.getInputFiles(rand, files));
+
+    // all files
+    conf.setLong(FilePool.GRIDMIX_MIN_FILE, 0);
+    pool.refresh();
+    files.clear();
+    assertEquals((NFILES / 2 * (NFILES / 2 + 1)) * 1024,
+        pool.getInputFiles(Long.MAX_VALUE, files));
+  }
+
+  void checkSplitEq(FileSystem fs, CombineFileSplit split, long bytes)
+      throws Exception {
+    long splitBytes = 0L;
+    HashSet<Path> uniq = new HashSet<Path>();
+    for (int i = 0; i < split.getNumPaths(); ++i) {
+      splitBytes += split.getLength(i);
+      assertTrue(
+          split.getLength(i) <= fs.getFileStatus(split.getPath(i)).getLen());
+      assertFalse(uniq.contains(split.getPath(i)));
+      uniq.add(split.getPath(i));
+    }
+    assertEquals(bytes, splitBytes);
+  }
+
+  @Test
+  public void testStriper() throws Exception {
+    final Random r = new Random();
+    final Configuration conf = new Configuration();
+    final FileSystem fs = FileSystem.getLocal(conf).getRaw();
+    conf.setLong(FilePool.GRIDMIX_MIN_FILE, 3 * 1024);
+    final FilePool pool = new FilePool(conf, base) {
+      @Override
+      public BlockLocation[] locationsFor(FileStatus stat, long start, long len)
+          throws IOException {
+        return new BlockLocation[] { new BlockLocation() };
+      }
+    };
+    pool.refresh();
+
+    final int expectedPoolSize = (NFILES / 2 * (NFILES / 2 + 1) - 6) * 1024;
+    final InputStriper striper = new InputStriper(pool, expectedPoolSize);
+    int last = 0;
+    for (int i = 0; i < expectedPoolSize;
+        last = Math.min(expectedPoolSize - i, r.nextInt(expectedPoolSize))) {
+      checkSplitEq(fs, striper.splitFor(pool, last, 0), last);
+      i += last;
+    }
+    final InputStriper striper2 = new InputStriper(pool, expectedPoolSize);
+    checkSplitEq(fs, striper2.splitFor(pool, expectedPoolSize, 0),
+        expectedPoolSize);
+  }
+
+}
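
The magic numbers in testPool()/testStriper() above follow from setup(): it writes NFILES = 26 files of ((i % 13) + 1) KiB, so the sizes 1..13 KiB each occur twice, giving (NFILES/2) * (NFILES/2 + 1) = 182 KiB in total; with GRIDMIX_MIN_FILE set to 3 KiB the two 1 KiB and two 2 KiB files drop out, which is where the "- 6" KiB and the NFILES - 4 file count come from. A quick check of that arithmetic (class name is illustrative):

    public class PoolSizeSketch {
      public static void main(String[] args) {
        final int nfiles = 26;
        int totalKiB = 0, excludedFiles = 0, excludedKiB = 0;
        for (int i = 0; i < nfiles; ++i) {
          final int kb = (i % 13) + 1;          // file size written by setup()
          totalKiB += kb;
          if (kb < 3) {                         // below GRIDMIX_MIN_FILE = 3 KiB
            ++excludedFiles;
            excludedKiB += kb;
          }
        }
        // prints: 182 KiB total, 4 files / 6 KiB below the minimum
        System.out.println(totalKiB + " KiB total, " + excludedFiles
            + " files / " + excludedKiB + " KiB below the minimum");
      }
    }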

Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestFileQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestFileQueue.java?rev=889778&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestFileQueue.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestFileQueue.java Fri Dec 11 19:32:58 2009
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+
+public class TestFileQueue {
+
+  static final Log LOG = LogFactory.getLog(TestFileQueue.class);
+  static final int NFILES = 4;
+  static final int BLOCK = 256;
+  static final Path[] paths = new Path[NFILES];
+  static final String[] loc = new String[NFILES];
+  static final long[] start = new long[NFILES];
+  static final long[] len = new long[NFILES];
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    final Configuration conf = new Configuration();
+    final FileSystem fs = FileSystem.getLocal(conf).getRaw();
+    final Path p = new Path(System.getProperty("test.build.data", "/tmp"),
+        "testFileQueue").makeQualified(fs);
+    fs.delete(p, true);
+    final byte[] b = new byte[BLOCK];
+    for (int i = 0; i < NFILES; ++i) {
+      Arrays.fill(b, (byte)('A' + i));
+      paths[i] = new Path(p, "" + (char)('A' + i));
+      OutputStream f = null;
+      try {
+        f = fs.create(paths[i]);
+        f.write(b);
+      } finally {
+        if (f != null) {
+          f.close();
+        }
+      }
+    }
+  }
+
+  @AfterClass
+  public static void cleanup() throws IOException {
+    final Configuration conf = new Configuration();
+    final FileSystem fs = FileSystem.getLocal(conf).getRaw();
+    final Path p = new Path(System.getProperty("test.build.data", "/tmp"),
+        "testFileQueue").makeQualified(fs);
+    fs.delete(p, true);
+  }
+
+  static ByteArrayOutputStream fillVerif() throws IOException {
+    final byte[] b = new byte[BLOCK];
+    final ByteArrayOutputStream out = new ByteArrayOutputStream();
+    for (int i = 0; i < NFILES; ++i) {
+      Arrays.fill(b, (byte)('A' + i));
+      out.write(b, 0, (int)len[i]);
+    }
+    return out;
+  }
+
+  @Test
+  public void testRepeat() throws Exception {
+    final Configuration conf = new Configuration();
+    Arrays.fill(loc, "");
+    Arrays.fill(start, 0L);
+    Arrays.fill(len, BLOCK);
+
+    final ByteArrayOutputStream out = fillVerif();
+    final FileQueue q =
+      new FileQueue(new CombineFileSplit(paths, start, len, loc), conf);
+    final byte[] verif = out.toByteArray();
+    final byte[] check = new byte[2 * NFILES * BLOCK];
+    q.read(check, 0, NFILES * BLOCK);
+    assertArrayEquals(verif, Arrays.copyOf(check, NFILES * BLOCK));
+
+    final byte[] verif2 = new byte[2 * NFILES * BLOCK];
+    System.arraycopy(verif, 0, verif2, 0, verif.length);
+    System.arraycopy(verif, 0, verif2, verif.length, verif.length);
+    q.read(check, 0, 2 * NFILES * BLOCK);
+    assertArrayEquals(verif2, check);
+
+  }
+
+  @Test
+  public void testUneven() throws Exception {
+    final Configuration conf = new Configuration();
+    Arrays.fill(loc, "");
+    Arrays.fill(start, 0L);
+    Arrays.fill(len, BLOCK);
+
+    final int B2 = BLOCK / 2;
+    for (int i = 0; i < NFILES; i += 2) {
+      start[i] += B2;
+      len[i] -= B2;
+    }
+    final FileQueue q =
+      new FileQueue(new CombineFileSplit(paths, start, len, loc), conf);
+    final ByteArrayOutputStream out = fillVerif();
+    final byte[] verif = out.toByteArray();
+    final byte[] check = new byte[NFILES / 2 * BLOCK + NFILES / 2 * B2];
+    q.read(check, 0, verif.length);
+    assertArrayEquals(verif, Arrays.copyOf(check, verif.length));
+    q.read(check, 0, verif.length);
+    assertArrayEquals(verif, Arrays.copyOf(check, verif.length));
+  }
+
+  @Test
+  public void testEmpty() throws Exception {
+    final Configuration conf = new Configuration();
+    // verify OK if unused
+    final FileQueue q = new FileQueue(new CombineFileSplit(
+          new Path[0], new long[0], new long[0], new String[0]), conf);
+  }
+
+}

Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java?rev=889778&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java Fri Dec 11 19:32:58 2009
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+
+public class TestGridmixRecord {
+  private static final Log LOG = LogFactory.getLog(TestGridmixRecord.class);
+
+  static void lengthTest(GridmixRecord x, GridmixRecord y, int min,
+      int max) throws Exception {
+    final Random r = new Random();
+    final long seed = r.nextLong();
+    r.setSeed(seed);
+    LOG.info("length: " + seed);
+    final DataInputBuffer in = new DataInputBuffer();
+    final DataOutputBuffer out1 = new DataOutputBuffer();
+    final DataOutputBuffer out2 = new DataOutputBuffer();
+    for (int i = min; i < max; ++i) {
+      setSerialize(x, r.nextLong(), i, out1);
+      // check write
+      assertEquals(i, out1.getLength());
+      // write to stream
+      x.write(out2);
+      // check read
+      in.reset(out1.getData(), 0, out1.getLength());
+      y.readFields(in);
+      assertEquals(i, x.getSize());
+      assertEquals(i, y.getSize());
+    }
+    // check stream read
+    in.reset(out2.getData(), 0, out2.getLength());
+    for (int i = min; i < max; ++i) {
+      y.readFields(in);
+      assertEquals(i, y.getSize());
+    }
+  }
+
+  static void randomReplayTest(GridmixRecord x, GridmixRecord y, int min,
+      int max) throws Exception {
+    final Random r = new Random();
+    final long seed = r.nextLong();
+    r.setSeed(seed);
+    LOG.info("randReplay: " + seed);
+    final DataOutputBuffer out1 = new DataOutputBuffer();
+    for (int i = min; i < max; ++i) {
+      final int s = out1.getLength();
+      x.setSeed(r.nextLong());
+      x.setSize(i);
+      x.write(out1);
+      assertEquals(i, out1.getLength() - s);
+    }
+    final DataInputBuffer in = new DataInputBuffer();
+    in.reset(out1.getData(), 0, out1.getLength());
+    final DataOutputBuffer out2 = new DataOutputBuffer();
+    // deserialize written records, write to separate buffer
+    for (int i = min; i < max; ++i) {
+      final int s = in.getPosition();
+      y.readFields(in);
+      assertEquals(i, in.getPosition() - s);
+      y.write(out2);
+    }
+    // verify written contents match
+    assertEquals(out1.getLength(), out2.getLength());
+    // assumes that writes will grow buffer deterministically
+    assertEquals("Bad test", out1.getData().length, out2.getData().length);
+    assertArrayEquals(out1.getData(), out2.getData());
+  }
+
+  static void eqSeedTest(GridmixRecord x, GridmixRecord y, int max)
+      throws Exception {
+    final Random r = new Random();
+    final long s = r.nextLong();
+    r.setSeed(s);
+    LOG.info("eqSeed: " + s);
+    assertEquals(x.fixedBytes(), y.fixedBytes());
+    final int min = x.fixedBytes() + 1;
+    final DataOutputBuffer out1 = new DataOutputBuffer();
+    final DataOutputBuffer out2 = new DataOutputBuffer();
+    for (int i = min; i < max; ++i) {
+      final long seed = r.nextLong();
+      setSerialize(x, seed, i, out1);
+      setSerialize(y, seed, i, out2);
+      assertEquals(x, y);
+      assertEquals(x.hashCode(), y.hashCode());
+
+      // verify written contents match
+      assertEquals(out1.getLength(), out2.getLength());
+      // assumes that writes will grow buffer deterministically
+      assertEquals("Bad test", out1.getData().length, out2.getData().length);
+      assertArrayEquals(out1.getData(), out2.getData());
+    }
+  }
+
+  static void binSortTest(GridmixRecord x, GridmixRecord y, int min,
+      int max, WritableComparator cmp) throws Exception {
+    final Random r = new Random();
+    final long s = r.nextLong();
+    r.setSeed(s);
+    LOG.info("sort: " + s);
+    final DataOutputBuffer out1 = new DataOutputBuffer();
+    final DataOutputBuffer out2 = new DataOutputBuffer();
+    for (int i = min; i < max; ++i) {
+      final long seed1 = r.nextLong();
+      setSerialize(x, seed1, i, out1);
+      assertEquals(0, x.compareSeed(seed1, Math.max(0, i - x.fixedBytes())));
+
+      final long seed2 = r.nextLong();
+      setSerialize(y, seed2, i, out2);
+      assertEquals(0, y.compareSeed(seed2, Math.max(0, i - x.fixedBytes())));
+
+      // for eq sized records, ensure byte cmp where req
+      final int chk = WritableComparator.compareBytes(
+          out1.getData(), 0, out1.getLength(),
+          out2.getData(), 0, out2.getLength());
+      assertEquals(chk, x.compareTo(y));
+      assertEquals(chk, cmp.compare(
+            out1.getData(), 0, out1.getLength(),
+            out2.getData(), 0, out2.getLength()));
+      // write second copy, compare eq
+      final int s1 = out1.getLength();
+      x.write(out1);
+      assertEquals(0, cmp.compare(out1.getData(), 0, s1,
+            out1.getData(), s1, out1.getLength() - s1));
+      final int s2 = out2.getLength();
+      y.write(out2);
+      assertEquals(0, cmp.compare(out2.getData(), 0, s2,
+            out2.getData(), s2, out2.getLength() - s2));
+      assertEquals(chk, cmp.compare(out1.getData(), 0, s1,
+            out2.getData(), s2, out2.getLength() - s2));
+    }
+  }
+
+  static void checkSpec(GridmixKey a, GridmixKey b) throws Exception {
+    final Random r = new Random();
+    final long s = r.nextLong();
+    r.setSeed(s);
+    LOG.info("spec: " + s);
+    final DataInputBuffer in = new DataInputBuffer();
+    final DataOutputBuffer out = new DataOutputBuffer();
+    a.setType(GridmixKey.REDUCE_SPEC);
+    b.setType(GridmixKey.REDUCE_SPEC);
+    for (int i = 0; i < 100; ++i) {
+      final int in_rec = r.nextInt(Integer.MAX_VALUE);
+      a.setReduceInputRecords(in_rec);
+      final int out_rec = r.nextInt(Integer.MAX_VALUE);
+      a.setReduceOutputRecords(out_rec);
+      final int out_bytes = r.nextInt(Integer.MAX_VALUE);
+      a.setReduceOutputBytes(out_bytes);
+      final int min = WritableUtils.getVIntSize(in_rec)
+                    + WritableUtils.getVIntSize(out_rec)
+                    + WritableUtils.getVIntSize(out_bytes);
+      assertEquals(min + 2, a.fixedBytes()); // meta + vint min
+      final int size = r.nextInt(1024) + a.fixedBytes() + 1;
+      setSerialize(a, r.nextLong(), size, out);
+      assertEquals(size, out.getLength());
+      assertTrue(a.equals(a));
+      assertEquals(0, a.compareTo(a));
+
+      in.reset(out.getData(), 0, out.getLength());
+
+      b.readFields(in);
+      assertEquals(size, b.getSize());
+      assertEquals(in_rec, b.getReduceInputRecords());
+      assertEquals(out_rec, b.getReduceOutputRecords());
+      assertEquals(out_bytes, b.getReduceOutputBytes());
+      assertTrue(a.equals(b));
+      assertEquals(0, a.compareTo(b));
+      assertEquals(a.hashCode(), b.hashCode());
+    }
+  }
+
+  static void setSerialize(GridmixRecord x, long seed, int size,
+      DataOutputBuffer out) throws IOException {
+    x.setSeed(seed);
+    x.setSize(size);
+    out.reset();
+    x.write(out);
+  }
+
+  @Test
+  public void testKeySpec() throws Exception {
+    final int min = 5;
+    final int max = 300;
+    final GridmixKey a = new GridmixKey(GridmixKey.REDUCE_SPEC, 1, 0L);
+    final GridmixKey b = new GridmixKey(GridmixKey.REDUCE_SPEC, 1, 0L);
+    lengthTest(a, b, min, max);
+    randomReplayTest(a, b, min, max);
+    binSortTest(a, b, min, max, new GridmixKey.Comparator());
+    // 2 fixed GR bytes, 1 type, 3 spec
+    eqSeedTest(a, b, max);
+    checkSpec(a, b);
+  }
+
+  @Test
+  public void testKeyData() throws Exception {
+    final int min = 2;
+    final int max = 300;
+    final GridmixKey a = new GridmixKey(GridmixKey.DATA, 1, 0L);
+    final GridmixKey b = new GridmixKey(GridmixKey.DATA, 1, 0L);
+    lengthTest(a, b, min, max);
+    randomReplayTest(a, b, min, max);
+    binSortTest(a, b, min, max, new GridmixKey.Comparator());
+    // 2 fixed GR bytes, 1 type
+    eqSeedTest(a, b, 300);
+  }
+
+  @Test
+  public void testBaseRecord() throws Exception {
+    final int min = 1;
+    final int max = 300;
+    final GridmixRecord a = new GridmixRecord();
+    final GridmixRecord b = new GridmixRecord();
+    lengthTest(a, b, min, max);
+    randomReplayTest(a, b, min, max);
+    binSortTest(a, b, min, max, new GridmixRecord.Comparator());
+    // 2 fixed GR bytes
+    eqSeedTest(a, b, 300);
+  }
+
+  public static void main(String[] argv) throws Exception {
+    boolean fail = false;
+    final TestGridmixRecord test = new TestGridmixRecord();
+    try { test.testKeySpec(); } catch (Exception e) {
+      fail = true;
+      e.printStackTrace();
+    }
+    try {test.testKeyData(); } catch (Exception e) {
+      fail = true;
+      e.printStackTrace();
+    }
+    try {test.testBaseRecord(); } catch (Exception e) {
+      fail = true;
+      e.printStackTrace();
+    }
+    System.exit(fail ? -1 : 0);
+  }
+
+  static void printDebug(GridmixRecord a, GridmixRecord b) throws IOException {
+    DataOutputBuffer out = new DataOutputBuffer();
+    a.write(out);
+    System.out.println("A " +
+        Arrays.toString(Arrays.copyOf(out.getData(), out.getLength())));
+    out.reset();
+    b.write(out);
+    System.out.println("B " +
+        Arrays.toString(Arrays.copyOf(out.getData(), out.getLength())));
+  }
+
+}

Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java?rev=889778&r1=889777&r2=889778&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java Fri Dec 11 19:32:58 2009
@@ -20,9 +20,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Comparator;
 import java.util.HashMap;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -35,8 +33,6 @@
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.CounterGroup;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TaskReport;
@@ -44,7 +40,6 @@
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.tools.rumen.JobStory;
 import org.apache.hadoop.tools.rumen.TaskInfo;
-import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import static org.apache.hadoop.mapreduce.TaskCounter.*;
 
@@ -96,7 +91,7 @@
 
   static class TestMonitor extends JobMonitor {
 
-    static final long SLOPBYTES = 5 * 1024;
+    static final long SLOPBYTES = 1024;
     private final int expected;
     private final BlockingQueue<Job> retiredJobs;
 
@@ -138,12 +133,12 @@
         final TaskReport[] mReports = job.getTaskReports(TaskType.MAP);
         assertEquals("Mismatched map count", nMaps, mReports.length);
         check(TaskType.MAP, job, spec, mReports,
-            0, 1, nReds * SLOPBYTES, nReds + 1);
+            0, 0, SLOPBYTES, nReds);
 
         final TaskReport[] rReports = job.getTaskReports(TaskType.REDUCE);
         assertEquals("Mismatched reduce count", nReds, rReports.length);
         check(TaskType.REDUCE, job, spec, rReports,
-            nMaps * SLOPBYTES, nMaps + 1, 0, 1);
+            nMaps * SLOPBYTES, 2 * nMaps, 0, 0);
       }
     }
 
@@ -176,74 +171,97 @@
                (int)counters.findCounter(MAP_OUTPUT_RECORDS).getValue();
 
             specInfo = spec.getTaskInfo(TaskType.MAP, i);
+            specInputRecords[i] = specInfo.getInputRecords();
+            specInputBytes[i] = specInfo.getInputBytes();
+            specOutputRecords[i] = specInfo.getOutputRecords();
+            specOutputBytes[i] = specInfo.getOutputBytes();
+            System.out.printf(type + " SPEC: %9d -> %9d :: %5d -> %5d\n",
+                 specInputBytes[i], specOutputBytes[i],
+                 specInputRecords[i], specOutputRecords[i]);
+            System.out.printf(type + " RUN:  %9d -> %9d :: %5d -> %5d\n",
+                 runInputBytes[i], runOutputBytes[i],
+                 runInputRecords[i], runOutputRecords[i]);
             break;
           case REDUCE:
-             runInputBytes[i] =
-               counters.findCounter(REDUCE_SHUFFLE_BYTES).getValue();
-             runInputRecords[i] =
-               (int)counters.findCounter(REDUCE_INPUT_RECORDS).getValue();
-             runOutputBytes[i] =
-               counters.findCounter("FileSystemCounters",
-                   "HDFS_BYTES_WRITTEN").getValue();
-             runOutputRecords[i] =
-               (int)counters.findCounter(REDUCE_OUTPUT_RECORDS).getValue();
+            runInputBytes[i] = 0;
+            runInputRecords[i] =
+              (int)counters.findCounter(REDUCE_INPUT_RECORDS).getValue();
+            runOutputBytes[i] =
+              counters.findCounter("FileSystemCounters",
+                  "HDFS_BYTES_WRITTEN").getValue();
+            runOutputRecords[i] =
+              (int)counters.findCounter(REDUCE_OUTPUT_RECORDS).getValue();
+
 
             specInfo = spec.getTaskInfo(TaskType.REDUCE, i);
+            // There is no reliable counter for reduce input bytes. The
+            // variable-length encoding of intermediate records and other noise
+            // make this quantity difficult to estimate. The shuffle and spec
+            // input bytes are included in debug output for reference, but are
+            // not checked
+            specInputBytes[i] = 0;
+            specInputRecords[i] = specInfo.getInputRecords();
+            specOutputRecords[i] = specInfo.getOutputRecords();
+            specOutputBytes[i] = specInfo.getOutputBytes();
+            System.out.printf(type + " SPEC: (%9d) -> %9d :: %5d -> %5d\n",
+                 specInfo.getInputBytes(), specOutputBytes[i],
+                 specInputRecords[i], specOutputRecords[i]);
+            System.out.printf(type + " RUN:  (%9d) -> %9d :: %5d -> %5d\n",
+                 counters.findCounter(REDUCE_SHUFFLE_BYTES).getValue(),
+                 runOutputBytes[i], runInputRecords[i], runOutputRecords[i]);
             break;
           default:
             specInfo = null;
             fail("Unexpected type: " + type);
         }
-        specInputBytes[i] = specInfo.getInputBytes();
-        specInputRecords[i] = specInfo.getInputRecords();
-        specOutputRecords[i] = specInfo.getOutputRecords();
-        specOutputBytes[i] = specInfo.getOutputBytes();
-        System.out.printf(type + " SPEC: %9d -> %9d :: %5d -> %5d\n",
-             specInputBytes[i], specOutputBytes[i],
-             specInputRecords[i], specOutputRecords[i]);
-        System.out.printf(type + " RUN:  %9d -> %9d :: %5d -> %5d\n",
-             runInputBytes[i], runOutputBytes[i],
-             runInputRecords[i], runOutputRecords[i]);
       }
 
       // Check input bytes
       Arrays.sort(specInputBytes);
       Arrays.sort(runInputBytes);
       for (int i = 0; i < runTasks.length; ++i) {
-        assertTrue("Mismatched input bytes " +
+        assertTrue("Mismatched " + type + " input bytes " +
             specInputBytes[i] + "/" + runInputBytes[i],
-            runInputBytes[i] - specInputBytes[i] <= extraInputBytes);
+            eqPlusMinus(runInputBytes[i], specInputBytes[i], extraInputBytes));
       }
 
       // Check input records
       Arrays.sort(specInputRecords);
       Arrays.sort(runInputRecords);
       for (int i = 0; i < runTasks.length; ++i) {
-        assertTrue("Mismatched input records " +
+        assertTrue("Mismatched " + type + " input records " +
             specInputRecords[i] + "/" + runInputRecords[i],
-            runInputRecords[i] - specInputRecords[i] <= extraInputRecords);
+            eqPlusMinus(runInputRecords[i], specInputRecords[i],
+              extraInputRecords));
       }
 
       // Check output bytes
       Arrays.sort(specOutputBytes);
       Arrays.sort(runOutputBytes);
       for (int i = 0; i < runTasks.length; ++i) {
-        assertTrue("Mismatched output bytes " +
+        assertTrue("Mismatched " + type + " output bytes " +
             specOutputBytes[i] + "/" + runOutputBytes[i],
-            runOutputBytes[i] - specOutputBytes[i] <= extraOutputBytes);
+            eqPlusMinus(runOutputBytes[i], specOutputBytes[i],
+              extraOutputBytes));
       }
 
       // Check output records
       Arrays.sort(specOutputRecords);
       Arrays.sort(runOutputRecords);
       for (int i = 0; i < runTasks.length; ++i) {
-        assertTrue("Mismatched output records " +
+        assertTrue("Mismatched " + type + " output records " +
             specOutputRecords[i] + "/" + runOutputRecords[i],
-            runOutputRecords[i] - specOutputRecords[i] <= extraOutputRecords);
+            eqPlusMinus(runOutputRecords[i], specOutputRecords[i],
+              extraOutputRecords));
       }
 
     }
 
+    private static boolean eqPlusMinus(long a, long b, long x) {
+      final long diff = Math.abs(a - b);
+      return diff <= x;
+    }
+
     @Override
     protected void onSuccess(Job job) {
       retiredJobs.add(job);
@@ -292,6 +310,7 @@
     };
     DebugGridmix client = new DebugGridmix();
     final Configuration conf = mrCluster.createJobConf();
+    //conf.setInt(Gridmix.GRIDMIX_KEY_LEN, 2);
     int res = ToolRunner.run(conf, client, argv);
     assertEquals("Client exited with nonzero status", 0, res);
     client.checkMonitor();

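For context, the tolerance check introduced above replaces the one-sided comparisons with a symmetric bound. A minimal, self-contained sketch of that check follows; the class name and the sample byte counts are illustrative only, not part of the patch.

    // Symmetric slop check: accepts a run value within +/- slop of the spec value.
    public class SlopCheckSketch {
      static boolean eqPlusMinus(long a, long b, long x) {
        return Math.abs(a - b) <= x;
      }

      public static void main(String[] args) {
        final long slopBytes = 1024;                             // matches the reduced SLOPBYTES
        System.out.println(eqPlusMinus(5000, 4096, slopBytes));  // true: difference is 904
        System.out.println(eqPlusMinus(6000, 4096, slopBytes));  // false: difference is 1904
      }
    }
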
Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRecordFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRecordFactory.java?rev=889778&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRecordFactory.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRecordFactory.java Fri Dec 11 19:32:58 2009
@@ -0,0 +1,79 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.util.Random;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputBuffer;
+
+public class TestRecordFactory {
+  private static final Log LOG = LogFactory.getLog(TestRecordFactory.class);
+
+  public static void testFactory(long targetBytes, long targetRecs)
+      throws Exception {
+    final Configuration conf = new Configuration();
+    final GridmixKey key = new GridmixKey();
+    final GridmixRecord val = new GridmixRecord();
+    LOG.info("Target bytes/records: " + targetBytes + "/" + targetRecs);
+    final RecordFactory f = new AvgRecordFactory(targetBytes, targetRecs, conf);
+    targetRecs = targetRecs <= 0 && targetBytes >= 0
+      ? Math.max(1,
+          targetBytes / conf.getInt("gridmix.missing.rec.size", 64 * 1024))
+      : targetRecs;
+
+    long records = 0L;
+    final DataOutputBuffer out = new DataOutputBuffer();
+    while (f.next(key, val)) {
+      ++records;
+      key.write(out);
+      val.write(out);
+    }
+    assertEquals(targetRecs, records);
+    assertEquals(targetBytes, out.getLength());
+  }
+
+  @Test
+  public void testRandom() throws Exception {
+    final Random r = new Random();
+    final long targetBytes = r.nextInt(1 << 20) + 3 * (1 << 14);
+    final long targetRecs = r.nextInt(1 << 14);
+    testFactory(targetBytes, targetRecs);
+  }
+
+  @Test
+  public void testAvg() throws Exception {
+    final Random r = new Random();
+    final long avgsize = r.nextInt(1 << 10) + 1;
+    final long targetRecs = r.nextInt(1 << 14);
+    testFactory(targetRecs * avgsize, targetRecs);
+  }
+
+  @Test
+  public void testZero() throws Exception {
+    final Random r = new Random();
+    final long targetBytes = r.nextInt(1 << 20);
+    testFactory(targetBytes, 0);
+  }
+}

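The record count asserted by testZero above mirrors the arithmetic the test performs on its own conf lookup: with no explicit record count, roughly one record per gridmix.missing.rec.size bytes (64 KiB by default), and at least one. A small sketch of that arithmetic with illustrative inputs; the class and method names are invented for the example.

    // Expected record count when only a byte target is given.
    public class MissingRecCountSketch {
      static long expectedRecords(long targetBytes, long targetRecs, int missingRecSize) {
        return (targetRecs <= 0 && targetBytes >= 0)
            ? Math.max(1, targetBytes / missingRecSize)
            : targetRecs;
      }

      public static void main(String[] args) {
        final int defaultRecSize = 64 * 1024;                             // gridmix.missing.rec.size default
        System.out.println(expectedRecords(1 << 20, 0, defaultRecSize));  // 16 records for 1 MiB
        System.out.println(expectedRecords(100, 0, defaultRecSize));      // clamped to 1
        System.out.println(expectedRecords(1 << 20, 10, defaultRecSize)); // explicit count wins: 10
      }
    }
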
Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java?rev=889778&r1=889777&r2=889778&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java Fri Dec 11 19:32:58 2009
@@ -24,7 +24,8 @@
   private final int recsOut;
   private final long maxMemory;
 
-  public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut, int maxMemory) {
+  public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut,
+      long maxMemory) {
     this.bytesIn = bytesIn;
     this.recsIn = recsIn;
     this.bytesOut = bytesOut;

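The TaskInfo change above widens maxMemory from int to long. The unit of the memory figure is not stated in this diff, so the following is only a sketch of the narrowing hazard a long parameter avoids when large values (for example, byte counts) are passed through; the 3 GiB figure is illustrative.

    // Values above Integer.MAX_VALUE (~2 GiB) cannot be carried in an int.
    public class TaskMemorySketch {
      public static void main(String[] args) {
        final long threeGiB = 3L * 1024 * 1024 * 1024;    // 3221225472
        System.out.println(threeGiB > Integer.MAX_VALUE); // true: needs a long
        System.out.println((int) threeGiB);               // wraps to -1073741824
      }
    }
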
Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java?rev=889778&r1=889777&r2=889778&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java Fri Dec 11 19:32:58 2009
@@ -619,38 +619,42 @@
     Values type = loggedTask.getTaskType();
     if ((type != Values.MAP) && (type != Values.REDUCE)) {
       throw new IllegalArgumentException(
-          "getTaskInfo only supports MAP or REDUCE tasks: " + type.toString() +
-          " for task = " + loggedTask.getTaskID());
+          "getTaskInfo only supports MAP or REDUCE tasks: " + type.toString()
+              + " for task = " + loggedTask.getTaskID());
     }
 
     for (LoggedTaskAttempt attempt : attempts) {
       attempt = sanitizeLoggedTaskAttempt(attempt);
       // ignore bad attempts or unsuccessful attempts.
-      if ((attempt == null)
-          || (attempt.getResult() != Values.SUCCESS)) {
+      if ((attempt == null) || (attempt.getResult() != Values.SUCCESS)) {
         continue;
       }
 
       if (type == Values.MAP) {
         inputBytes = attempt.getHdfsBytesRead();
         inputRecords = attempt.getMapInputRecords();
-        outputBytes = attempt.getMapOutputBytes();
+        outputBytes =
+            (job.getTotalReduces() > 0) ? attempt.getMapOutputBytes() : attempt
+                .getHdfsBytesWritten();
         outputRecords = attempt.getMapOutputRecords();
-        heapMegabytes = (job.getJobMapMB() > 0) ? job.getJobMapMB() 
-                                                : job.getHeapMegabytes();
+        heapMegabytes =
+            (job.getJobMapMB() > 0) ? job.getJobMapMB() : job
+                .getHeapMegabytes();
       } else {
         inputBytes = attempt.getReduceShuffleBytes();
         inputRecords = attempt.getReduceInputRecords();
         outputBytes = attempt.getHdfsBytesWritten();
         outputRecords = attempt.getReduceOutputRecords();
-        heapMegabytes = (job.getJobReduceMB() > 0) ? job.getJobReduceMB() 
-                                                   : job.getHeapMegabytes();
+        heapMegabytes =
+            (job.getJobReduceMB() > 0) ? job.getJobReduceMB() : job
+                .getHeapMegabytes();
       }
       break;
     }
 
-    TaskInfo taskInfo = new TaskInfo(inputBytes, (int) inputRecords,
-        outputBytes, (int) outputRecords, (int) heapMegabytes);
+    TaskInfo taskInfo =
+        new TaskInfo(inputBytes, (int) inputRecords, outputBytes,
+            (int) outputRecords, (int) heapMegabytes);
     return taskInfo;
   }
 
@@ -869,8 +873,9 @@
   private LoggedTaskAttempt getLoggedTaskAttempt(TaskType taskType,
       int taskNumber, int taskAttemptNumber) {
     buildMaps();
-    TaskAttemptID id = new TaskAttemptID(getMaskedTaskID(taskType, taskNumber),
-        taskAttemptNumber);
+    TaskAttemptID id =
+        new TaskAttemptID(getMaskedTaskID(taskType, taskNumber),
+            taskAttemptNumber);
     return loggedTaskAttemptMap.get(id);
   }