Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2009/12/11 20:33:02 UTC

svn commit: r889778 [1/2] - in /hadoop/mapreduce/trunk: ./ src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/ src/tools/org/apache/hadoop/tools/rumen/

Author: cdouglas
Date: Fri Dec 11 19:32:58 2009
New Revision: 889778

URL: http://svn.apache.org/viewvc?rev=889778&view=rev
Log:
MAPREDUCE-1124. Fix imprecise byte counts in Gridmix.

Added:
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixSplit.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/IntermediateRecordFactory.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReadRecordFactory.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RecordFactory.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestFilePool.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestFileQueue.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRecordFactory.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=889778&r1=889777&r2=889778&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Dec 11 19:32:58 2009
@@ -992,3 +992,4 @@
     MAPREDUCE-879. Fix broken unit test TestTaskTrackerLocalization on MacOS.
     (Sreekanth Ramakrishnan via yhemanth)
 
+    MAPREDUCE-1124. Fix imprecise byte counts in Gridmix. (cdouglas)

Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java?rev=889778&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java Fri Dec 11 19:32:58 2009
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Given byte and record targets, emit roughly equal-sized records satisfying
+ * the contract.
+ */
+class AvgRecordFactory extends RecordFactory {
+
+  /**
+   * Fraction of each record occupied by key data.
+   */
+  public static final String GRIDMIX_KEY_FRC = "gridmix.key.fraction";
+
+
+  private final long targetBytes;
+  private final long targetRecords;
+  private final long step;
+  private final int avgrec;
+  private final int keyLen;
+  private long accBytes = 0L;
+  private long accRecords = 0L;
+
+  /**
+   * @param targetBytes Expected byte count.
+   * @param targetRecords Expected record count.
+   * @param conf Used to resolve edge cases; see {@link #GRIDMIX_KEY_FRC}.
+   */
+  public AvgRecordFactory(long targetBytes, long targetRecords,
+      Configuration conf) {
+    this.targetBytes = targetBytes;
+    this.targetRecords = targetRecords <= 0 && this.targetBytes >= 0
+      ? Math.max(1,
+          this.targetBytes / conf.getInt("gridmix.missing.rec.size", 64 * 1024))
+      : targetRecords;
+    final long tmp = this.targetBytes / this.targetRecords;
+    step = this.targetBytes - this.targetRecords * tmp;
+    avgrec = (int) Math.min(Integer.MAX_VALUE, tmp + 1);
+    keyLen = Math.max(1,
+        (int)(tmp * Math.min(1.0f, conf.getFloat(GRIDMIX_KEY_FRC, 0.1f))));
+  }
+
+  @Override
+  public boolean next(GridmixKey key, GridmixRecord val) throws IOException {
+    if (accBytes >= targetBytes) {
+      return false;
+    }
+    final int reclen = accRecords++ >= step ? avgrec - 1 : avgrec;
+    final int len = (int) Math.min(targetBytes - accBytes, reclen);
+    // len < reclen only on the final record, when fewer than reclen bytes remain
+    if (key != null) {
+      key.setSize(keyLen);
+      val.setSize(len - key.getSize());
+    } else {
+      val.setSize(len);
+    }
+    accBytes += len;
+    return true;
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return Math.min(1.0f, accBytes / ((float)targetBytes));
+  }
+
+  @Override
+  public void close() throws IOException {
+    // noop
+  }
+
+}

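A note on the arithmetic above, since it is what makes the byte counts exact: with tmp = targetBytes / targetRecords, the first "step" records are avgrec (= tmp + 1) bytes and the remainder are avgrec - 1 (= tmp) bytes, which sums to precisely targetBytes. A minimal standalone sketch, not part of the patch (the class name is illustrative):

    public class AvgRecordMath {
      public static void main(String[] args) {
        final long targetBytes = 10000L;
        final long targetRecords = 7L;
        final long tmp = targetBytes / targetRecords;         // floor of mean record size
        final long step = targetBytes - targetRecords * tmp;  // == targetBytes % targetRecords
        final long avgrec = tmp + 1;
        long total = 0L;
        for (long rec = 0; rec < targetRecords; ++rec) {
          // mirrors next(): records at index >= step are one byte smaller
          total += rec >= step ? avgrec - 1 : avgrec;
        }
        System.out.println(total == targetBytes);             // prints true
      }
    }
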
Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java?rev=889778&r1=889777&r2=889778&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java Fri Dec 11 19:32:58 2009
@@ -223,7 +223,6 @@
         return getSize();
       }
 
-      // TODO sort, pick rand pairs of kth large/small in dir
       IndexMapper mapping;
       if ((curdir.size() < 200) || ((double) targetSize / getSize() > 0.5)) {
         mapping = new DenseIndexMapper(curdir.size());
@@ -234,13 +233,13 @@
       ArrayList<Integer> selected = new ArrayList<Integer>();
       long ret = 0L;
       int poolSize = curdir.size();
-      while (ret < targetSize) {
+      do {
         int pos = rand.nextInt(poolSize);
         int index = mapping.get(pos);
         selected.add(index);
         ret += curdir.get(index).getLen();
         mapping.swap(pos, --poolSize);
-      }
+      } while (ret < targetSize);
 
       for (Integer i : selected) {
         files.add(curdir.get(i));

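The while-to-do-while change above guarantees that at least one file is selected even when targetSize is zero or negative, so callers always receive a nonempty sample. A compact illustration of the difference (values illustrative):

    long targetSize = 0L;
    long ret = 0L;
    do {
      ret += 512L;                 // stands in for curdir.get(index).getLen()
    } while (ret < targetSize);
    // ret == 512: one file chosen; the former while-loop would have chosen none
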
Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java?rev=889778&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java Fri Dec 11 19:32:58 2009
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+
+/**
+ * Given a {@link org.apache.hadoop.mapreduce.lib.input.CombineFileSplit},
+ * circularly read through each input source.
+ */
+class FileQueue extends InputStream {
+
+  private int idx = -1;
+  private long curlen = -1L;
+  private FSDataInputStream input;
+  private final byte[] z = new byte[1];
+  private final Path[] paths;
+  private final long[] lengths;
+  private final long[] startoffset;
+  private final Configuration conf;
+
+  /**
+   * @param split Description of input sources.
+   * @param conf Used to resolve FileSystem instances.
+   */
+  public FileQueue(CombineFileSplit split, Configuration conf)
+      throws IOException {
+    this.conf = conf;
+    paths = split.getPaths();
+    startoffset = split.getStartOffsets();
+    lengths = split.getLengths();
+    nextSource();
+  }
+
+  protected void nextSource() throws IOException {
+    if (0 == paths.length) {
+      return;
+    }
+    if (input != null) {
+      input.close();
+    }
+    idx = (idx + 1) % paths.length;
+    curlen = lengths[idx];
+    final Path file = paths[idx];
+    final FileSystem fs = file.getFileSystem(conf);
+    input = fs.open(file);
+    input.seek(startoffset[idx]);
+  }
+
+  @Override
+  public int read() throws IOException {
+    final int tmp = read(z);
+    return tmp == -1 ? -1 : (0xFF & z[0]);
+  }
+
+  @Override
+  public int read(byte[] b) throws IOException {
+    return read(b, 0, b.length);
+  }
+
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    int kvread = 0;
+    while (kvread < len) {
+      if (curlen <= 0) {
+        nextSource();
+        continue;
+      }
+      final int srcRead = (int) Math.min(len - kvread, curlen);
+      IOUtils.readFully(input, b, kvread, srcRead);
+      curlen -= srcRead;
+      kvread += srcRead;
+    }
+    return kvread;
+  }
+
+  @Override
+  public void close() throws IOException {
+    input.close();
+  }
+
+}

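A minimal usage sketch for FileQueue, not part of the patch (paths, offsets, and lengths are illustrative): wrap a CombineFileSplit and read from it; because the queue is circular, a read larger than the combined split length simply wraps back to the first source.

    Configuration conf = new Configuration();
    CombineFileSplit split = new CombineFileSplit(
        new Path[] { new Path("in/part-0"), new Path("in/part-1") },
        new long[] { 0L, 0L },          // start offsets
        new long[] { 1024L, 2048L },    // bytes to take from each source
        new String[0]);                 // location hints, unused here
    FileQueue in = new FileQueue(split, conf);
    final byte[] buf = new byte[4096];  // more than the 3072 bytes described
    in.read(buf);                       // still fills all 4096 bytes, wrapping around
    in.close();
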
Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java?rev=889778&r1=889777&r2=889778&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java Fri Dec 11 19:32:58 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.List;
@@ -28,7 +29,6 @@
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
@@ -52,10 +52,30 @@
 // TODO can replace with form of GridmixJob
 class GenerateData extends GridmixJob {
 
+  /**
+   * Total bytes to write.
+   */
+  public static final String GRIDMIX_GEN_BYTES = "gridmix.gen.bytes";
+
+  /**
+   * Maximum size per file written.
+   */
+  public static final String GRIDMIX_GEN_CHUNK = "gridmix.gen.bytes.per.file";
+
+  /**
+   * Size of writes to output file.
+   */
+  public static final String GRIDMIX_VAL_BYTES = "gendata.val.bytes";
+
+  /**
+   * Status reporting interval, in megabytes.
+   */
+  public static final String GRIDMIX_GEN_INTERVAL = "gendata.interval.mb";
+
   public GenerateData(Configuration conf, Path outdir, long genbytes)
       throws IOException {
     super(conf, 0L, "GRIDMIX_GENDATA");
-    job.getConfiguration().setLong("gridmix.gendata.bytes", genbytes);
+    job.getConfiguration().setLong(GRIDMIX_GEN_BYTES, genbytes);
     FileOutputFormat.setOutputPath(job, outdir);
   }
 
@@ -84,7 +104,7 @@
     protected void setup(Context context)
         throws IOException, InterruptedException {
       val = new BytesWritable(new byte[
-          context.getConfiguration().getInt("gendata.val.bytes", 1024 * 1024)]);
+          context.getConfiguration().getInt(GRIDMIX_VAL_BYTES, 1024 * 1024)]);
     }
 
     @Override
@@ -106,7 +126,7 @@
       final JobClient client = new JobClient(jobCtxt.getConfiguration());
       ClusterStatus stat = client.getClusterStatus(true);
       final long toGen =
-        jobCtxt.getConfiguration().getLong("gridmix.gendata.bytes", -1);
+        jobCtxt.getConfiguration().getLong(GRIDMIX_GEN_BYTES, -1);
       if (toGen < 0) {
         throw new IOException("Invalid/missing generation bytes: " + toGen);
       }
@@ -144,7 +164,7 @@
             throws IOException, InterruptedException {
           toWrite = split.getLength();
           RINTERVAL = ctxt.getConfiguration().getInt(
-              "gendata.report.interval.mb", 10) << 20;
+              GRIDMIX_GEN_INTERVAL, 10) << 20;
         }
         @Override
         public boolean nextKeyValue() throws IOException {
@@ -219,20 +239,52 @@
     public RecordWriter<NullWritable,BytesWritable> getRecordWriter(
         TaskAttemptContext job) throws IOException {
 
-      Path file = getDefaultWorkFile(job, "");
-      FileSystem fs = file.getFileSystem(job.getConfiguration());
-      final FSDataOutputStream fileOut = fs.create(file, false);
-      return new RecordWriter<NullWritable,BytesWritable>() {
-        @Override
-        public void write(NullWritable key, BytesWritable value)
-            throws IOException {
-          fileOut.write(value.getBytes(), 0, value.getLength());
-        }
-        @Override
-        public void close(TaskAttemptContext ctxt) throws IOException {
+      return new ChunkWriter(getDefaultWorkFile(job, ""),
+          job.getConfiguration());
+    }
+
+    static class ChunkWriter extends RecordWriter<NullWritable,BytesWritable> {
+      private final Path outDir;
+      private final FileSystem fs;
+      private final long maxFileBytes;
+
+      private long accFileBytes = 0L;
+      private long fileIdx = -1L;
+      private OutputStream fileOut = null;
+
+      public ChunkWriter(Path outDir, Configuration conf) throws IOException {
+        this.outDir = outDir;
+        fs = outDir.getFileSystem(conf);
+        maxFileBytes = conf.getLong(GRIDMIX_GEN_CHUNK, 1L << 30);
+        nextDestination();
+      }
+      private void nextDestination() throws IOException {
+        if (fileOut != null) {
           fileOut.close();
         }
-      };
+        fileOut = fs.create(new Path(outDir, "segment-" + (++fileIdx)), false);
+        accFileBytes = 0L;
+      }
+      @Override
+      public void write(NullWritable key, BytesWritable value)
+          throws IOException {
+        int written = 0;
+        final int total = value.getLength();
+        while (written < total) {
+          final int write = (int)
+            Math.min(total - written, maxFileBytes - accFileBytes);
+          fileOut.write(value.getBytes(), written, write);
+          written += write;
+          accFileBytes += write;
+          if (accFileBytes >= maxFileBytes) {
+            nextDestination();
+          }
+        }
+      }
+      @Override
+      public void close(TaskAttemptContext ctxt) throws IOException {
+        fileOut.close();
+      }
     }
   }
 

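ChunkWriter above caps each output file at gridmix.gen.bytes.per.file by splitting a value across segment files whenever it straddles a chunk boundary. The loop's arithmetic in isolation (values illustrative):

    long maxFileBytes = 1L << 30;                   // 1 GB cap per segment
    long accFileBytes = maxFileBytes - (1L << 20);  // 1 MB left in the current segment
    final int total = 3 << 20;                      // a 3 MB value to write
    int written = 0;
    while (written < total) {
      final int write = (int) Math.min(total - written, maxFileBytes - accFileBytes);
      // first pass: 1 MB completes segment-k; second pass: 2 MB opens segment-k+1
      written += write;
      accFileBytes += write;
      if (accFileBytes >= maxFileBytes) {
        accFileBytes = 0L;                          // stands in for nextDestination()
      }
    }
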
Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java?rev=889778&r1=889777&r2=889778&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java Fri Dec 11 19:32:58 2009
@@ -26,7 +26,6 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapreduce.Job;
@@ -73,12 +72,11 @@
     "gridmix.client.pending.queue.depth";
 
   /**
-   * Size of key data in synthetic jobs. At present, key length is not
-   * available in job traces. Since all solutions are equally bad, globally
-   * specifying the amount of each record that is key data is the simplest
-   * to implement and the method chosen.
+   * Multiplier to accelerate or decelerate job submission. As a crude means of
+   * sizing a job trace to a cluster, the time separating two jobs is
+   * multiplied by this factor.
    */
-  public static final String GRIDMIX_KEY_LEN = "gridmix.min.key.length";
+  public static final String GRIDMIX_SUB_MUL = "gridmix.submit.multiplier";
 
   // Submit data structures
   private JobFactory factory;
@@ -135,7 +133,7 @@
     submitter = createJobSubmitter(monitor,
         conf.getInt(GRIDMIX_SUB_THR,
           Runtime.getRuntime().availableProcessors() + 1),
-        conf.getInt(GRIDMIX_QUE_DEP, 100),
+        conf.getInt(GRIDMIX_QUE_DEP, 5),
         new FilePool(conf, ioPath));
     factory = createJobFactory(submitter, traceIn, scratchDir, conf, startFlag);
     monitor.start();
@@ -182,12 +180,10 @@
       printUsage(System.err);
       return 1;
     }
-    FileSystem fs = null;
     InputStream trace = null;
     try {
       final Configuration conf = getConf();
       Path scratchDir = new Path(ioPath, conf.get(GRIDMIX_OUT_DIR, "gridmix"));
-      fs = scratchDir.getFileSystem(conf);
       // add shutdown hook for SIGINT, etc.
       Runtime.getRuntime().addShutdownHook(sdh);
       CountDownLatch startFlag = new CountDownLatch(1);
@@ -210,7 +206,7 @@
 
       if (factory != null) {
         // wait for input exhaustion
-        factory.join();
+        factory.join(Long.MAX_VALUE);
         final Throwable badTraceException = factory.error();
         if (null != badTraceException) {
           LOG.error("Error in trace", badTraceException);
@@ -218,10 +214,10 @@
         }
         // wait for pending tasks to be submitted
         submitter.shutdown();
-        submitter.join();
+        submitter.join(Long.MAX_VALUE);
         // wait for running tasks to complete
         monitor.shutdown();
-        monitor.join();
+        monitor.join(Long.MAX_VALUE);
       }
     } finally {
       IOUtils.cleanup(LOG, trace);
@@ -236,13 +232,17 @@
    */
   class Shutdown extends Thread {
 
-    private void killComponent(Component<?> component) {
+    static final long FAC_SLEEP = 1000;
+    static final long SUB_SLEEP = 4000;
+    static final long MON_SLEEP = 15000;
+
+    private void killComponent(Component<?> component, long maxwait) {
       if (component == null) {
         return;
       }
-      component.abort();   // read no more tasks
+      component.abort();
       try {
-        component.join();
+        component.join(maxwait);
       } catch (InterruptedException e) {
         LOG.warn("Interrupted waiting for " + component);
       }
@@ -253,9 +253,9 @@
     public void run() {
       LOG.info("Exiting...");
       try {
-        killComponent(factory);   // read no more tasks
-        killComponent(submitter); // submit no more tasks
-        killComponent(monitor);   // process remaining jobs in this thread
+        killComponent(factory, FAC_SLEEP);   // read no more tasks
+        killComponent(submitter, SUB_SLEEP); // submit no more tasks
+        killComponent(monitor, MON_SLEEP);   // process remaining jobs here
       } finally {
         if (monitor == null) {
           return;
@@ -306,7 +306,8 @@
     out.printf("       %-40s : Output directory\n", GRIDMIX_OUT_DIR);
     out.printf("       %-40s : Submitting threads\n", GRIDMIX_SUB_THR);
     out.printf("       %-40s : Queued job desc\n", GRIDMIX_QUE_DEP);
-    out.printf("       %-40s : Key size\n", GRIDMIX_KEY_LEN);
+    out.printf("       %-40s : Key fraction of rec\n",
+        AvgRecordFactory.GRIDMIX_KEY_FRC);
   }
 
   /**
@@ -331,7 +332,7 @@
      * Wait until the service completes. It is assumed that either a
      * {@link #shutdown} or {@link #abort} has been requested.
      */
-    void join() throws InterruptedException;
+    void join(long millis) throws InterruptedException;
 
     /**
      * Shut down gracefully, finishing all pending work. Reject new requests.

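The Component.join(long) signature change lets the shutdown hook bound how long it waits on each stage (FAC_SLEEP, SUB_SLEEP, MON_SLEEP) instead of hanging on a wedged component, while the normal exit path passes Long.MAX_VALUE to keep the old wait-forever semantics. The underlying Thread idiom, as a sketch (the worker body is illustrative):

    Thread monitorThread = new Thread(new Runnable() {
      public void run() { /* poll running jobs until shutdown */ }
    });
    monitorThread.start();
    // normal path: effectively unbounded
    //   monitorThread.join(Long.MAX_VALUE);
    // shutdown hook: wait at most MON_SLEEP ms, then proceed regardless
    monitorThread.join(15000);
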
Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java?rev=889778&r1=889777&r2=889778&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java Fri Dec 11 19:32:58 2009
@@ -17,22 +17,10 @@
  */
 package org.apache.hadoop.mapred.gridmix;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.nio.IntBuffer;
-import java.nio.LongBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.EnumSet;
 import java.util.Formatter;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map.Entry;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
@@ -40,15 +28,10 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Writable;
@@ -65,7 +48,6 @@
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.tools.rumen.JobStory;
@@ -80,6 +62,7 @@
 class GridmixJob implements Callable<Job>, Delayed {
 
   public static final String JOBNAME = "GRIDMIX";
+  public static final String ORIGNAME = "gridmix.job.name.original";
   public static final Log LOG = LogFactory.getLog(GridmixJob.class);
 
   private static final ThreadLocal<Formatter> nameFormat =
@@ -180,14 +163,17 @@
     job.setReducerClass(GridmixReducer.class);
     job.setNumReduceTasks(jobdesc.getNumberReduces());
     job.setMapOutputKeyClass(GridmixKey.class);
-    job.setMapOutputValueClass(BytesWritable.class);
-    job.setSortComparatorClass(BytesWritable.Comparator.class);
+    job.setMapOutputValueClass(GridmixRecord.class);
+    job.setSortComparatorClass(GridmixKey.Comparator.class);
     job.setGroupingComparatorClass(SpecGroupingComparator.class);
     job.setInputFormatClass(GridmixInputFormat.class);
     job.setOutputFormatClass(RawBytesOutputFormat.class);
     job.setPartitionerClass(DraftPartitioner.class);
     job.setJarByClass(GridmixJob.class);
     job.getConfiguration().setInt("gridmix.job.seq", seq);
+    job.getConfiguration().set(ORIGNAME, null == jobdesc.getJobID()
+        ? "<unknown>" : jobdesc.getJobID().toString());
+    job.getConfiguration().setBoolean(Job.USED_GENERIC_PARSER, true);
     FileInputFormat.addInputPath(job, new Path("ignored"));
     FileOutputFormat.setOutputPath(job, outdir);
     job.submit();
@@ -200,11 +186,10 @@
     }
   }
 
-  /**
-   * Group REDUCE_SPEC records together
-   */
   public static class SpecGroupingComparator
-      implements RawComparator<GridmixKey>, Serializable {
+      implements RawComparator<GridmixKey> {
+    private final DataInputBuffer di = new DataInputBuffer();
+    private final byte[] reset = di.getData();
     @Override
     public int compare(GridmixKey g1, GridmixKey g2) {
       final byte t1 = g1.getType();
@@ -215,284 +200,128 @@
       }
       assert t1 == GridmixKey.DATA;
       assert t2 == GridmixKey.DATA;
-      return WritableComparator.compareBytes(
-          g1.getBytes(), 0, g1.getLength(),
-          g2.getBytes(), 0, g2.getLength());
+      return g1.compareTo(g2);
     }
     @Override
-    public int compare(byte[] b1, int s1, int l1,
-                       byte[] b2, int s2, int l2) {
-      final byte t1 = b1[s1 + 4];
-      final byte t2 = b2[s2 + 4];
-      if (t1 == GridmixKey.REDUCE_SPEC ||
-          t2 == GridmixKey.REDUCE_SPEC) {
-        return t1 - t2;
-      }
-      assert t1 == GridmixKey.DATA;
-      assert t2 == GridmixKey.DATA;
-      return WritableComparator.compareBytes(
-          b1, s1 + 4, l1 - 4,
-          b2, s2 + 4, l2 - 4);
-    }
-  }
-
-  /**
-   * Keytype for synthetic jobs, some embedding instructions for the reduce.
-   */
-  public static class GridmixKey extends BytesWritable {
-    // long fields specifying reduce contract
-    private enum RSpec { REC_IN, REC_OUT, BYTES_OUT };
-    private static final int SPEC_START = 5; // type + partition len
-    private static final int NUMFIELDS = RSpec.values().length;
-    private static final int SPEC_SIZE = NUMFIELDS * 8;
-
-    // Key types
-    static final byte REDUCE_SPEC = 0;
-    static final byte DATA = 1;
-
-    private IntBuffer partition;
-    private LongBuffer spec;
-
-    public GridmixKey() {
-      super(new byte[SPEC_START]);
-    }
-
-    public GridmixKey(byte type, byte[] b) {
-      super(b);
-      setType(type);
-    }
-
-    public byte getType() {
-      return getBytes()[0];
-    }
-    public void setPartition(int partition) {
-      this.partition.put(0, partition);
-    }
-    public int getPartition() {
-      return partition.get(0);
-    }
-    public long getReduceInputRecords() {
-      checkState(REDUCE_SPEC);
-      return spec.get(RSpec.REC_IN.ordinal());
-    }
-    public long getReduceOutputBytes() {
-      checkState(REDUCE_SPEC);
-      return spec.get(RSpec.BYTES_OUT.ordinal());
-    }
-    public long getReduceOutputRecords() {
-      checkState(REDUCE_SPEC);
-      return spec.get(RSpec.REC_OUT.ordinal());
-    }
-    public void setType(byte b) {
-      switch (b) {
-        case REDUCE_SPEC:
-          if (getCapacity() < SPEC_START + SPEC_SIZE) {
-            setSize(SPEC_START + SPEC_SIZE);
-          }
-          spec =
-            ByteBuffer.wrap(getBytes(), SPEC_START, SPEC_SIZE).asLongBuffer();
-          break;
-        case DATA:
-          if (getCapacity() < SPEC_START) {
-            setSize(SPEC_START);
-          }
-          spec = null;
-          break;
-        default:
-          throw new IllegalArgumentException("Illegal type " + b);
-      }
-      getBytes()[0] = b;
-      partition =
-        ByteBuffer.wrap(getBytes(), 1, SPEC_START - 1).asIntBuffer();
-    }
-    public void setReduceInputRecords(long records) {
-      checkState(REDUCE_SPEC);
-      spec.put(RSpec.REC_IN.ordinal(), records);
-    }
-    public void setReduceOutputBytes(long bytes) {
-      checkState(REDUCE_SPEC);
-      spec.put(RSpec.BYTES_OUT.ordinal(), bytes);
-    }
-    public void setReduceOutputRecords(long records) {
-      checkState(REDUCE_SPEC);
-      spec.put(RSpec.REC_OUT.ordinal(), records);
-    }
-    private void checkState(byte b) {
-      if (getLength() < SPEC_START || getType() != b) {
-        throw new IllegalStateException("Expected " + b + ", was " + getType());
-      }
-    }
-    @Override
-    public void readFields(DataInput in) throws IOException {
-      super.readFields(in);
-      if (getLength() < SPEC_START) {
-        throw new IOException("Invalid GridmixKey, len " + getLength());
-      }
-      partition =
-        ByteBuffer.wrap(getBytes(), 1, SPEC_START - 1).asIntBuffer();
-      spec = getType() == REDUCE_SPEC
-        ? ByteBuffer.wrap(getBytes(), SPEC_START, SPEC_SIZE).asLongBuffer()
-        : null;
-    }
-    @Override
-    public void write(DataOutput out) throws IOException {
-      super.write(out);
-      if (getType() == REDUCE_SPEC) {
-        LOG.debug("SPEC(" + getPartition() + ") " + getReduceInputRecords() +
-            " -> " + getReduceOutputRecords() + "/" + getReduceOutputBytes());
-      }
-    }
-    @Override
-    public boolean equals(Object other) {
-      if (other instanceof GridmixKey) {
-        return super.equals(other);
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      try {
+        final int ret;
+        di.reset(b1, s1, l1);
+        final int x1 = WritableUtils.readVInt(di);
+        di.reset(b2, s2, l2);
+        final int x2 = WritableUtils.readVInt(di);
+        final int t1 = b1[s1 + x1];
+        final int t2 = b2[s2 + x2];
+        if (t1 == GridmixKey.REDUCE_SPEC ||
+            t2 == GridmixKey.REDUCE_SPEC) {
+          ret = t1 - t2;
+        } else {
+          assert t1 == GridmixKey.DATA;
+          assert t2 == GridmixKey.DATA;
+          ret =
+            WritableComparator.compareBytes(b1, s1, x1, b2, s2, x2);
+        }
+        di.reset(reset, 0, 0);
+        return ret;
+      } catch (IOException e) {
+        throw new RuntimeException(e);
       }
-      return false;
-    }
-
-    @Override
-    public int hashCode() {
-      return super.hashCode();
     }
   }
 
   public static class GridmixMapper
-      extends Mapper<IntWritable,BytesWritable,GridmixKey,BytesWritable> {
-
-    private final Random r = new Random();
-    private GridmixKey key;
-    private final BytesWritable val = new BytesWritable();
+      extends Mapper<NullWritable,GridmixRecord,GridmixKey,GridmixRecord> {
 
-    private int keyLen;
     private double acc;
     private double ratio;
-    private int[] reduceRecordSize;
-    private long[] reduceRecordCount;
-    private long[] reduceRecordRemaining;
+    private final ArrayList<RecordFactory> reduces =
+      new ArrayList<RecordFactory>();
+    private final Random r = new Random();
+
+    private final GridmixKey key = new GridmixKey();
+    private final GridmixRecord val = new GridmixRecord();
 
     @Override
-    protected void setup(Context context)
+    protected void setup(Context ctxt)
         throws IOException, InterruptedException {
-      // TODO clearly job-specific, but no data at present
-      keyLen = context.getConfiguration().getInt(Gridmix.GRIDMIX_KEY_LEN, 20);
-      key = new GridmixKey(GridmixKey.DATA, new byte[keyLen]);
-      final GridmixSplit split = (GridmixSplit) context.getInputSplit();
-      LOG.info("ID: " + split.getId());
-      reduceRecordCount = split.getOutputRecords();
-      reduceRecordRemaining =
-        Arrays.copyOf(reduceRecordCount, reduceRecordCount.length);
-      reduceRecordSize = new int[reduceRecordCount.length];
-      int valsize = -1;
+      final Configuration conf = ctxt.getConfiguration();
+      final GridmixSplit split = (GridmixSplit) ctxt.getInputSplit();
+      final int maps = split.getMapCount();
       final long[] reduceBytes = split.getOutputBytes();
+      final long[] reduceRecords = split.getOutputRecords();
+
       long totalRecords = 0L;
-      for (int i = 0; i < reduceBytes.length; ++i) {
-        reduceRecordSize[i] = Math.max(0,
-          Math.round(reduceBytes[i] / (1.0f * reduceRecordCount[i])) - keyLen);
-        valsize = Math.max(reduceRecordSize[i], valsize);
-        totalRecords += reduceRecordCount[i];
-      }
-      valsize = Math.max(0, valsize - 4); // BW len encoding
-      val.setCapacity(valsize);
-      val.setSize(valsize);
-      ratio = totalRecords / (1.0 * split.getInputRecords());
+      final int nReduces = ctxt.getNumReduceTasks();
+      if (nReduces > 0) {
+        int idx = 0;
+        int id = split.getId();
+        for (int i = 0; i < nReduces; ++i) {
+          final GridmixKey.Spec spec = new GridmixKey.Spec();
+          if (i == id) {
+            spec.bytes_out = split.getReduceBytes(idx);
+            spec.rec_out = split.getReduceRecords(idx);
+            ++idx;
+            id += maps;
+          }
+          reduces.add(new IntermediateRecordFactory(
+              new AvgRecordFactory(reduceBytes[i], reduceRecords[i], conf),
+              i, reduceRecords[i], spec, conf));
+          totalRecords += reduceRecords[i];
+        }
+      } else {
+        reduces.add(new AvgRecordFactory(reduceBytes[0], reduceRecords[0],
+              conf));
+        totalRecords = reduceRecords[0];
+      }
+      final long splitRecords = split.getInputRecords();
+      final long inputRecords = splitRecords <= 0 && split.getLength() >= 0
+        ? Math.max(1,
+          split.getLength() / conf.getInt("gridmix.missing.rec.size", 64*1024))
+        : splitRecords;
+      ratio = totalRecords / (1.0 * inputRecords);
       acc = 0.0;
     }
 
-    protected void fillBytes(BytesWritable val, int len) {
-      r.nextBytes(val.getBytes());
-      val.setSize(len);
-    }
-
-    /** Find next non-empty partition after start. */
-    private int getNextPart(final int start) {
-      int p = start;
-      do {
-        p = (p + 1) % reduceRecordSize.length;
-      } while (0 == reduceRecordRemaining[p] && p != start);
-      return 0 == reduceRecordRemaining[p] ? -1 : p;
-    }
-
     @Override
-    public void map(IntWritable ignored, BytesWritable bytes,
+    public void map(NullWritable ignored, GridmixRecord rec,
         Context context) throws IOException, InterruptedException {
-      int p = getNextPart(r.nextInt(reduceRecordSize.length));
-      if (-1 == p) {
-        return;
-      }
       acc += ratio;
-      while (acc >= 1.0) {
-        fillBytes(key, key.getLength());
-        key.setType(GridmixKey.DATA);
-        key.setPartition(p);
-        --reduceRecordRemaining[p];
-        fillBytes(val, reduceRecordSize[p]);
+      while (acc >= 1.0 && !reduces.isEmpty()) {
+        key.setSeed(r.nextLong());
+        val.setSeed(r.nextLong());
+        final int idx = r.nextInt(reduces.size());
+        final RecordFactory f = reduces.get(idx);
+        if (!f.next(key, val)) {
+          reduces.remove(idx);
+          continue;
+        }
         context.write(key, val);
         acc -= 1.0;
-        if (0 == reduceRecordRemaining[p] && -1 == (p = getNextPart(p))) {
-          return;
-        }
       }
     }
 
     @Override
     public void cleanup(Context context)
         throws IOException, InterruptedException {
-      // output any remaining records
-      // TODO include reduce spec in remaining records if avail
-      //      (i.e. move this to map)
-      for (int i = 0; i < reduceRecordSize.length; ++i) {
-        for (long j = reduceRecordRemaining[i]; j > 0; --j) {
-          fillBytes(key, key.getLength());
-          key.setType(GridmixKey.DATA);
-          key.setPartition(i);
-          fillBytes(val, reduceRecordSize[i]);
+      for (RecordFactory factory : reduces) {
+        key.setSeed(r.nextLong());
+        while (factory.next(key, val)) {
           context.write(key, val);
+          key.setSeed(r.nextLong());
         }
       }
-      val.setSize(0);
-      key.setType(GridmixKey.REDUCE_SPEC);
-      final int reduces = context.getNumReduceTasks();
-      final GridmixSplit split = (GridmixSplit) context.getInputSplit();
-      final int maps = split.getMapCount();
-      int idx = 0;
-      int id = split.getId();
-      for (int i = 0; i < reduces; ++i) {
-        key.setPartition(i);
-        key.setReduceInputRecords(reduceRecordCount[i]);
-        // Write spec for all red st r_id % id == 0
-        if (i == id) {
-          key.setReduceOutputBytes(split.getReduceBytes(idx));
-          key.setReduceOutputRecords(split.getReduceRecords(idx));
-          LOG.debug(String.format("SPEC'D %d / %d to %d",
-                split.getReduceRecords(idx), split.getReduceBytes(idx), i));
-          ++idx;
-          id += maps;
-        } else {
-          key.setReduceOutputBytes(0);
-          key.setReduceOutputRecords(0);
-        }
-        context.write(key, val);
-      }
     }
   }
 
   public static class GridmixReducer
-      extends Reducer<GridmixKey,BytesWritable,NullWritable,BytesWritable> {
+      extends Reducer<GridmixKey,GridmixRecord,NullWritable,GridmixRecord> {
 
     private final Random r = new Random();
-    private final BytesWritable val = new BytesWritable();
+    private final GridmixRecord val = new GridmixRecord();
 
     private double acc;
     private double ratio;
-    private long written;
-    private long inRecords = 0L;
-    private long outBytes = 0L;
-    private long outRecords = 0L;
-
-    protected void fillBytes(BytesWritable val, int len) {
-      r.nextBytes(val.getBytes());
-      val.setSize(len);
-    }
+    private RecordFactory factory;
 
     @Override
     protected void setup(Context context)
@@ -501,62 +330,52 @@
            context.getCurrentKey().getType() != GridmixKey.REDUCE_SPEC) {
         throw new IOException("Missing reduce spec");
       }
-      for (BytesWritable ignored : context.getValues()) {
+      long outBytes = 0L;
+      long outRecords = 0L;
+      long inRecords = 0L;
+      for (GridmixRecord ignored : context.getValues()) {
         final GridmixKey spec = context.getCurrentKey();
         inRecords += spec.getReduceInputRecords();
-        LOG.debug("GOT COUNT " + spec.getReduceInputRecords());
         outBytes += spec.getReduceOutputBytes();
         outRecords += spec.getReduceOutputRecords();
       }
-      LOG.debug("GOT SPEC " + outRecords + "/" + outBytes);
-      val.setCapacity(Math.round(outBytes / (1.0f * outRecords)));
+      if (0 == outRecords && inRecords > 0) {
+        LOG.info("Spec output bytes w/o records. Using input record count");
+        outRecords = inRecords;
+      }
+      factory =
+        new AvgRecordFactory(outBytes, outRecords, context.getConfiguration());
       ratio = outRecords / (1.0 * inRecords);
       acc = 0.0;
-      LOG.debug(String.format("RECV %d -> %10d/%10d %d %f", inRecords,
-            outRecords, outBytes, val.getCapacity(), ratio));
     }
     @Override
-    protected void reduce(GridmixKey key, Iterable<BytesWritable> values,
+    protected void reduce(GridmixKey key, Iterable<GridmixRecord> values,
         Context context) throws IOException, InterruptedException {
-      for (BytesWritable ignored : values) {
+      for (GridmixRecord ignored : values) {
         acc += ratio;
-        while (acc >= 1.0 && written < outBytes) {
-          final int len = (int) Math.min(outBytes - written, val.getCapacity());
-          fillBytes(val, len);
+        while (acc >= 1.0 && factory.next(null, val)) {
           context.write(NullWritable.get(), val);
           acc -= 1.0;
-          written += len;
-          LOG.debug(String.format("%f %d/%d", acc, written, outBytes));
         }
       }
     }
     @Override
     protected void cleanup(Context context)
         throws IOException, InterruptedException {
-      while (written < outBytes) {
-        final int len = (int) Math.min(outBytes - written, val.getCapacity());
-        fillBytes(val, len);
+      val.setSeed(r.nextLong());
+      while (factory.next(null, val)) {
         context.write(NullWritable.get(), val);
-        written += len;
+        val.setSeed(r.nextLong());
       }
     }
   }
 
   static class GridmixRecordReader
-      extends RecordReader<IntWritable,BytesWritable> {
+      extends RecordReader<NullWritable,GridmixRecord> {
 
-    private long bytesRead = 0;
-    private long bytesTotal;
-    private Configuration conf;
-    private final IntWritable key = new IntWritable();
-    private final BytesWritable inBytes = new BytesWritable();
-
-    private FSDataInputStream input;
-    private int idx = -1;
-    private int capacity;
-    private Path[] paths;
-    private long[] startoffset;
-    private long[] lengths;
+    private RecordFactory factory;
+    private final Random r = new Random();
+    private final GridmixRecord val = new GridmixRecord();
 
     public GridmixRecordReader() { }
 
@@ -564,178 +383,36 @@
     public void initialize(InputSplit genericSplit, TaskAttemptContext ctxt)
             throws IOException, InterruptedException {
       final GridmixSplit split = (GridmixSplit)genericSplit;
-      this.conf = ctxt.getConfiguration();
-      paths = split.getPaths();
-      startoffset = split.getStartOffsets();
-      lengths = split.getLengths();
-      bytesTotal = split.getLength();
-      capacity = (int) Math.round(bytesTotal / (1.0 * split.getInputRecords()));
-      inBytes.setCapacity(capacity);
-      nextSource();
-    }
-    private void nextSource() throws IOException {
-      idx = (idx + 1) % paths.length;
-      final Path file = paths[idx];
-      final FileSystem fs = file.getFileSystem(conf);
-      input = fs.open(file, capacity);
-      input.seek(startoffset[idx]);
+      final Configuration conf = ctxt.getConfiguration();
+      factory = new ReadRecordFactory(split.getLength(),
+          split.getInputRecords(), new FileQueue(split, conf), conf);
     }
+
     @Override
     public boolean nextKeyValue() throws IOException {
-      if (bytesRead >= bytesTotal) {
-        return false;
-      }
-      final int len = (int)
-        Math.min(bytesTotal - bytesRead, inBytes.getCapacity());
-      int kvread = 0;
-      while (kvread < len) {
-        assert lengths[idx] >= 0;
-        if (lengths[idx] <= 0) {
-          nextSource();
-          continue;
-        }
-        final int srcRead = (int) Math.min(len - kvread, lengths[idx]);
-        IOUtils.readFully(input, inBytes.getBytes(), kvread, srcRead);
-        //LOG.trace("Read " + srcRead + " bytes from " + paths[idx]);
-        lengths[idx] -= srcRead;
-        kvread += srcRead;
-      }
-      bytesRead += kvread;
-      return true;
+      val.setSeed(r.nextLong());
+      return factory.next(null, val);
     }
     @Override
     public float getProgress() throws IOException {
-      return bytesRead / ((float)bytesTotal);
+      return factory.getProgress();
     }
     @Override
-    public IntWritable getCurrentKey() { return key; }
+    public NullWritable getCurrentKey() {
+      return NullWritable.get();
+    }
     @Override
-    public BytesWritable getCurrentValue() { return inBytes; }
+    public GridmixRecord getCurrentValue() {
+      return val;
+    }
     @Override
     public void close() throws IOException {
-      IOUtils.cleanup(null, input);
-    }
-  }
-
-  static class GridmixSplit extends CombineFileSplit {
-    private int id;
-    private int nSpec;
-    private int maps;
-    private int reduces;
-    private long inputRecords;
-    private long outputBytes;
-    private long outputRecords;
-    private long maxMemory;
-    private double[] reduceBytes = new double[0];
-    private double[] reduceRecords = new double[0];
-
-    // Spec for reduces id mod this
-    private long[] reduceOutputBytes = new long[0];
-    private long[] reduceOutputRecords = new long[0];
-
-    GridmixSplit() {
-      super();
-    }
-
-    public GridmixSplit(CombineFileSplit cfsplit, int maps, int id,
-        long inputBytes, long inputRecords, long outputBytes,
-        long outputRecords, double[] reduceBytes, double[] reduceRecords,
-        long[] reduceOutputBytes, long[] reduceOutputRecords)
-        throws IOException {
-      super(cfsplit);
-      this.id = id;
-      this.maps = maps;
-      reduces = reduceBytes.length;
-      this.inputRecords = inputRecords;
-      this.outputBytes = outputBytes;
-      this.outputRecords = outputRecords;
-      this.reduceBytes = Arrays.copyOf(reduceBytes, reduces);
-      this.reduceRecords = Arrays.copyOf(reduceRecords, reduces);
-      nSpec = reduceOutputBytes.length;
-      this.reduceOutputBytes = reduceOutputBytes;
-      this.reduceOutputRecords = reduceOutputRecords;
-    }
-    public int getId() {
-      return id;
-    }
-    public int getMapCount() {
-      return maps;
-    }
-    public long getInputRecords() {
-      return inputRecords;
-    }
-    public long[] getOutputBytes() {
-      final long[] ret = new long[reduces];
-      for (int i = 0; i < reduces; ++i) {
-        ret[i] = Math.round(outputBytes * reduceBytes[i]);
-      }
-      return ret;
-    }
-    public long[] getOutputRecords() {
-      final long[] ret = new long[reduces];
-      for (int i = 0; i < reduces; ++i) {
-        ret[i] = Math.round(outputRecords * reduceRecords[i]);
-      }
-      return ret;
-    }
-    public long getReduceBytes(int i) {
-      return reduceOutputBytes[i];
-    }
-    public long getReduceRecords(int i) {
-      return reduceOutputRecords[i];
-    }
-    @Override
-    public void write(DataOutput out) throws IOException {
-      super.write(out);
-      WritableUtils.writeVInt(out, id);
-      WritableUtils.writeVInt(out, maps);
-      WritableUtils.writeVLong(out, inputRecords);
-      WritableUtils.writeVLong(out, outputBytes);
-      WritableUtils.writeVLong(out, outputRecords);
-      WritableUtils.writeVLong(out, maxMemory);
-      WritableUtils.writeVInt(out, reduces);
-      for (int i = 0; i < reduces; ++i) {
-        out.writeDouble(reduceBytes[i]);
-        out.writeDouble(reduceRecords[i]);
-      }
-      WritableUtils.writeVInt(out, nSpec);
-      for (int i = 0; i < nSpec; ++i) {
-        out.writeLong(reduceOutputBytes[i]);
-        out.writeLong(reduceOutputRecords[i]);
-      }
-    }
-    @Override
-    public void readFields(DataInput in) throws IOException {
-      super.readFields(in);
-      id = WritableUtils.readVInt(in);
-      maps = WritableUtils.readVInt(in);
-      inputRecords = WritableUtils.readVLong(in);
-      outputBytes = WritableUtils.readVLong(in);
-      outputRecords = WritableUtils.readVLong(in);
-      maxMemory = WritableUtils.readVLong(in);
-      reduces = WritableUtils.readVInt(in);
-      if (reduceBytes.length < reduces) {
-        reduceBytes = new double[reduces];
-        reduceRecords = new double[reduces];
-      }
-      for (int i = 0; i < reduces; ++i) {
-        reduceBytes[i] = in.readDouble();
-        reduceRecords[i] = in.readDouble();
-      }
-      nSpec = WritableUtils.readVInt(in);
-      if (reduceOutputBytes.length < nSpec) {
-        reduceOutputBytes = new long[nSpec];
-        reduceOutputRecords = new long[nSpec];
-      }
-      for (int i = 0; i < nSpec; ++i) {
-        reduceOutputBytes[i] = in.readLong();
-        reduceOutputRecords[i] = in.readLong();
-      }
+      factory.close();
     }
   }
 
   static class GridmixInputFormat
-      extends InputFormat<IntWritable,BytesWritable> {
+      extends InputFormat<NullWritable,GridmixRecord> {
 
     @Override
     public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
@@ -743,29 +420,28 @@
             "gridmix.job.seq", -1));
     }
     @Override
-    public RecordReader<IntWritable,BytesWritable> createRecordReader(
+    public RecordReader<NullWritable,GridmixRecord> createRecordReader(
         InputSplit split, final TaskAttemptContext taskContext)
         throws IOException {
       return new GridmixRecordReader();
     }
   }
 
-  static class RawBytesOutputFormat
-      extends FileOutputFormat<NullWritable,BytesWritable> {
+  static class RawBytesOutputFormat<K>
+      extends FileOutputFormat<K,GridmixRecord> {
 
     @Override
-    public RecordWriter<NullWritable,BytesWritable> getRecordWriter(
+    public RecordWriter<K,GridmixRecord> getRecordWriter(
         TaskAttemptContext job) throws IOException {
 
       Path file = getDefaultWorkFile(job, "");
       FileSystem fs = file.getFileSystem(job.getConfiguration());
       final FSDataOutputStream fileOut = fs.create(file, false);
-      return new RecordWriter<NullWritable,BytesWritable>() {
+      return new RecordWriter<K,GridmixRecord>() {
         @Override
-        public void write(NullWritable key, BytesWritable value)
+        public void write(K ignored, GridmixRecord value)
             throws IOException {
-          //LOG.trace("WROTE " + value.getLength() + " bytes");
-          fileOut.write(value.getBytes(), 0, value.getLength());
+          value.writeRandom(fileOut, value.getSize());
         }
         @Override
         public void close(TaskAttemptContext ctxt) throws IOException {
@@ -829,8 +505,10 @@
           jobdesc.getTaskInfo(TaskType.REDUCE, i + j * maps);
         specBytes[j] = info.getOutputBytes();
         specRecords[j] = info.getOutputRecords();
-        LOG.debug(String.format("SPEC(%d) %d -> %d %d %d", id(), i,
-            i + j * maps, info.getOutputRecords(), info.getOutputBytes()));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("SPEC(%d) %d -> %d %d %d", id(), i,
+              i + j * maps, info.getOutputRecords(), info.getOutputBytes()));
+        }
       }
       final TaskInfo info = jobdesc.getTaskInfo(TaskType.MAP, i);
       splits.add(new GridmixSplit(striper.splitFor(inputDir,
@@ -842,77 +520,4 @@
     pushDescription(id(), splits);
   }
 
-  static class InputStriper {
-    int idx;
-    long currentStart;
-    FileStatus current;
-    final List<FileStatus> files = new ArrayList<FileStatus>();
-
-    InputStriper(FilePool inputDir, long mapBytes)
-        throws IOException {
-      final long inputBytes = inputDir.getInputFiles(mapBytes, files);
-      if (mapBytes > inputBytes) {
-        LOG.warn("Using " + inputBytes + "/" + mapBytes + " bytes");
-      }
-      current = files.get(0);
-    }
-
-    CombineFileSplit splitFor(FilePool inputDir, long bytes, int nLocs)
-        throws IOException {
-      final ArrayList<Path> paths = new ArrayList<Path>();
-      final ArrayList<Long> start = new ArrayList<Long>();
-      final ArrayList<Long> length = new ArrayList<Long>();
-      final HashMap<String,Double> sb = new HashMap<String,Double>();
-      while (bytes > 0) {
-        paths.add(current.getPath());
-        start.add(currentStart);
-        final long fromFile = Math.min(bytes, current.getLen() - currentStart);
-        length.add(fromFile);
-        for (BlockLocation loc :
-            inputDir.locationsFor(current, currentStart, fromFile)) {
-          final double tedium = loc.getLength() / (1.0 * bytes);
-          for (String l : loc.getHosts()) {
-            Double j = sb.get(l);
-            if (null == j) {
-              sb.put(l, tedium);
-            } else {
-              sb.put(l, j.doubleValue() + tedium);
-            }
-          }
-        }
-        currentStart += fromFile;
-        bytes -= fromFile;
-        if (current.getLen() - currentStart == 0) {
-          current = files.get(++idx % files.size());
-          currentStart = 0;
-        }
-      }
-      final ArrayList<Entry<String,Double>> sort =
-        new ArrayList<Entry<String,Double>>(sb.entrySet());
-      Collections.sort(sort, hostRank);
-      final String[] hosts = new String[Math.min(nLocs, sort.size())];
-      for (int i = 0; i < nLocs && i < sort.size(); ++i) {
-        hosts[i] = sort.get(i).getKey();
-      }
-      return new CombineFileSplit(paths.toArray(new Path[0]),
-          toLongArray(start), toLongArray(length), hosts);
-    }
-
-    private long[] toLongArray(final ArrayList<Long> sigh) {
-      final long[] ret = new long[sigh.size()];
-      for (int i = 0; i < ret.length; ++i) {
-        ret[i] = sigh.get(i);
-      }
-      return ret;
-    }
-
-    final Comparator<Entry<String,Double>> hostRank =
-      new Comparator<Entry<String,Double>>() {
-        public int compare(Entry<String,Double> a, Entry<String,Double> b) {
-            final double va = a.getValue();
-            final double vb = b.getValue();
-            return va > vb ? -1 : va < vb ? 1 : 0;
-          }
-      };
-  }
 }

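Both GridmixMapper.map and GridmixReducer.reduce now scale input records to output records with the acc/ratio accumulator seen above: each input adds ratio to acc, and one output is emitted per whole unit accumulated, spreading the outputs evenly; cleanup() drains whatever floating-point residue remains. The technique in isolation:

    // Bresenham-style scaling: spread outRecs outputs across inRecs inputs.
    final long inRecs = 8L;
    final long outRecs = 20L;      // ratio 2.5, exact in binary
    final double ratio = outRecs / (1.0 * inRecs);
    double acc = 0.0;
    long emitted = 0L;
    for (long i = 0; i < inRecs; ++i) {
      acc += ratio;
      while (acc >= 1.0) {
        ++emitted;                 // stands in for context.write(key, val)
        acc -= 1.0;
      }
    }
    // emitted == 20; with an inexact ratio a remainder may be left for cleanup()
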
Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java?rev=889778&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java Fri Dec 11 19:32:58 2009
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.WritableComparator;
+
+class GridmixKey extends GridmixRecord {
+  static final byte REDUCE_SPEC = 0;
+  static final byte DATA = 1;
+
+  static final int META_BYTES = 1;
+
+  private byte type;
+  private int partition; // NOT serialized
+  private Spec spec = new Spec();
+
+  GridmixKey() {
+    this(DATA, 1, 0L);
+  }
+  GridmixKey(byte type, int size, long seed) {
+    super(size, seed);
+    this.type = type;
+    // setting the type may change the share of random payload bytes
+    setSize(size);
+  }
+
+  @Override
+  public int getSize() {
+    switch (type) {
+      case REDUCE_SPEC:
+        return super.getSize() + spec.getSize() + META_BYTES;
+      case DATA:
+        return super.getSize() + META_BYTES;
+      default:
+        throw new IllegalStateException("Invalid type: " + type);
+    }
+  }
+
+  @Override
+  public void setSize(int size) {
+    switch (type) {
+      case REDUCE_SPEC:
+        super.setSize(size - (META_BYTES + spec.getSize()));
+        break;
+      case DATA:
+        super.setSize(size - META_BYTES);
+        break;
+      default:
+        throw new IllegalStateException("Invalid type: " + type);
+    }
+  }
+
+  /**
+   * Partition is not serialized.
+   */
+  public int getPartition() {
+    return partition;
+  }
+  public void setPartition(int partition) {
+    this.partition = partition;
+  }
+
+  public long getReduceInputRecords() {
+    assert REDUCE_SPEC == getType();
+    return spec.rec_in;
+  }
+  public void setReduceInputRecords(long rec_in) {
+    assert REDUCE_SPEC == getType();
+    final int origSize = getSize();
+    spec.rec_in = rec_in;
+    setSize(origSize);
+  }
+
+  public long getReduceOutputRecords() {
+    assert REDUCE_SPEC == getType();
+    return spec.rec_out;
+  }
+  public void setReduceOutputRecords(long rec_out) {
+    assert REDUCE_SPEC == getType();
+    final int origSize = getSize();
+    spec.rec_out = rec_out;
+    setSize(origSize);
+  }
+
+  public long getReduceOutputBytes() {
+    assert REDUCE_SPEC == getType();
+    return spec.bytes_out;
+  }
+  public void setReduceOutputBytes(long b_out) {
+    assert REDUCE_SPEC == getType();
+    final int origSize = getSize();
+    spec.bytes_out = b_out;
+    setSize(origSize);
+  }
+
+  public byte getType() {
+    return type;
+  }
+  public void setType(byte type) throws IOException {
+    final int origSize = getSize();
+    switch (type) {
+      case REDUCE_SPEC:
+      case DATA:
+        this.type = type;
+        break;
+      default:
+        throw new IOException("Invalid type: " + type);
+    }
+    setSize(origSize);
+  }
+
+  public void setSpec(Spec spec) {
+    assert REDUCE_SPEC == getType();
+    final int origSize = getSize();
+    this.spec.set(spec);
+    setSize(origSize);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    setType(in.readByte());
+    if (REDUCE_SPEC == getType()) {
+      spec.readFields(in);
+    }
+  }
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    final byte t = getType();
+    out.writeByte(t);
+    if (REDUCE_SPEC == t) {
+      spec.write(out);
+    }
+  }
+  int fixedBytes() {
+    return super.fixedBytes() +
+      (REDUCE_SPEC == getType() ? spec.getSize() : 0) + META_BYTES;
+  }
+  @Override
+  public int compareTo(GridmixRecord other) {
+    final GridmixKey o = (GridmixKey) other;
+    final byte t1 = getType();
+    final byte t2 = o.getType();
+    if (t1 != t2) {
+      return t1 - t2;
+    }
+    return super.compareTo(other);
+  }
+
+  /**
+   * Note that while the spec is not explicitly compared, changing the
+   * spec may change the key's size, which will affect equality.
+   */
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    if (other != null && other.getClass() == getClass()) {
+      final GridmixKey o = ((GridmixKey)other);
+      return getType() == o.getType() && super.equals(o);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return super.hashCode() ^ getType();
+  }
+
+  public static class Spec implements Writable {
+    long rec_in;
+    long rec_out;
+    long bytes_out;
+    public Spec() { }
+
+    public void set(Spec other) {
+      rec_in = other.rec_in;
+      bytes_out = other.bytes_out;
+      rec_out = other.rec_out;
+    }
+
+    public int getSize() {
+      return WritableUtils.getVIntSize(rec_in) +
+             WritableUtils.getVIntSize(rec_out) +
+             WritableUtils.getVIntSize(bytes_out);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      rec_in = WritableUtils.readVLong(in);
+      rec_out = WritableUtils.readVLong(in);
+      bytes_out = WritableUtils.readVLong(in);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      WritableUtils.writeVLong(out, rec_in);
+      WritableUtils.writeVLong(out, rec_out);
+      WritableUtils.writeVLong(out, bytes_out);
+    }
+  }
+
+  public static class Comparator extends GridmixRecord.Comparator {
+
+    private final DataInputBuffer di = new DataInputBuffer();
+    private final byte[] reset = di.getData();
+
+    public Comparator() {
+      super(GridmixKey.class);
+    }
+
+    @Override
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      try {
+        di.reset(b1, s1, l1);
+        final int x1 = WritableUtils.readVInt(di);
+        di.reset(b2, s2, l2);
+        final int x2 = WritableUtils.readVInt(di);
+        final int ret = (b1[s1 + x1] != b2[s2 + x2])
+          ? b1[s1 + x1] - b2[s2 + x2]
+          : super.compare(b1, s1, x1, b2, s2, x2);
+        di.reset(reset, 0, 0);
+        return ret;
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    static {
+      WritableComparator.define(GridmixKey.class, new Comparator());
+    }
+  }
+}
+
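
The key keeps its advertised size exact: the one-byte type tag (plus a
vint-encoded Spec for REDUCE_SPEC keys) is charged against the requested
size rather than added on top, so serialized bytes match the target. A
minimal sketch of that invariant, not part of this patch and assuming
same-package access since the class is package-private:

    import java.io.IOException;
    import org.apache.hadoop.io.DataOutputBuffer;

    static void sizeInvariant() throws IOException {
      // A DATA key asked for 32 bytes serializes to exactly 32 bytes:
      // vint length plus random payload, plus the trailing type byte.
      final GridmixKey k = new GridmixKey(GridmixKey.DATA, 32, 42L);
      final DataOutputBuffer dob = new DataOutputBuffer();
      k.write(dob);
      assert 32 == dob.getLength();
    }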

Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java?rev=889778&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java Fri Dec 11 19:32:58 2009
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+
+class GridmixRecord implements WritableComparable<GridmixRecord> {
+
+  private static final int FIXED_BYTES = 1;
+  private int size = -1;
+  private long seed;
+  private final DataInputBuffer dib =
+    new DataInputBuffer();
+  private final DataOutputBuffer dob =
+    new DataOutputBuffer(Long.SIZE / Byte.SIZE);
+  private byte[] literal = dob.getData();
+
+  GridmixRecord() {
+    this(1, 0L);
+  }
+
+  GridmixRecord(int size, long seed) {
+    this.seed = seed;
+    setSizeInternal(size);
+  }
+
+  public int getSize() {
+    return size;
+  }
+
+  public void setSize(int size) {
+    setSizeInternal(size);
+  }
+
+  private void setSizeInternal(int size) {
+    this.size = Math.max(1, size);
+    try {
+      seed = maskSeed(seed, this.size);
+      dob.reset();
+      dob.writeLong(seed);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public final void setSeed(long seed) {
+    this.seed = seed;
+  }
+
+  /** Marsaglia, 2003. */
+  long nextRand(long x) {
+    x ^= (x << 13);
+    x ^= (x >>> 7);
+    return (x ^= (x << 17));
+  }
+
+  public void writeRandom(DataOutput out, final int size) throws IOException {
+    long tmp = seed;
+    out.writeLong(tmp);
+    int i = size - (Long.SIZE / Byte.SIZE);
+    while (i > Long.SIZE / Byte.SIZE - 1) {
+      tmp = nextRand(tmp);
+      out.writeLong(tmp);
+      i -= Long.SIZE / Byte.SIZE;
+    }
+    for (tmp = nextRand(tmp); i > 0; --i) {
+      out.writeByte((int)(tmp & 0xFF));
+      tmp >>>= Byte.SIZE;
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    size = WritableUtils.readVInt(in);
+    int payload = size - WritableUtils.getVIntSize(size);
+    if (payload > Long.SIZE / Byte.SIZE) {
+      seed = in.readLong();
+      payload -= Long.SIZE / Byte.SIZE;
+    } else {
+      Arrays.fill(literal, (byte)0);
+      in.readFully(literal, 0, payload);
+      dib.reset(literal, 0, literal.length);
+      seed = dib.readLong();
+      payload = 0;
+    }
+    final int vBytes = in.skipBytes(payload);
+    if (vBytes != payload) {
+      throw new EOFException("Expected " + payload + ", read " + vBytes);
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // data bytes including vint encoding
+    WritableUtils.writeVInt(out, size);
+    final int payload = size - WritableUtils.getVIntSize(size);
+    if (payload > Long.SIZE / Byte.SIZE) {
+      writeRandom(out, payload);
+    } else if (payload > 0) {
+      out.write(literal, 0, payload);
+    }
+  }
+
+  @Override
+  public int compareTo(GridmixRecord other) {
+    return compareSeed(other.seed,
+        Math.max(0, other.getSize() - other.fixedBytes()));
+  }
+
+  int fixedBytes() {
+    // min vint size
+    return FIXED_BYTES;
+  }
+
+  private static long maskSeed(long sd, int sz) {
+    // Don't use fixedBytes here; subclasses will set intended random len
+    if (sz <= FIXED_BYTES) {
+      sd = 0L;
+    } else if (sz < Long.SIZE / Byte.SIZE + FIXED_BYTES) {
+      final int tmp = sz - FIXED_BYTES;
+      final long mask = (1L << (Byte.SIZE * tmp)) - 1;
+      sd &= mask << (Byte.SIZE * (Long.SIZE / Byte.SIZE - tmp));
+    }
+    return sd;
+  }
+
+  int compareSeed(long jSeed, int jSize) {
+    final int iSize = Math.max(0, getSize() - fixedBytes());
+    final int seedLen = Math.min(iSize, jSize) + FIXED_BYTES;
+    jSeed = maskSeed(jSeed, seedLen);
+    long iSeed = maskSeed(seed, seedLen);
+    final int cmplen = Math.min(iSize, jSize);
+    for (int i = 0; i < cmplen; i += Byte.SIZE) {
+      final int k = cmplen - i;
+      for (long j = Long.SIZE - Byte.SIZE;
+          j >= Math.max(0, Long.SIZE / Byte.SIZE - k) * Byte.SIZE;
+          j -= Byte.SIZE) {
+        final int xi = (int)((iSeed >>> j) & 0xFFL);
+        final int xj = (int)((jSeed >>> j) & 0xFFL);
+        if (xi != xj) {
+          return xi - xj;
+        }
+      }
+      iSeed = nextRand(iSeed);
+      jSeed = nextRand(jSeed);
+    }
+    return iSize - jSize;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    if (other != null && other.getClass() == getClass()) {
+      final GridmixRecord o = ((GridmixRecord)other);
+      return getSize() == o.getSize() && seed == o.seed;
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return (int)(seed * getSize());
+  }
+
+  public static class Comparator extends WritableComparator {
+
+    public Comparator() {
+      super(GridmixRecord.class);
+    }
+
+    public Comparator(Class<? extends WritableComparable<?>> sub) {
+      super(sub);
+    }
+
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      int n1 = WritableUtils.decodeVIntSize(b1[s1]);
+      int n2 = WritableUtils.decodeVIntSize(b2[s2]);
+      n1 -= WritableUtils.getVIntSize(n1);
+      n2 -= WritableUtils.getVIntSize(n2);
+      return compareBytes(b1, s1+n1, l1-n1, b2, s2+n2, l2-n2);
+    }
+
+    static {
+      WritableComparator.define(GridmixRecord.class, new Comparator());
+    }
+  }
+
+}
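
Payloads are purely a function of (size, seed): the first eight bytes are
the seed and the rest come from the xorshift stream in nextRand, so a
record reserializes byte-for-byte. A minimal sketch of a write/read round
trip, not part of this patch and assuming same-package access:

    import java.io.IOException;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;

    static void roundTrip() throws IOException {
      final GridmixRecord a = new GridmixRecord(100, 1234L);
      final DataOutputBuffer out = new DataOutputBuffer();
      a.write(out);
      assert 100 == out.getLength();  // vint length + 99 payload bytes

      final DataInputBuffer in = new DataInputBuffer();
      in.reset(out.getData(), out.getLength());
      final GridmixRecord b = new GridmixRecord();
      b.readFields(in);
      assert a.equals(b) && 0 == a.compareTo(b);
    }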

Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixSplit.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixSplit.java?rev=889778&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixSplit.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixSplit.java Fri Dec 11 19:32:58 2009
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+
+class GridmixSplit extends CombineFileSplit {
+  private int id;
+  private int nSpec;
+  private int maps;
+  private int reduces;
+  private long inputRecords;
+  private long outputBytes;
+  private long outputRecords;
+  private long maxMemory;
+  private double[] reduceBytes = new double[0];
+  private double[] reduceRecords = new double[0];
+
+  // Specs for reduces; a reduce with id r uses entry (r mod nSpec)
+  private long[] reduceOutputBytes = new long[0];
+  private long[] reduceOutputRecords = new long[0];
+
+  GridmixSplit() {
+    super();
+  }
+
+  public GridmixSplit(CombineFileSplit cfsplit, int maps, int id,
+      long inputBytes, long inputRecords, long outputBytes,
+      long outputRecords, double[] reduceBytes, double[] reduceRecords,
+      long[] reduceOutputBytes, long[] reduceOutputRecords)
+      throws IOException {
+    super(cfsplit);
+    this.id = id;
+    this.maps = maps;
+    reduces = reduceBytes.length;
+    this.inputRecords = inputRecords;
+    this.outputBytes = outputBytes;
+    this.outputRecords = outputRecords;
+    this.reduceBytes = reduceBytes;
+    this.reduceRecords = reduceRecords;
+    nSpec = reduceOutputBytes.length;
+    this.reduceOutputBytes = reduceOutputBytes;
+    this.reduceOutputRecords = reduceOutputRecords;
+  }
+  public int getId() {
+    return id;
+  }
+  public int getMapCount() {
+    return maps;
+  }
+  public long getInputRecords() {
+    return inputRecords;
+  }
+  public long[] getOutputBytes() {
+    if (0 == reduces) {
+      return new long[] { outputBytes };
+    }
+    final long[] ret = new long[reduces];
+    for (int i = 0; i < reduces; ++i) {
+      ret[i] = Math.round(outputBytes * reduceBytes[i]);
+    }
+    return ret;
+  }
+  public long[] getOutputRecords() {
+    if (0 == reduces) {
+      return new long[] { outputRecords };
+    }
+    final long[] ret = new long[reduces];
+    for (int i = 0; i < reduces; ++i) {
+      ret[i] = Math.round(outputRecords * reduceRecords[i]);
+    }
+    return ret;
+  }
+  public long getReduceBytes(int i) {
+    return reduceOutputBytes[i];
+  }
+  public long getReduceRecords(int i) {
+    return reduceOutputRecords[i];
+  }
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    WritableUtils.writeVInt(out, id);
+    WritableUtils.writeVInt(out, maps);
+    WritableUtils.writeVLong(out, inputRecords);
+    WritableUtils.writeVLong(out, outputBytes);
+    WritableUtils.writeVLong(out, outputRecords);
+    WritableUtils.writeVLong(out, maxMemory);
+    WritableUtils.writeVInt(out, reduces);
+    for (int i = 0; i < reduces; ++i) {
+      out.writeDouble(reduceBytes[i]);
+      out.writeDouble(reduceRecords[i]);
+    }
+    WritableUtils.writeVInt(out, nSpec);
+    for (int i = 0; i < nSpec; ++i) {
+      WritableUtils.writeVLong(out, reduceOutputBytes[i]);
+      WritableUtils.writeVLong(out, reduceOutputRecords[i]);
+    }
+  }
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    id = WritableUtils.readVInt(in);
+    maps = WritableUtils.readVInt(in);
+    inputRecords = WritableUtils.readVLong(in);
+    outputBytes = WritableUtils.readVLong(in);
+    outputRecords = WritableUtils.readVLong(in);
+    maxMemory = WritableUtils.readVLong(in);
+    reduces = WritableUtils.readVInt(in);
+    if (reduceBytes.length < reduces) {
+      reduceBytes = new double[reduces];
+      reduceRecords = new double[reduces];
+    }
+    for (int i = 0; i < reduces; ++i) {
+      reduceBytes[i] = in.readDouble();
+      reduceRecords[i] = in.readDouble();
+    }
+    nSpec = WritableUtils.readVInt(in);
+    if (reduceOutputBytes.length < nSpec) {
+      reduceOutputBytes = new long[nSpec];
+      reduceOutputRecords = new long[nSpec];
+    }
+    for (int i = 0; i < nSpec; ++i) {
+      reduceOutputBytes[i] = WritableUtils.readVLong(in);
+      reduceOutputRecords[i] = WritableUtils.readVLong(in);
+    }
+  }
+}
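
When reduces are present, getOutputBytes() and getOutputRecords() scale the
map's totals by the per-reduce proportions. A standalone sketch of that
arithmetic with hypothetical numbers (names below are illustrative, not
from the patch):

    // A map emitting 1,000 bytes split 0.5/0.3/0.2 across three
    // reduces, computed as in getOutputBytes().
    final long outputBytes = 1000L;
    final double[] reduceBytes = { 0.5, 0.3, 0.2 };
    final long[] perReduce = new long[reduceBytes.length];
    for (int i = 0; i < reduceBytes.length; ++i) {
      perReduce[i] = Math.round(outputBytes * reduceBytes[i]); // 500, 300, 200
    }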

Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java?rev=889778&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java Fri Dec 11 19:32:58 2009
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Given a {@link FilePool}, obtain a set of files capable of satisfying
+ * a full set of splits, then iterate over each source to fill the request.
+ */
+class InputStriper {
+  public static final Log LOG = LogFactory.getLog(InputStriper.class);
+  int idx;
+  long currentStart;
+  FileStatus current;
+  final List<FileStatus> files = new ArrayList<FileStatus>();
+
+  /**
+   * @param inputDir Pool from which files are requested.
+   * @param mapBytes Sum of all expected split requests.
+   */
+  InputStriper(FilePool inputDir, long mapBytes)
+      throws IOException {
+    final long inputBytes = inputDir.getInputFiles(mapBytes, files);
+    if (mapBytes > inputBytes) {
+      LOG.warn("Using " + inputBytes + "/" + mapBytes + " bytes");
+    }
+    if (files.isEmpty() && mapBytes > 0) {
+      throw new IOException("Failed to satisfy request for " + mapBytes +
+          " bytes");
+    }
+    current = files.isEmpty() ? null : files.get(0);
+  }
+
+  /**
+   * @param inputDir Pool used to resolve block locations.
+   * @param bytes Target byte count.
+   * @param nLocs Number of block locations per split.
+   * @return A split covering the byte count, with locations ranked by the
+   *         proportion of input bytes each host holds.
+   */
+  CombineFileSplit splitFor(FilePool inputDir, long bytes, int nLocs)
+      throws IOException {
+    final ArrayList<Path> paths = new ArrayList<Path>();
+    final ArrayList<Long> start = new ArrayList<Long>();
+    final ArrayList<Long> length = new ArrayList<Long>();
+    final HashMap<String,Double> sb = new HashMap<String,Double>();
+    do {
+      paths.add(current.getPath());
+      start.add(currentStart);
+      final long fromFile = Math.min(bytes, current.getLen() - currentStart);
+      length.add(fromFile);
+      for (BlockLocation loc :
+          inputDir.locationsFor(current, currentStart, fromFile)) {
+        final double tedium = loc.getLength() / (1.0 * bytes);
+        for (String l : loc.getHosts()) {
+          Double j = sb.get(l);
+          if (null == j) {
+            sb.put(l, tedium);
+          } else {
+            sb.put(l, j.doubleValue() + tedium);
+          }
+        }
+      }
+      currentStart += fromFile;
+      bytes -= fromFile;
+      if (current.getLen() - currentStart == 0) {
+        current = files.get(++idx % files.size());
+        currentStart = 0;
+      }
+    } while (bytes > 0);
+    final ArrayList<Entry<String,Double>> sort =
+      new ArrayList<Entry<String,Double>>(sb.entrySet());
+    Collections.sort(sort, hostRank);
+    final String[] hosts = new String[Math.min(nLocs, sort.size())];
+    for (int i = 0; i < nLocs && i < sort.size(); ++i) {
+      hosts[i] = sort.get(i).getKey();
+    }
+    return new CombineFileSplit(paths.toArray(new Path[0]),
+        toLongArray(start), toLongArray(length), hosts);
+  }
+
+  private long[] toLongArray(final ArrayList<Long> sigh) {
+    final long[] ret = new long[sigh.size()];
+    for (int i = 0; i < ret.length; ++i) {
+      ret[i] = sigh.get(i);
+    }
+    return ret;
+  }
+
+  static final Comparator<Entry<String,Double>> hostRank =
+    new Comparator<Entry<String,Double>>() {
+      public int compare(Entry<String,Double> a, Entry<String,Double> b) {
+        final double va = a.getValue();
+        final double vb = b.getValue();
+        return va > vb ? -1 : va < vb ? 1 : 0;
+      }
+    };
+}
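
Because splitFor round-robins over the pooled files, one modest pool can
back an arbitrary number of splits. A minimal sketch of carving
equal-sized splits, not part of this patch; an initialized FilePool and
same-package access are assumed:

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

    static CombineFileSplit[] carve(FilePool pool, int nMaps, long bytesPerMap)
        throws IOException {
      final InputStriper striper = new InputStriper(pool, nMaps * bytesPerMap);
      final CombineFileSplit[] splits = new CombineFileSplit[nMaps];
      for (int i = 0; i < nMaps; ++i) {
        // keep up to 3 hosts, ranked by their share of the chunk's bytes
        splits[i] = striper.splitFor(pool, bytesPerMap, 3);
      }
      return splits;
    }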

Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/IntermediateRecordFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/IntermediateRecordFactory.java?rev=889778&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/IntermediateRecordFactory.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/IntermediateRecordFactory.java Fri Dec 11 19:32:58 2009
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Factory passing reduce specification as its last record.
+ */
+class IntermediateRecordFactory extends RecordFactory {
+
+  private final GridmixKey.Spec spec;
+  private final RecordFactory factory;
+  private final int partition;
+  private final long targetRecords;
+  private boolean done = false;
+  private long accRecords = 0L;
+
+  /**
+   * @param targetBytes Expected byte count.
+   * @param targetRecords Expected record count; will emit spec records after
+   *                      this boundary is passed.
+   * @param partition Reduce to which records are emitted.
+   * @param spec Specification to emit.
+   * @param conf Configuration forwarded to the wrapped AvgRecordFactory.
+   */
+  public IntermediateRecordFactory(long targetBytes, long targetRecords,
+      int partition, GridmixKey.Spec spec, Configuration conf) {
+    this(new AvgRecordFactory(targetBytes, targetRecords, conf), partition,
+        targetRecords, spec, conf);
+  }
+
+  /**
+   * @param factory Factory from which byte/record counts are obtained.
+   * @param partition Reduce to which records are emitted.
+   * @param targetRecords Expected record count; will emit spec records after
+   *                      this boundary is passed.
+   * @param spec Specification to emit.
+   * @param conf Unused.
+   */
+  public IntermediateRecordFactory(RecordFactory factory, int partition,
+      long targetRecords, GridmixKey.Spec spec, Configuration conf) {
+    this.spec = spec;
+    this.factory = factory;
+    this.partition = partition;
+    this.targetRecords = targetRecords;
+  }
+
+  @Override
+  public boolean next(GridmixKey key, GridmixRecord val) throws IOException {
+    assert key != null;
+    final boolean rslt = factory.next(key, val);
+    ++accRecords;
+    if (rslt) {
+      if (accRecords < targetRecords) {
+        key.setType(GridmixKey.DATA);
+      } else {
+        final int orig = key.getSize();
+        key.setType(GridmixKey.REDUCE_SPEC);
+        spec.rec_in = accRecords;
+        key.setSpec(spec);
+        val.setSize(val.getSize() - (key.getSize() - orig));
+        // reset counters
+        accRecords = 0L;
+        spec.bytes_out = 0L;
+        spec.rec_out = 0L;
+        done = true;
+      }
+    } else if (!done) {
+      // ensure spec emitted
+      key.setType(GridmixKey.REDUCE_SPEC);
+      key.setPartition(partition);
+      key.setSize(0);
+      val.setSize(0);
+      spec.rec_in = 0L;
+      key.setSpec(spec);
+      done = true;
+      return true;
+    }
+    key.setPartition(partition);
+    return rslt;
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return factory.getProgress();
+  }
+
+  @Override
+  public void close() throws IOException {
+    factory.close();
+  }
+}
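
A reduce is thus guaranteed to receive its spec, either piggybacked on the
last data record or, if the wrapped factory comes up short, as a zero-size
trailer record. A minimal sketch of draining the factory the way a
synthetic map task would, not part of this patch; the targets are
hypothetical and AvgRecordFactory is added elsewhere in this change:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;

    static void drain() throws IOException {
      final GridmixKey.Spec spec = new GridmixKey.Spec();
      spec.rec_out = 10L;
      spec.bytes_out = 1024L;
      // 4096 target bytes over 100 records, all bound for partition 0
      final RecordFactory f = new IntermediateRecordFactory(
          4096L, 100L, 0, spec, new Configuration());
      final GridmixKey key = new GridmixKey();
      final GridmixRecord val = new GridmixRecord();
      while (f.next(key, val)) {
        // all but the last record carry DATA; the last carries
        // REDUCE_SPEC with the accumulated input-record count
      }
      f.close();
    }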