Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:38:23 UTC

svn commit: r1077079 [2/11] - in /hadoop/common/branches/branch-0.20-security-patches: ./ src/contrib/ src/contrib/gridmix/ src/contrib/gridmix/ivy/ src/contrib/gridmix/src/ src/contrib/gridmix/src/java/ src/contrib/gridmix/src/java/org/ src/contrib/gr...

Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java Fri Mar  4 03:38:20 2011
@@ -0,0 +1,523 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Formatter;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.TaskInfo;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Synthetic job generated from a trace description.
+ */
+class GridmixJob implements Callable<Job>, Delayed {
+
+  public static final String JOBNAME = "GRIDMIX";
+  public static final String ORIGNAME = "gridmix.job.name.original";
+  public static final Log LOG = LogFactory.getLog(GridmixJob.class);
+
+  private static final ThreadLocal<Formatter> nameFormat =
+    new ThreadLocal<Formatter>() {
+      @Override
+      protected Formatter initialValue() {
+        final StringBuilder sb = new StringBuilder(JOBNAME.length() + 5);
+        sb.append(JOBNAME);
+        return new Formatter(sb);
+      }
+    };
+
+  private final int seq;
+  private final Path outdir;
+  protected final Job job;
+  private final JobStory jobdesc;
+  private final long submissionTimeNanos;
+
+  public GridmixJob(Configuration conf, long submissionMillis,
+      JobStory jobdesc, Path outRoot, int seq) throws IOException {
+    ((StringBuilder)nameFormat.get().out()).setLength(JOBNAME.length());
+    job = new Job(conf, nameFormat.get().format("%05d", seq).toString());
+    submissionTimeNanos = TimeUnit.NANOSECONDS.convert(
+        submissionMillis, TimeUnit.MILLISECONDS);
+    this.jobdesc = jobdesc;
+    this.seq = seq;
+    outdir = new Path(outRoot, "" + seq);
+  }
+
+  protected GridmixJob(Configuration conf, long submissionMillis, String name)
+      throws IOException {
+    job = new Job(conf, name);
+    submissionTimeNanos = TimeUnit.NANOSECONDS.convert(
+        submissionMillis, TimeUnit.MILLISECONDS);
+    jobdesc = null;
+    outdir = null;
+    seq = -1;
+  }
+
+  public String toString() {
+    return job.getJobName();
+  }
+
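+  /**
+   * Delay until the simulated submission time, per the {@link Delayed}
+   * contract; the job becomes eligible for submission when this reaches zero.
+   */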
+  public long getDelay(TimeUnit unit) {
+    return unit.convert(submissionTimeNanos - System.nanoTime(),
+        TimeUnit.NANOSECONDS);
+  }
+
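+  // Order by simulated submission time, breaking ties by sequence id so
+  // trace jobs dequeue deterministically.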
+  @Override
+  public int compareTo(Delayed other) {
+    if (this == other) {
+      return 0;
+    }
+    if (other instanceof GridmixJob) {
+      final long otherNanos = ((GridmixJob)other).submissionTimeNanos;
+      if (otherNanos < submissionTimeNanos) {
+        return 1;
+      }
+      if (otherNanos > submissionTimeNanos) {
+        return -1;
+      }
+      return id() - ((GridmixJob)other).id();
+    }
+    final long diff =
+      getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS);
+    return 0 == diff ? 0 : (diff > 0 ? 1 : -1);
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    // not possible unless job is cloned; all jobs should be unique
+    return other instanceof GridmixJob && id() == ((GridmixJob)other).id();
+  }
+
+  @Override
+  public int hashCode() {
+    return id();
+  }
+
+  int id() {
+    return seq;
+  }
+
+  Job getJob() {
+    return job;
+  }
+
+  JobStory getJobDesc() {
+    return jobdesc;
+  }
+
+  public Job call() throws IOException, InterruptedException,
+                           ClassNotFoundException {
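+    // Wire up the synthetic job: splits are pulled from descCache by the
+    // "gridmix.job.seq" id set below, and map/reduce output is random bytes
+    // shaped by the trace spec.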
+    job.setMapperClass(GridmixMapper.class);
+    job.setReducerClass(GridmixReducer.class);
+    job.setNumReduceTasks(jobdesc.getNumberReduces());
+    job.setMapOutputKeyClass(GridmixKey.class);
+    job.setMapOutputValueClass(GridmixRecord.class);
+    job.setSortComparatorClass(GridmixKey.Comparator.class);
+    job.setGroupingComparatorClass(SpecGroupingComparator.class);
+    job.setInputFormatClass(GridmixInputFormat.class);
+    job.setOutputFormatClass(RawBytesOutputFormat.class);
+    job.setPartitionerClass(DraftPartitioner.class);
+    job.setJarByClass(GridmixJob.class);
+    job.getConfiguration().setInt("gridmix.job.seq", seq);
+    job.getConfiguration().set(ORIGNAME, null == jobdesc.getJobID()
+        ? "<unknown>" : jobdesc.getJobID().toString());
+    job.getConfiguration().setBoolean("mapred.used.genericoptionsparser", true);
+    FileInputFormat.addInputPath(job, new Path("ignored"));
+    FileOutputFormat.setOutputPath(job, outdir);
+    job.submit();
+    return job;
+  }
+
+  public static class DraftPartitioner<V> extends Partitioner<GridmixKey,V> {
+    public int getPartition(GridmixKey key, V value, int numReduceTasks) {
+      return key.getPartition();
+    }
+  }
+
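+  /**
+   * Grouping comparator that collapses all REDUCE_SPEC keys into a single
+   * group, letting the reducer consume every spec record in one pass during
+   * setup; DATA keys are compared in full.
+   */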
+  public static class SpecGroupingComparator
+      implements RawComparator<GridmixKey> {
+    private final DataInputBuffer di = new DataInputBuffer();
+    private final byte[] reset = di.getData();
+    @Override
+    public int compare(GridmixKey g1, GridmixKey g2) {
+      final byte t1 = g1.getType();
+      final byte t2 = g2.getType();
+      if (t1 == GridmixKey.REDUCE_SPEC ||
+          t2 == GridmixKey.REDUCE_SPEC) {
+        return t1 - t2;
+      }
+      assert t1 == GridmixKey.DATA;
+      assert t2 == GridmixKey.DATA;
+      return g1.compareTo(g2);
+    }
+    @Override
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      try {
+        final int ret;
+        di.reset(b1, s1, l1);
+        final int x1 = WritableUtils.readVInt(di);
+        di.reset(b2, s2, l2);
+        final int x2 = WritableUtils.readVInt(di);
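+        // x1/x2 are the full serialized record lengths (vint + payload), so
+        // the type byte of each key immediately follows at s + x.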
+        final int t1 = b1[s1 + x1];
+        final int t2 = b2[s2 + x2];
+        if (t1 == GridmixKey.REDUCE_SPEC ||
+            t2 == GridmixKey.REDUCE_SPEC) {
+          ret = t1 - t2;
+        } else {
+          assert t1 == GridmixKey.DATA;
+          assert t2 == GridmixKey.DATA;
+          ret =
+            WritableComparator.compareBytes(b1, s1, x1, b2, s2, x2);
+        }
+        di.reset(reset, 0, 0);
+        return ret;
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  public static class GridmixMapper
+      extends Mapper<NullWritable,GridmixRecord,GridmixKey,GridmixRecord> {
+
+    private double acc;
+    private double ratio;
+    private final ArrayList<RecordFactory> reduces =
+      new ArrayList<RecordFactory>();
+    private final Random r = new Random();
+
+    private final GridmixKey key = new GridmixKey();
+    private final GridmixRecord val = new GridmixRecord();
+
+    @Override
+    protected void setup(Context ctxt)
+        throws IOException, InterruptedException {
+      final Configuration conf = ctxt.getConfiguration();
+      final GridmixSplit split = (GridmixSplit) ctxt.getInputSplit();
+      final int maps = split.getMapCount();
+      final long[] reduceBytes = split.getOutputBytes();
+      final long[] reduceRecords = split.getOutputRecords();
+
+      long totalRecords = 0L;
+      final int nReduces = ctxt.getNumReduceTasks();
+      if (nReduces > 0) {
+        int idx = 0;
+        int id = split.getId();
+        for (int i = 0; i < nReduces; ++i) {
+          final GridmixKey.Spec spec = new GridmixKey.Spec();
+          if (i == id) {
+            spec.bytes_out = split.getReduceBytes(idx);
+            spec.rec_out = split.getReduceRecords(idx);
+            ++idx;
+            id += maps;
+          }
+          reduces.add(new IntermediateRecordFactory(
+              new AvgRecordFactory(reduceBytes[i], reduceRecords[i], conf),
+              i, reduceRecords[i], spec, conf));
+          totalRecords += reduceRecords[i];
+        }
+      } else {
+        reduces.add(new AvgRecordFactory(reduceBytes[0], reduceRecords[0],
+              conf));
+        totalRecords = reduceRecords[0];
+      }
+      final long splitRecords = split.getInputRecords();
+      final long inputRecords = splitRecords <= 0 && split.getLength() >= 0
+        ? Math.max(1,
+          split.getLength() / conf.getInt("gridmix.missing.rec.size", 64*1024))
+        : splitRecords;
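+      // Accumulate ratio per input record in map() to emit ~totalRecords
+      // outputs over ~inputRecords inputs.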
+      ratio = totalRecords / (1.0 * inputRecords);
+      acc = 0.0;
+    }
+
+    @Override
+    public void map(NullWritable ignored, GridmixRecord rec,
+        Context context) throws IOException, InterruptedException {
+      acc += ratio;
+      while (acc >= 1.0 && !reduces.isEmpty()) {
+        key.setSeed(r.nextLong());
+        val.setSeed(r.nextLong());
+        final int idx = r.nextInt(reduces.size());
+        final RecordFactory f = reduces.get(idx);
+        if (!f.next(key, val)) {
+          reduces.remove(idx);
+          continue;
+        }
+        context.write(key, val);
+        acc -= 1.0;
+      }
+    }
+
+    @Override
+    public void cleanup(Context context)
+        throws IOException, InterruptedException {
+      for (RecordFactory factory : reduces) {
+        key.setSeed(r.nextLong());
+        while (factory.next(key, val)) {
+          context.write(key, val);
+          key.setSeed(r.nextLong());
+        }
+      }
+    }
+  }
+
+  public static class GridmixReducer
+      extends Reducer<GridmixKey,GridmixRecord,NullWritable,GridmixRecord> {
+
+    private final Random r = new Random();
+    private final GridmixRecord val = new GridmixRecord();
+
+    private double acc;
+    private double ratio;
+    private RecordFactory factory;
+
+    @Override
+    protected void setup(Context context)
+        throws IOException, InterruptedException {
+      if (!context.nextKey() ||
+           context.getCurrentKey().getType() != GridmixKey.REDUCE_SPEC) {
+        throw new IOException("Missing reduce spec");
+      }
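+      // The grouping comparator delivers every REDUCE_SPEC key as one group;
+      // sum the per-map specs to size this reducer's output.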
+      long outBytes = 0L;
+      long outRecords = 0L;
+      long inRecords = 0L;
+      for (GridmixRecord ignored : context.getValues()) {
+        final GridmixKey spec = context.getCurrentKey();
+        inRecords += spec.getReduceInputRecords();
+        outBytes += spec.getReduceOutputBytes();
+        outRecords += spec.getReduceOutputRecords();
+      }
+      if (0 == outRecords && inRecords > 0) {
+        LOG.info("Spec output bytes w/o records. Using input record count");
+        outRecords = inRecords;
+      }
+      factory =
+        new AvgRecordFactory(outBytes, outRecords, context.getConfiguration());
+      ratio = outRecords / (1.0 * inRecords);
+      acc = 0.0;
+    }
+    @Override
+    protected void reduce(GridmixKey key, Iterable<GridmixRecord> values,
+        Context context) throws IOException, InterruptedException {
+      for (GridmixRecord ignored : values) {
+        acc += ratio;
+        while (acc >= 1.0 && factory.next(null, val)) {
+          context.write(NullWritable.get(), val);
+          acc -= 1.0;
+        }
+      }
+    }
+    @Override
+    protected void cleanup(Context context)
+        throws IOException, InterruptedException {
+      val.setSeed(r.nextLong());
+      while (factory.next(null, val)) {
+        context.write(NullWritable.get(), val);
+        val.setSeed(r.nextLong());
+      }
+    }
+  }
+
+  static class GridmixRecordReader
+      extends RecordReader<NullWritable,GridmixRecord> {
+
+    private RecordFactory factory;
+    private final Random r = new Random();
+    private final GridmixRecord val = new GridmixRecord();
+
+    public GridmixRecordReader() { }
+
+    @Override
+    public void initialize(InputSplit genericSplit, TaskAttemptContext ctxt)
+            throws IOException, InterruptedException {
+      final GridmixSplit split = (GridmixSplit)genericSplit;
+      final Configuration conf = ctxt.getConfiguration();
+      factory = new ReadRecordFactory(split.getLength(),
+          split.getInputRecords(), new FileQueue(split, conf), conf);
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException {
+      val.setSeed(r.nextLong());
+      return factory.next(null, val);
+    }
+    @Override
+    public float getProgress() throws IOException {
+      return factory.getProgress();
+    }
+    @Override
+    public NullWritable getCurrentKey() {
+      return NullWritable.get();
+    }
+    @Override
+    public GridmixRecord getCurrentValue() {
+      return val;
+    }
+    @Override
+    public void close() throws IOException {
+      factory.close();
+    }
+  }
+
+  static class GridmixInputFormat
+      extends InputFormat<NullWritable,GridmixRecord> {
+
+    @Override
+    public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
+      return pullDescription(jobCtxt.getConfiguration().getInt(
+            "gridmix.job.seq", -1));
+    }
+    @Override
+    public RecordReader<NullWritable,GridmixRecord> createRecordReader(
+        InputSplit split, final TaskAttemptContext taskContext)
+        throws IOException {
+      return new GridmixRecordReader();
+    }
+  }
+
+  static class RawBytesOutputFormat<K>
+      extends FileOutputFormat<K,GridmixRecord> {
+
+    @Override
+    public RecordWriter<K,GridmixRecord> getRecordWriter(
+        TaskAttemptContext job) throws IOException {
+
+      Path file = getDefaultWorkFile(job, "");
+      FileSystem fs = file.getFileSystem(job.getConfiguration());
+      final FSDataOutputStream fileOut = fs.create(file, false);
+      return new RecordWriter<K,GridmixRecord>() {
+        @Override
+        public void write(K ignored, GridmixRecord value)
+            throws IOException {
+          value.writeRandom(fileOut, value.getSize());
+        }
+        @Override
+        public void close(TaskAttemptContext ctxt) throws IOException {
+          fileOut.close();
+        }
+      };
+    }
+  }
+
+  // TODO replace with ThreadLocal submitter?
+  private static final ConcurrentHashMap<Integer,List<InputSplit>> descCache =
+    new ConcurrentHashMap<Integer,List<InputSplit>>();
+
+  static void pushDescription(int seq, List<InputSplit> splits) {
+    if (null != descCache.putIfAbsent(seq, splits)) {
+      throw new IllegalArgumentException("Description exists for id " + seq);
+    }
+  }
+
+  static List<InputSplit> pullDescription(int seq) {
+    return descCache.remove(seq);
+  }
+
+  // not necessary if the cache moves to a ThreadLocal submitter (see TODO)
+  static void clearAll() {
+    descCache.clear();
+  }
+
+  void buildSplits(FilePool inputDir) throws IOException {
+    long mapInputBytesTotal = 0L;
+    long mapOutputBytesTotal = 0L;
+    long mapOutputRecordsTotal = 0L;
+    final JobStory jobdesc = getJobDesc();
+    if (null == jobdesc) {
+      return;
+    }
+    final int maps = jobdesc.getNumberMaps();
+    final int reds = jobdesc.getNumberReduces();
+    for (int i = 0; i < maps; ++i) {
+      final TaskInfo info = jobdesc.getTaskInfo(TaskType.MAP, i);
+      mapInputBytesTotal += info.getInputBytes();
+      mapOutputBytesTotal += info.getOutputBytes();
+      mapOutputRecordsTotal += info.getOutputRecords();
+    }
+    final double[] reduceRecordRatio = new double[reds];
+    final double[] reduceByteRatio = new double[reds];
+    for (int i = 0; i < reds; ++i) {
+      final TaskInfo info = jobdesc.getTaskInfo(TaskType.REDUCE, i);
+      reduceByteRatio[i] = info.getInputBytes() / (1.0 * mapOutputBytesTotal);
+      reduceRecordRatio[i] =
+        info.getInputRecords() / (1.0 * mapOutputRecordsTotal);
+    }
+    final InputStriper striper = new InputStriper(inputDir, mapInputBytesTotal);
+    final List<InputSplit> splits = new ArrayList<InputSplit>();
+    for (int i = 0; i < maps; ++i) {
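+      // Map i carries the output specs for reduces i, i + maps, i + 2*maps...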
+      final int nSpec = reds / maps + ((reds % maps) > i ? 1 : 0);
+      final long[] specBytes = new long[nSpec];
+      final long[] specRecords = new long[nSpec];
+      for (int j = 0; j < nSpec; ++j) {
+        final TaskInfo info =
+          jobdesc.getTaskInfo(TaskType.REDUCE, i + j * maps);
+        specBytes[j] = info.getOutputBytes();
+        specRecords[j] = info.getOutputRecords();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("SPEC(%d) %d -> %d %d %d", id(), i,
+              i + j * maps, info.getOutputRecords(), info.getOutputBytes()));
+        }
+      }
+      final TaskInfo info = jobdesc.getTaskInfo(TaskType.MAP, i);
+      splits.add(new GridmixSplit(striper.splitFor(inputDir,
+              info.getInputBytes(), 3), maps, i,
+            info.getInputBytes(), info.getInputRecords(),
+            info.getOutputBytes(), info.getOutputRecords(),
+            reduceByteRatio, reduceRecordRatio, specBytes, specRecords));
+    }
+    pushDescription(id(), splits);
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java Fri Mar  4 03:38:20 2011
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.WritableComparator;
+
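+/**
+ * Key for synthetic records: either opaque DATA or a REDUCE_SPEC carrying
+ * the expected input/output counts for a reduce, used to size its work.
+ */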
+class GridmixKey extends GridmixRecord {
+  static final byte REDUCE_SPEC = 0;
+  static final byte DATA = 1;
+
+  static final int META_BYTES = 1;
+
+  private byte type;
+  private int partition; // NOT serialized
+  private Spec spec = new Spec();
+
+  GridmixKey() {
+    this(DATA, 1, 0L);
+  }
+  GridmixKey(byte type, int size, long seed) {
+    super(size, seed);
+    this.type = type;
+    // setting the type may change the proportion of random payload bytes
+    setSize(size);
+  }
+
+  @Override
+  public int getSize() {
+    switch (type) {
+      case REDUCE_SPEC:
+        return super.getSize() + spec.getSize() + META_BYTES;
+      case DATA:
+        return super.getSize() + META_BYTES;
+      default:
+        throw new IllegalStateException("Invalid type: " + type);
+    }
+  }
+
+  @Override
+  public void setSize(int size) {
+    switch (type) {
+      case REDUCE_SPEC:
+        super.setSize(size - (META_BYTES + spec.getSize()));
+        break;
+      case DATA:
+        super.setSize(size - META_BYTES);
+        break;
+      default:
+        throw new IllegalStateException("Invalid type: " + type);
+    }
+  }
+
+  /**
+   * Partition is not serialized.
+   */
+  public int getPartition() {
+    return partition;
+  }
+  public void setPartition(int partition) {
+    this.partition = partition;
+  }
+
+  public long getReduceInputRecords() {
+    assert REDUCE_SPEC == getType();
+    return spec.rec_in;
+  }
+  public void setReduceInputRecords(long rec_in) {
+    assert REDUCE_SPEC == getType();
+    final int origSize = getSize();
+    spec.rec_in = rec_in;
+    setSize(origSize);
+  }
+
+  public long getReduceOutputRecords() {
+    assert REDUCE_SPEC == getType();
+    return spec.rec_out;
+  }
+  public void setReduceOutputRecords(long rec_out) {
+    assert REDUCE_SPEC == getType();
+    final int origSize = getSize();
+    spec.rec_out = rec_out;
+    setSize(origSize);
+  }
+
+  public long getReduceOutputBytes() {
+    assert REDUCE_SPEC == getType();
+    return spec.bytes_out;
+  }
+  public void setReduceOutputBytes(long b_out) {
+    assert REDUCE_SPEC == getType();
+    final int origSize = getSize();
+    spec.bytes_out = b_out;
+    setSize(origSize);
+  }
+
+  public byte getType() {
+    return type;
+  }
+  public void setType(byte type) throws IOException {
+    final int origSize = getSize();
+    switch (type) {
+      case REDUCE_SPEC:
+      case DATA:
+        this.type = type;
+        break;
+      default:
+        throw new IOException("Invalid type: " + type);
+    }
+    setSize(origSize);
+  }
+
+  public void setSpec(Spec spec) {
+    assert REDUCE_SPEC == getType();
+    final int origSize = getSize();
+    this.spec.set(spec);
+    setSize(origSize);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    setType(in.readByte());
+    if (REDUCE_SPEC == getType()) {
+      spec.readFields(in);
+    }
+  }
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    final byte t = getType();
+    out.writeByte(t);
+    if (REDUCE_SPEC == t) {
+      spec.write(out);
+    }
+  }
+  int fixedBytes() {
+    return super.fixedBytes() +
+      (REDUCE_SPEC == getType() ? spec.getSize() : 0) + META_BYTES;
+  }
+  @Override
+  public int compareTo(GridmixRecord other) {
+    final GridmixKey o = (GridmixKey) other;
+    final byte t1 = getType();
+    final byte t2 = o.getType();
+    if (t1 != t2) {
+      return t1 - t2;
+    }
+    return super.compareTo(other);
+  }
+
+  /**
+   * Note that while the spec is not explicitly included, changing the spec
+   * may change its size, which will affect equality.
+   */
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    if (other != null && other.getClass() == getClass()) {
+      final GridmixKey o = ((GridmixKey)other);
+      return getType() == o.getType() && super.equals(o);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return super.hashCode() ^ getType();
+  }
+
+  public static class Spec implements Writable {
+    long rec_in;
+    long rec_out;
+    long bytes_out;
+    public Spec() { }
+
+    public void set(Spec other) {
+      rec_in = other.rec_in;
+      bytes_out = other.bytes_out;
+      rec_out = other.rec_out;
+    }
+
+    public int getSize() {
+      return WritableUtils.getVIntSize(rec_in) +
+             WritableUtils.getVIntSize(rec_out) +
+             WritableUtils.getVIntSize(bytes_out);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      rec_in = WritableUtils.readVLong(in);
+      rec_out = WritableUtils.readVLong(in);
+      bytes_out = WritableUtils.readVLong(in);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      WritableUtils.writeVLong(out, rec_in);
+      WritableUtils.writeVLong(out, rec_out);
+      WritableUtils.writeVLong(out, bytes_out);
+    }
+  }
+
+  public static class Comparator extends GridmixRecord.Comparator {
+
+    private final DataInputBuffer di = new DataInputBuffer();
+    private final byte[] reset = di.getData();
+
+    public Comparator() {
+      super(GridmixKey.class);
+    }
+
+    @Override
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      try {
+        di.reset(b1, s1, l1);
+        final int x1 = WritableUtils.readVInt(di);
+        di.reset(b2, s2, l2);
+        final int x2 = WritableUtils.readVInt(di);
+        final int ret = (b1[s1 + x1] != b2[s2 + x2])
+          ? b1[s1 + x1] - b2[s2 + x2]
+          : super.compare(b1, s1, x1, b2, s2, x2);
+        di.reset(reset, 0, 0);
+        return ret;
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    static {
+      WritableComparator.define(GridmixKey.class, new Comparator());
+    }
+  }
+}
+

Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java Fri Mar  4 03:38:20 2011
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+
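+/**
+ * Record whose payload is a pseudo-random byte stream fully determined by a
+ * seed and a size, permitting comparison without materializing the data.
+ */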
+class GridmixRecord implements WritableComparable<GridmixRecord> {
+
+  private static final int FIXED_BYTES = 1;
+  private int size = -1;
+  private long seed;
+  private final DataInputBuffer dib =
+    new DataInputBuffer();
+  private final DataOutputBuffer dob =
+    new DataOutputBuffer(Long.SIZE / Byte.SIZE);
+  private byte[] literal = dob.getData();
+
+  GridmixRecord() {
+    this(1, 0L);
+  }
+
+  GridmixRecord(int size, long seed) {
+    this.seed = seed;
+    setSizeInternal(size);
+  }
+
+  public int getSize() {
+    return size;
+  }
+
+  public void setSize(int size) {
+    setSizeInternal(size);
+  }
+
+  private void setSizeInternal(int size) {
+    this.size = Math.max(1, size);
+    try {
+      seed = maskSeed(seed, this.size);
+      dob.reset();
+      dob.writeLong(seed);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public final void setSeed(long seed) {
+    this.seed = seed;
+  }
+
+  /** Marsaglia, 2003: 64-bit xorshift generator, shift triple (13, 7, 17). */
+  long nextRand(long x) {
+    x ^= (x << 13);
+    x ^= (x >>> 7);
+    return (x ^= (x << 17));
+  }
+
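+  /**
+   * Write {@code size} bytes of pseudo-random data, reproducible from the
+   * seed alone: the seed itself, then longs chained through nextRand.
+   */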
+  public void writeRandom(DataOutput out, final int size) throws IOException {
+    long tmp = seed;
+    out.writeLong(tmp);
+    int i = size - (Long.SIZE / Byte.SIZE);
+    while (i > Long.SIZE / Byte.SIZE - 1) {
+      tmp = nextRand(tmp);
+      out.writeLong(tmp);
+      i -= Long.SIZE / Byte.SIZE;
+    }
+    for (tmp = nextRand(tmp); i > 0; --i) {
+      out.writeByte((int)(tmp & 0xFF));
+      tmp >>>= Byte.SIZE;
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    size = WritableUtils.readVInt(in);
+    int payload = size - WritableUtils.getVIntSize(size);
+    if (payload > Long.SIZE / Byte.SIZE) {
+      seed = in.readLong();
+      payload -= Long.SIZE / Byte.SIZE;
+    } else {
+      Arrays.fill(literal, (byte)0);
+      in.readFully(literal, 0, payload);
+      dib.reset(literal, 0, literal.length);
+      seed = dib.readLong();
+      payload = 0;
+    }
+    final int vBytes = in.skipBytes(payload);
+    if (vBytes != payload) {
+      throw new EOFException("Expected " + payload + ", read " + vBytes);
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // data bytes including vint encoding
+    WritableUtils.writeVInt(out, size);
+    final int payload = size - WritableUtils.getVIntSize(size);
+    if (payload > Long.SIZE / Byte.SIZE) {
+      writeRandom(out, payload);
+    } else if (payload > 0) {
+      out.write(literal, 0, payload);
+    }
+  }
+
+  @Override
+  public int compareTo(GridmixRecord other) {
+    return compareSeed(other.seed,
+        Math.max(0, other.getSize() - other.fixedBytes()));
+  }
+
+  int fixedBytes() {
+    // min vint size
+    return FIXED_BYTES;
+  }
+
+  private static long maskSeed(long sd, int sz) {
+    // Don't use fixedBytes here; subclasses will set intended random len
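+    // Records shorter than seed + meta emit only the leading bytes of the
+    // seed, so zero the trailing bytes that never reach the wire to keep
+    // equals/compareTo consistent with the serialized form.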
+    if (sz <= FIXED_BYTES) {
+      sd = 0L;
+    } else if (sz < Long.SIZE / Byte.SIZE + FIXED_BYTES) {
+      final int tmp = sz - FIXED_BYTES;
+      final long mask = (1L << (Byte.SIZE * tmp)) - 1;
+      sd &= mask << (Byte.SIZE * (Long.SIZE / Byte.SIZE - tmp));
+    }
+    return sd;
+  }
+
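+  /**
+   * Compare the byte streams the two seeds would generate without
+   * materializing them; a record that is a prefix of the other sorts first.
+   */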
+  int compareSeed(long jSeed, int jSize) {
+    final int iSize = Math.max(0, getSize() - fixedBytes());
+    final int seedLen = Math.min(iSize, jSize) + FIXED_BYTES;
+    jSeed = maskSeed(jSeed, seedLen);
+    long iSeed = maskSeed(seed, seedLen);
+    final int cmplen = Math.min(iSize, jSize);
+    for (int i = 0; i < cmplen; i += Byte.SIZE) {
+      final int k = cmplen - i;
+      for (long j = Long.SIZE - Byte.SIZE;
+          j >= Math.max(0, Long.SIZE / Byte.SIZE - k) * Byte.SIZE;
+          j -= Byte.SIZE) {
+        final int xi = (int)((iSeed >>> j) & 0xFFL);
+        final int xj = (int)((jSeed >>> j) & 0xFFL);
+        if (xi != xj) {
+          return xi - xj;
+        }
+      }
+      iSeed = nextRand(iSeed);
+      jSeed = nextRand(jSeed);
+    }
+    return iSize - jSize;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    if (other != null && other.getClass() == getClass()) {
+      final GridmixRecord o = ((GridmixRecord)other);
+      return getSize() == o.getSize() && seed == o.seed;
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return (int)(seed * getSize());
+  }
+
+  public static class Comparator extends WritableComparator {
+
+    public Comparator() {
+      super(GridmixRecord.class);
+    }
+
+    public Comparator(Class<? extends WritableComparable<?>> sub) {
+      super(sub);
+    }
+
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      int n1 = WritableUtils.decodeVIntSize(b1[s1]);
+      int n2 = WritableUtils.decodeVIntSize(b2[s2]);
+      n1 -= WritableUtils.getVIntSize(n1);
+      n2 -= WritableUtils.getVIntSize(n2);
+      return compareBytes(b1, s1+n1, l1-n1, b2, s2+n2, l2-n2);
+    }
+
+    static {
+      WritableComparator.define(GridmixRecord.class, new Comparator());
+    }
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixSplit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixSplit.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixSplit.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixSplit.java Fri Mar  4 03:38:20 2011
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableUtils;
+
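+/**
+ * Synthetic split carrying the trace's per-task byte/record counts plus the
+ * output specs for the reduces this map must emit.
+ */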
+class GridmixSplit extends CombineFileSplit {
+  private int id;
+  private int nSpec;
+  private int maps;
+  private int reduces;
+  private long inputRecords;
+  private long outputBytes;
+  private long outputRecords;
+  private long maxMemory;
+  private double[] reduceBytes = new double[0];
+  private double[] reduceRecords = new double[0];
+
+  // Output specs for reduces whose id is congruent to this split's id, mod maps
+  private long[] reduceOutputBytes = new long[0];
+  private long[] reduceOutputRecords = new long[0];
+
+  GridmixSplit() {
+    super();
+  }
+
+  public GridmixSplit(CombineFileSplit cfsplit, int maps, int id,
+      long inputBytes, long inputRecords, long outputBytes,
+      long outputRecords, double[] reduceBytes, double[] reduceRecords,
+      long[] reduceOutputBytes, long[] reduceOutputRecords)
+      throws IOException {
+    super(cfsplit);
+    this.id = id;
+    this.maps = maps;
+    reduces = reduceBytes.length;
+    this.inputRecords = inputRecords;
+    this.outputBytes = outputBytes;
+    this.outputRecords = outputRecords;
+    this.reduceBytes = reduceBytes;
+    this.reduceRecords = reduceRecords;
+    nSpec = reduceOutputBytes.length;
+    this.reduceOutputBytes = reduceOutputBytes;
+    this.reduceOutputRecords = reduceOutputRecords;
+  }
+  public int getId() {
+    return id;
+  }
+  public int getMapCount() {
+    return maps;
+  }
+  public long getInputRecords() {
+    return inputRecords;
+  }
+  public long[] getOutputBytes() {
+    if (0 == reduces) {
+      return new long[] { outputBytes };
+    }
+    final long[] ret = new long[reduces];
+    for (int i = 0; i < reduces; ++i) {
+      ret[i] = Math.round(outputBytes * reduceBytes[i]);
+    }
+    return ret;
+  }
+  public long[] getOutputRecords() {
+    if (0 == reduces) {
+      return new long[] { outputRecords };
+    }
+    final long[] ret = new long[reduces];
+    for (int i = 0; i < reduces; ++i) {
+      ret[i] = Math.round(outputRecords * reduceRecords[i]);
+    }
+    return ret;
+  }
+  public long getReduceBytes(int i) {
+    return reduceOutputBytes[i];
+  }
+  public long getReduceRecords(int i) {
+    return reduceOutputRecords[i];
+  }
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    WritableUtils.writeVInt(out, id);
+    WritableUtils.writeVInt(out, maps);
+    WritableUtils.writeVLong(out, inputRecords);
+    WritableUtils.writeVLong(out, outputBytes);
+    WritableUtils.writeVLong(out, outputRecords);
+    WritableUtils.writeVLong(out, maxMemory);
+    WritableUtils.writeVInt(out, reduces);
+    for (int i = 0; i < reduces; ++i) {
+      out.writeDouble(reduceBytes[i]);
+      out.writeDouble(reduceRecords[i]);
+    }
+    WritableUtils.writeVInt(out, nSpec);
+    for (int i = 0; i < nSpec; ++i) {
+      WritableUtils.writeVLong(out, reduceOutputBytes[i]);
+      WritableUtils.writeVLong(out, reduceOutputRecords[i]);
+    }
+  }
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    id = WritableUtils.readVInt(in);
+    maps = WritableUtils.readVInt(in);
+    inputRecords = WritableUtils.readVLong(in);
+    outputBytes = WritableUtils.readVLong(in);
+    outputRecords = WritableUtils.readVLong(in);
+    maxMemory = WritableUtils.readVLong(in);
+    reduces = WritableUtils.readVInt(in);
+    if (reduceBytes.length < reduces) {
+      reduceBytes = new double[reduces];
+      reduceRecords = new double[reduces];
+    }
+    for (int i = 0; i < reduces; ++i) {
+      reduceBytes[i] = in.readDouble();
+      reduceRecords[i] = in.readDouble();
+    }
+    nSpec = WritableUtils.readVInt(in);
+    if (reduceOutputBytes.length < nSpec) {
+      reduceOutputBytes = new long[nSpec];
+      reduceOutputRecords = new long[nSpec];
+    }
+    for (int i = 0; i < nSpec; ++i) {
+      reduceOutputBytes[i] = WritableUtils.readVLong(in);
+      reduceOutputRecords[i] = WritableUtils.readVLong(in);
+    }
+  }
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java Fri Mar  4 03:38:20 2011
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Given a {@link FilePool}, obtain a set of files capable of satisfying
+ * a full set of splits, then iterate over each source to fill the request.
+ */
+class InputStriper {
+  public static final Log LOG = LogFactory.getLog(InputStriper.class);
+  int idx;
+  long currentStart;
+  FileStatus current;
+  final List<FileStatus> files = new ArrayList<FileStatus>();
+
+  /**
+   * @param inputDir Pool from which files are requested.
+   * @param mapBytes Sum of all expected split requests.
+   */
+  InputStriper(FilePool inputDir, long mapBytes)
+      throws IOException {
+    final long inputBytes = inputDir.getInputFiles(mapBytes, files);
+    if (mapBytes > inputBytes) {
+      LOG.warn("Using " + inputBytes + "/" + mapBytes + " bytes");
+    }
+    if (files.isEmpty() && mapBytes > 0) {
+      throw new IOException("Failed to satisfy request for " + mapBytes);
+    }
+    current = files.isEmpty() ? null : files.get(0);
+  }
+
+  /**
+   * @param inputDir Pool used to resolve block locations.
+   * @param bytes Target byte count
+   * @param nLocs Number of block locations per split.
+   * @return A set of files satisfying the byte count, with locations weighted
+   *         to the dominating proportion of input bytes.
+   */
+  CombineFileSplit splitFor(FilePool inputDir, long bytes, int nLocs)
+      throws IOException {
+    final ArrayList<Path> paths = new ArrayList<Path>();
+    final ArrayList<Long> start = new ArrayList<Long>();
+    final ArrayList<Long> length = new ArrayList<Long>();
+    final HashMap<String,Double> sb = new HashMap<String,Double>();
+    do {
+      paths.add(current.getPath());
+      start.add(currentStart);
+      final long fromFile = Math.min(bytes, current.getLen() - currentStart);
+      length.add(fromFile);
+      for (BlockLocation loc :
+          inputDir.locationsFor(current, currentStart, fromFile)) {
+        final double tedium = loc.getLength() / (1.0 * bytes);
+        for (String l : loc.getHosts()) {
+          Double j = sb.get(l);
+          if (null == j) {
+            sb.put(l, tedium);
+          } else {
+            sb.put(l, j.doubleValue() + tedium);
+          }
+        }
+      }
+      currentStart += fromFile;
+      bytes -= fromFile;
+      if (current.getLen() - currentStart == 0) {
+        current = files.get(++idx % files.size());
+        currentStart = 0;
+      }
+    } while (bytes > 0);
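+    // Rank hosts by their share of the requested bytes; the top nLocs become
+    // the split's preferred locations.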
+    final ArrayList<Entry<String,Double>> sort =
+      new ArrayList<Entry<String,Double>>(sb.entrySet());
+    Collections.sort(sort, hostRank);
+    final String[] hosts = new String[Math.min(nLocs, sort.size())];
+    for (int i = 0; i < nLocs && i < sort.size(); ++i) {
+      hosts[i] = sort.get(i).getKey();
+    }
+    return new CombineFileSplit(paths.toArray(new Path[0]),
+        toLongArray(start), toLongArray(length), hosts);
+  }
+
+  private long[] toLongArray(final ArrayList<Long> sigh) {
+    final long[] ret = new long[sigh.size()];
+    for (int i = 0; i < ret.length; ++i) {
+      ret[i] = sigh.get(i);
+    }
+    return ret;
+  }
+
+  static final Comparator<Entry<String,Double>> hostRank =
+    new Comparator<Entry<String,Double>>() {
+      public int compare(Entry<String,Double> a, Entry<String,Double> b) {
+        final double va = a.getValue();
+        final double vb = b.getValue();
+        return va > vb ? -1 : va < vb ? 1 : 0;
+      }
+    };
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/IntermediateRecordFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/IntermediateRecordFactory.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/IntermediateRecordFactory.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/IntermediateRecordFactory.java Fri Mar  4 03:38:20 2011
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Factory passing reduce specification as its last record.
+ */
+class IntermediateRecordFactory extends RecordFactory {
+
+  private final GridmixKey.Spec spec;
+  private final RecordFactory factory;
+  private final int partition;
+  private final long targetRecords;
+  private boolean done = false;
+  private long accRecords = 0L;
+
+  /**
+   * @param targetBytes Expected byte count.
+   * @param targetRecords Expected record count; will emit spec records after
+   *                      this boundary is passed.
+   * @param partition Reduce to which records are emitted.
+   * @param spec Specification to emit.
+   * @param conf Unused.
+   */
+  public IntermediateRecordFactory(long targetBytes, long targetRecords,
+      int partition, GridmixKey.Spec spec, Configuration conf) {
+    this(new AvgRecordFactory(targetBytes, targetRecords, conf), partition,
+        targetRecords, spec, conf);
+  }
+
+  /**
+   * @param factory Factory from which byte/record counts are obtained.
+   * @param partition Reduce to which records are emitted.
+   * @param targetRecords Expected record count; will emit spec records after
+   *                      this boundary is passed.
+   * @param spec Specification to emit.
+   * @param conf Unused.
+   */
+  public IntermediateRecordFactory(RecordFactory factory, int partition,
+      long targetRecords, GridmixKey.Spec spec, Configuration conf) {
+    this.spec = spec;
+    this.factory = factory;
+    this.partition = partition;
+    this.targetRecords = targetRecords;
+  }
+
+  @Override
+  public boolean next(GridmixKey key, GridmixRecord val) throws IOException {
+    assert key != null;
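+    // Emit DATA records until targetRecords is reached, then piggyback the
+    // reduce spec on the final record; if the underlying factory runs dry
+    // first, emit the spec on an empty record so it is never lost.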
+    final boolean rslt = factory.next(key, val);
+    ++accRecords;
+    if (rslt) {
+      if (accRecords < targetRecords) {
+        key.setType(GridmixKey.DATA);
+      } else {
+        final int orig = key.getSize();
+        key.setType(GridmixKey.REDUCE_SPEC);
+        spec.rec_in = accRecords;
+        key.setSpec(spec);
+        val.setSize(val.getSize() - (key.getSize() - orig));
+        // reset counters
+        accRecords = 0L;
+        spec.bytes_out = 0L;
+        spec.rec_out = 0L;
+        done = true;
+      }
+    } else if (!done) {
+      // ensure spec emitted
+      key.setType(GridmixKey.REDUCE_SPEC);
+      key.setPartition(partition);
+      key.setSize(0);
+      val.setSize(0);
+      spec.rec_in = 0L;
+      key.setSpec(spec);
+      done = true;
+      return true;
+    }
+    key.setPartition(partition);
+    return rslt;
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return factory.getProgress();
+  }
+
+  @Override
+  public void close() throws IOException {
+    factory.close();
+  }
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java Fri Mar  4 03:38:20 2011
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.JobStoryProducer;
+import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
+import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants;
+import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
+import org.apache.hadoop.tools.rumen.TaskInfo;
+import org.apache.hadoop.tools.rumen.ZombieJobProducer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+/**
+ * Component reading job traces generated by Rumen. Each job in the trace is
+ * assigned a sequence number and given a submission time relative to the
+ * job that preceded it. Jobs are enqueued in the JobSubmitter provided at
+ * construction.
+ * @see org.apache.hadoop.tools.rumen.HadoopLogsAnalyzer
+ */
+class JobFactory implements Gridmix.Component<Void> {
+
+  public static final Log LOG = LogFactory.getLog(JobFactory.class);
+
+  private final Path scratch;
+  private final float rateFactor;
+  private final Configuration conf;
+  private final ReaderThread rThread;
+  private final AtomicInteger sequence;
+  private final JobSubmitter submitter;
+  private final CountDownLatch startFlag;
+  private volatile IOException error = null;
+  protected final JobStoryProducer jobProducer;
+
+  /**
+   * Creating a new instance does not start the thread.
+   * @param submitter Component to which deserialized jobs are passed
+   * @param jobTrace Stream of job traces with which to construct a
+   *                 {@link org.apache.hadoop.tools.rumen.ZombieJobProducer}
+   * @param scratch Directory into which to write output from simulated jobs
+   * @param conf Config passed to all jobs to be submitted
+   * @param startFlag Latch released from main to start pipeline
+   */
+  public JobFactory(JobSubmitter submitter, InputStream jobTrace,
+      Path scratch, Configuration conf, CountDownLatch startFlag)
+      throws IOException {
+    this(submitter, new ZombieJobProducer(jobTrace, null), scratch, conf,
+        startFlag);
+  }
+
+  /**
+   * Constructor permitting JobStoryProducer to be mocked.
+   * @param submitter Component to which deserialized jobs are passed
+   * @param jobProducer Producer generating JobStory objects.
+   * @param scratch Directory into which to write output from simulated jobs
+   * @param conf Config passed to all jobs to be submitted
+   * @param startFlag Latch released from main to start pipeline
+   */
+  protected JobFactory(JobSubmitter submitter, JobStoryProducer jobProducer,
+      Path scratch, Configuration conf, CountDownLatch startFlag) {
+    sequence = new AtomicInteger(0);
+    this.scratch = scratch;
+    this.rateFactor = conf.getFloat(Gridmix.GRIDMIX_SUB_MUL, 1.0f);
+    this.jobProducer = jobProducer;
+    this.conf = new Configuration(conf);
+    this.submitter = submitter;
+    this.startFlag = startFlag;
+    this.rThread = new ReaderThread();
+  }
+
+  static class MinTaskInfo extends TaskInfo {
+    public MinTaskInfo(TaskInfo info) {
+      super(info.getInputBytes(), info.getInputRecords(),
+            info.getOutputBytes(), info.getOutputRecords(),
+            info.getTaskMemory());
+    }
+    public long getInputBytes() {
+      return Math.max(0, super.getInputBytes());
+    }
+    public int getInputRecords() {
+      return Math.max(0, super.getInputRecords());
+    }
+    public long getOutputBytes() {
+      return Math.max(0, super.getOutputBytes());
+    }
+    public int getOutputRecords() {
+      return Math.max(0, super.getOutputRecords());
+    }
+    public long getTaskMemory() {
+      return Math.max(0, super.getTaskMemory());
+    }
+  }
+
+  static class FilterJobStory implements JobStory {
+
+    protected final JobStory job;
+
+    public FilterJobStory(JobStory job) {
+      this.job = job;
+    }
+    public JobConf getJobConf() { return job.getJobConf(); }
+    public String getName() { return job.getName(); }
+    public JobID getJobID() { return job.getJobID(); }
+    public String getUser() { return job.getUser(); }
+    public long getSubmissionTime() { return job.getSubmissionTime(); }
+    public InputSplit[] getInputSplits() { return job.getInputSplits(); }
+    public int getNumberMaps() { return job.getNumberMaps(); }
+    public int getNumberReduces() { return job.getNumberReduces(); }
+    public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
+      return job.getTaskInfo(taskType, taskNumber);
+    }
+    public TaskAttemptInfo getTaskAttemptInfo(TaskType taskType, int taskNumber,
+        int taskAttemptNumber) {
+      return job.getTaskAttemptInfo(taskType, taskNumber, taskAttemptNumber);
+    }
+    public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(
+        int taskNumber, int taskAttemptNumber, int locality) {
+      return job.getMapTaskAttemptInfoAdjusted(
+          taskNumber, taskAttemptNumber, locality);
+    }
+    public Values getOutcome() {
+      return job.getOutcome();
+    }
+  }
+
+  /**
+   * Worker thread responsible for reading descriptions, assigning sequence
+   * numbers, and normalizing time.
+   */
+  private class ReaderThread extends Thread {
+
+    public ReaderThread() {
+      super("GridmixJobFactory");
+    }
+
+    private JobStory getNextJobFiltered() throws IOException {
+      JobStory job;
+      do {
+        job = jobProducer.getNextJob();
+      } while (job != null
+          && (job.getOutcome() != Pre21JobHistoryConstants.Values.SUCCESS ||
+              job.getSubmissionTime() < 0));
+      return null == job ? null : new FilterJobStory(job) {
+          @Override
+          public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
+            return new MinTaskInfo(this.job.getTaskInfo(taskType, taskNumber));
+          }
+        };
+    }
+
+    @Override
+    public void run() {
+      try {
+        startFlag.await();
+        if (Thread.currentThread().isInterrupted()) {
+          return;
+        }
+        final long initTime = TimeUnit.MILLISECONDS.convert(
+            System.nanoTime(), TimeUnit.NANOSECONDS);
+        LOG.debug("START @ " + initTime);
+        long first = -1;
+        long last = -1;
+        while (!Thread.currentThread().isInterrupted()) {
+          try {
+            final JobStory job = getNextJobFiltered();
+            if (null == job) {
+              return;
+            }
+            if (first < 0) {
+              first = job.getSubmissionTime();
+            }
+            final long current = job.getSubmissionTime();
+            if (current < last) {
+              LOG.warn("Job " + job.getJobID() + " out of order");
+              continue;
+            }
+            last = current;
+            submitter.add(new GridmixJob(conf, initTime +
+                  Math.round(rateFactor * (current - first)),
+                job, scratch, sequence.getAndIncrement()));
+          } catch (IOException e) {
+            JobFactory.this.error = e;
+            return;
+          }
+        }
+      } catch (InterruptedException e) {
+        // exit thread; ignore any jobs remaining in the trace
+        return;
+      } finally {
+        IOUtils.cleanup(null, jobProducer);
+      }
+    }
+  }
+
+  /**
+   * Obtain the error that caused the reader thread to exit unexpectedly,
+   * or null if no error has occurred.
+   */
+  public IOException error() {
+    return error;
+  }
+
+  /**
+   * Add is disabled.
+   * @throws UnsupportedOperationException
+   */
+  public void add(Void ignored) {
+    throw new UnsupportedOperationException(getClass().getName() +
+        " is at the start of the pipeline and accepts no events");
+  }
+
+  /**
+   * Start the reader thread, wait for latch if necessary.
+   */
+  public void start() {
+    rThread.start();
+  }
+
+  /**
+   * Wait for the reader thread to exhaust the job trace.
+   */
+  public void join(long millis) throws InterruptedException {
+    rThread.join(millis);
+  }
+
+  /**
+   * Interrupt the reader thread.
+   */
+  public void shutdown() {
+    rThread.interrupt();
+  }
+
+  /**
+   * Interrupt the reader thread. This requires no special consideration, as
+   * the thread has no pending work queue.
+   */
+  public void abort() {
+    // Currently no special work
+    rThread.interrupt();
+  }
+
+}
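
The submission deadline computed in ReaderThread.run() above anchors the
first trace timestamp to the wall-clock initTime and scales each later
job's offset by rateFactor (GRIDMIX_SUB_MUL). A minimal standalone sketch
of that mapping, with hypothetical class and field names:

    import java.util.concurrent.TimeUnit;

    /** Sketch of the trace-to-wall-clock mapping in ReaderThread.run();
     *  class and field names here are illustrative, not part of the patch. */
    class TraceClock {
      private final long initTime;    // wall-clock anchor, in ms
      private final float rateFactor; // >1 stretches inter-job gaps, <1 compresses
      private long first = -1;        // trace timestamp of the first job seen

      TraceClock(float rateFactor) {
        this.initTime = TimeUnit.MILLISECONDS.convert(
            System.nanoTime(), TimeUnit.NANOSECONDS);
        this.rateFactor = rateFactor;
      }

      /** Map a trace submission time (ms) to a wall-clock deadline (ms). */
      long deadlineFor(long traceSubmissionMs) {
        if (first < 0) {
          first = traceSubmissionMs;  // first job is scheduled at initTime
        }
        return initTime + Math.round(rateFactor * (traceSubmissionMs - first));
      }
    }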

Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java Fri Mar  4 03:38:20 2011
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.nio.channels.ClosedByInterruptException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * Component accepting submitted, running jobs and responsible for
+ * monitoring jobs for success and failure. Once a job is submitted, it is
+ * polled for status until complete. If a polled job has completed, the
+ * monitor records its outcome and immediately checks the next queued job;
+ * otherwise the job is requeued and the monitor sleeps for the poll
+ * interval before checking again.
+ */
+class JobMonitor implements Gridmix.Component<Job> {
+
+  public static final Log LOG = LogFactory.getLog(JobMonitor.class);
+
+  private final Queue<Job> mJobs;
+  private final MonitorThread mThread;
+  private final BlockingQueue<Job> runningJobs;
+  private final long pollDelayMillis;
+  private boolean graceful = false;
+  private boolean shutdown = false;
+
+  /**
+   * Create a JobMonitor with a default polling interval of 5s.
+   */
+  public JobMonitor() {
+    this(5, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Create a JobMonitor that sleeps for the specified duration after
+   * polling a still-running job.
+   * @param pollDelay Delay after polling a running job
+   * @param unit Time unit for pollDelay (rounded to milliseconds)
+   */
+  public JobMonitor(int pollDelay, TimeUnit unit) {
+    mThread = new MonitorThread();
+    runningJobs = new LinkedBlockingQueue<Job>();
+    mJobs = new LinkedList<Job>();
+    this.pollDelayMillis = TimeUnit.MILLISECONDS.convert(pollDelay, unit);
+  }
+
+  /**
+   * Add a job to the polling queue.
+   */
+  public void add(Job job) throws InterruptedException {
+    runningJobs.put(job);
+  }
+
+  /**
+   * Temporary hook for recording job success.
+   */
+  protected void onSuccess(Job job) {
+    LOG.info(job.getJobName() + " (" + job.getJobID() + ")" + " success");
+  }
+
+  /**
+   * Temporary hook for recording job failure.
+   */
+  protected void onFailure(Job job) {
+    LOG.info(job.getJobName() + " (" + job.getJobID() + ")" + " failure");
+  }
+
+  /**
+   * If shutdown before all jobs have completed, any still-running jobs
+   * may be extracted from the component.
+   * If the monitoring thread is still running, a warning is logged and
+   * the returned snapshot may be incomplete.
+   * @return Any jobs submitted and not known to have completed.
+   */
+  List<Job> getRemainingJobs() {
+    if (mThread.isAlive()) {
+      LOG.warn("Internal error: Polling running monitor for jobs");
+    }
+    synchronized (mJobs) {
+      return new ArrayList<Job>(mJobs);
+    }
+  }
+
+  /**
+   * Monitoring thread pulling running jobs from the component and into
+   * a queue to be polled for status.
+   */
+  private class MonitorThread extends Thread {
+
+    public MonitorThread() {
+      super("GridmixJobMonitor");
+    }
+
+    /**
+     * Check a job for success or failure.
+     */
+    public void process(Job job) throws IOException, InterruptedException {
+      if (job.isSuccessful()) {
+        onSuccess(job);
+      } else {
+        onFailure(job);
+      }
+    }
+
+    @Override
+    public void run() {
+      boolean graceful;
+      boolean shutdown;
+      while (true) {
+        try {
+          synchronized (mJobs) {
+            graceful = JobMonitor.this.graceful;
+            shutdown = JobMonitor.this.shutdown;
+            runningJobs.drainTo(mJobs);
+          }
+
+          // Halt conditions: on abort, pull any recently submitted jobs
+          // into the monitored set and exit; on graceful shutdown, exit
+          // only once all monitored jobs have completed.
+          if (shutdown) {
+            if (!graceful) {
+              while (!runningJobs.isEmpty()) {
+                synchronized (mJobs) {
+                  runningJobs.drainTo(mJobs);
+                }
+              }
+              break;
+            } else if (mJobs.isEmpty()) {
+              break;
+            }
+          }
+          while (!mJobs.isEmpty()) {
+            Job job;
+            synchronized (mJobs) {
+              job = mJobs.poll();
+            }
+            try {
+              if (job.isComplete()) {
+                process(job);
+                continue;
+              }
+            } catch (IOException e) {
+              if (e.getCause() instanceof ClosedByInterruptException) {
+                // Job doesn't throw InterruptedException, but RPC socket layer
+                // is blocking and may throw a wrapped Exception if this thread
+                // is interrupted. Since the lower level cleared the flag,
+                // reset it here
+                Thread.currentThread().interrupt();
+              } else {
+                LOG.warn("Lost job " + (null == job.getJobName()
+                     ? "<unknown>" : job.getJobName()), e);
+                continue;
+              }
+            }
+            synchronized (mJobs) {
+              if (!mJobs.offer(job)) {
+                LOG.error("Lost job " + (null == job.getJobName()
+                     ? "<unknown>" : job.getJobName())); // should never
+                                                         // happen
+              }
+            }
+            break;
+          }
+          try {
+            TimeUnit.MILLISECONDS.sleep(pollDelayMillis);
+          } catch (InterruptedException e) {
+            shutdown = true;
+            continue;
+          }
+        } catch (Throwable e) {
+          LOG.warn("Unexpected exception: ", e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Start the internal, monitoring thread.
+   */
+  public void start() {
+    mThread.start();
+  }
+
+  /**
+   * Wait for the monitor to halt, assuming shutdown or abort have been
+   * called. Note that, since submission may be sporadic, this will hang
+   * if no form of shutdown has been requested.
+   */
+  public void join(long millis) throws InterruptedException {
+    mThread.join(millis);
+  }
+
+  /**
+   * Drain all submitted jobs to a queue and stop the monitoring thread.
+   * Upstream submitter is assumed dead.
+   */
+  public void abort() {
+    synchronized (mJobs) {
+      graceful = false;
+      shutdown = true;
+    }
+    mThread.interrupt();
+  }
+
+  /**
+   * When all monitored jobs have completed, stop the monitoring thread.
+   * Upstream submitter is assumed dead.
+   */
+  public void shutdown() {
+    synchronized (mJobs) {
+      graceful = true;
+      shutdown = true;
+    }
+    mThread.interrupt();
+  }
+}
+
+
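
The intended lifecycle of the component above: start the monitoring
thread, feed it submitted jobs, then request one of the two halt modes.
A hypothetical driver sketch (submittedJob is a placeholder for a Job
already submitted to the cluster):

    JobMonitor monitor = new JobMonitor();  // default 5s poll interval
    monitor.start();                        // spins up "GridmixJobMonitor"
    monitor.add(submittedJob);              // enqueue for completion polling
    // ... once the upstream submitter is finished:
    monitor.shutdown();                     // graceful: wait out monitored jobs
    monitor.join(10000L);                   // bounded wait for thread exit
    // after abort(), jobs not known complete can be recovered:
    // List<Job> leftover = monitor.getRemainingJobs();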

Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java Fri Mar  4 03:38:20 2011
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.nio.channels.ClosedByInterruptException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Component accepting deserialized job traces, computing split data, and
+ * submitting to the cluster on deadline. Each job added from an upstream
+ * factory must be submitted to the cluster by the deadline recorded on it.
+ * Once submitted, jobs must be added to a downstream component for
+ * monitoring.
+ */
+class JobSubmitter implements Gridmix.Component<GridmixJob> {
+
+  public static final Log LOG = LogFactory.getLog(JobSubmitter.class);
+
+  final Semaphore sem;
+  private final FilePool inputDir;
+  private final JobMonitor monitor;
+  private final ExecutorService sched;
+  private volatile boolean shutdown = false;
+
+  /**
+   * Initialize the submission component with downstream monitor and pool of
+   * files from which split data may be read.
+   * @param monitor Monitor component to which jobs should be passed
+   * @param threads Number of submission threads
+   *   See {@link Gridmix#GRIDMIX_SUB_THR}.
+   * @param queueDepth Max depth of pending work queue
+   *   See {@link Gridmix#GRIDMIX_QUE_DEP}.
+   * @param inputDir Set of files from which split data may be mined for
+   *   synthetic jobs.
+   */
+  public JobSubmitter(JobMonitor monitor, int threads, int queueDepth,
+      FilePool inputDir) {
+    sem = new Semaphore(queueDepth);
+    sched = new ThreadPoolExecutor(threads, threads, 0L,
+        TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+    this.inputDir = inputDir;
+    this.monitor = monitor;
+  }
+
+  /**
+   * Runnable wrapping a job to be submitted to the cluster.
+   */
+  private class SubmitTask implements Runnable {
+
+    final GridmixJob job;
+    public SubmitTask(GridmixJob job) {
+      this.job = job;
+    }
+    public void run() {
+      try {
+        // pre-compute split information
+        try {
+          job.buildSplits(inputDir);
+        } catch (IOException e) {
+          LOG.warn("Failed to submit " + job.getJob().getJobName(), e);
+          return;
+        }
+        // Sleep until deadline
+        long nsDelay = job.getDelay(TimeUnit.NANOSECONDS);
+        while (nsDelay > 0) {
+          TimeUnit.NANOSECONDS.sleep(nsDelay);
+          nsDelay = job.getDelay(TimeUnit.NANOSECONDS);
+        }
+        try {
+          // submit job
+          monitor.add(job.call());
+          LOG.debug("SUBMIT " + job + "@" + System.currentTimeMillis() +
+              " (" + job.getJob().getJobID() + ")");
+        } catch (IOException e) {
+          LOG.warn("Failed to submit " + job.getJob().getJobName(), e);
+          if (e.getCause() instanceof ClosedByInterruptException) {
+            throw new InterruptedException("Failed to submit " +
+                job.getJob().getJobName());
+          }
+        } catch (ClassNotFoundException e) {
+          LOG.warn("Failed to submit " + job.getJob().getJobName(), e);
+        }
+      } catch (InterruptedException e) {
+        // abort execution, remove splits if necessary
+        // TODO release thread-local state
+        GridmixJob.pullDescription(job.id());
+        Thread.currentThread().interrupt();
+        return;
+      } finally {
+        sem.release();
+      }
+    }
+  }
+
+  /**
+   * Enqueue the job to be submitted per the deadline associated with it.
+   */
+  public void add(final GridmixJob job) throws InterruptedException {
+    final boolean addToQueue = !shutdown;
+    if (addToQueue) {
+      final SubmitTask task = new SubmitTask(job);
+      sem.acquire();
+      try {
+        sched.execute(task);
+      } catch (RejectedExecutionException e) {
+        sem.release();
+      }
+    }
+  }
+
+  /**
+   * (Re)scan the set of input files from which splits are derived.
+   */
+  public void refreshFilePool() throws IOException {
+    inputDir.refresh();
+  }
+
+  /**
+   * Does nothing, as the threadpool is already initialized and waiting for
+   * work from the upstream factory.
+   */
+  public void start() { }
+
+  /**
+   * Continue running until all queued jobs have been submitted to the
+   * cluster.
+   */
+  public void join(long millis) throws InterruptedException {
+    if (!shutdown) {
+      throw new IllegalStateException("Cannot wait for active submit thread");
+    }
+    sched.awaitTermination(millis, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Finish all jobs pending submission, but do not accept new work.
+   */
+  public void shutdown() {
+    // complete pending tasks, but accept no new tasks
+    shutdown = true;
+    sched.shutdown();
+  }
+
+  /**
+   * Discard pending work, including precomputed work waiting to be
+   * submitted.
+   */
+  public void abort() {
+    //pendingJobs.clear();
+    shutdown = true;
+    sched.shutdownNow();
+  }
+}
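
SubmitTask above sleeps until the job's deadline by re-reading getDelay
after each sleep; GridmixJob implements Delayed, and a single sleep may
return before the deadline. The same idiom, extracted as a sketch for any
Delayed item:

    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;

    final class DeadlineWait {
      /** Block until the item's deadline has passed. */
      static void await(Delayed item) throws InterruptedException {
        long nsDelay = item.getDelay(TimeUnit.NANOSECONDS);
        while (nsDelay > 0) {   // re-check: sleep may undershoot the deadline
          TimeUnit.NANOSECONDS.sleep(nsDelay);
          nsDelay = item.getDelay(TimeUnit.NANOSECONDS);
        }
      }
    }

Note also the throttling in add(): the semaphore bounds in-flight
SubmitTasks to queueDepth, and is released either in the task's finally
block or immediately when the executor rejects the task.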

Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReadRecordFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReadRecordFactory.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReadRecordFactory.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReadRecordFactory.java Fri Mar  4 03:38:20 2011
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
+
+/**
+ * For every record consumed, read key + val bytes from the stream provided.
+ */
+class ReadRecordFactory extends RecordFactory {
+
+  /**
+   * Size of internal, scratch buffer to read from internal stream.
+   */
+  public static final String GRIDMIX_READ_BUF_SIZE = "gridmix.read.buffer.size";
+
+  private final byte[] buf;
+  private final InputStream src;
+  private final RecordFactory factory;
+
+  /**
+   * @param targetBytes Expected byte count.
+   * @param targetRecords Expected record count.
+   * @param src Stream to read bytes.
+   * @param conf Used to establish read buffer size. @see #GRIDMIX_READ_BUF_SIZE
+   */
+  public ReadRecordFactory(long targetBytes, long targetRecords,
+      InputStream src, Configuration conf) {
+    this(new AvgRecordFactory(targetBytes, targetRecords, conf), src, conf);
+  }
+
+  /**
+   * @param factory Factory to draw record sizes.
+   * @param src Stream to read bytes.
+   * @param conf Used to establish read buffer size. @see #GRIDMIX_READ_BUF_SIZE
+   */
+  public ReadRecordFactory(RecordFactory factory, InputStream src,
+      Configuration conf) {
+    this.src = src;
+    this.factory = factory;
+    buf = new byte[conf.getInt(GRIDMIX_READ_BUF_SIZE, 64 * 1024)];
+  }
+
+  @Override
+  public boolean next(GridmixKey key, GridmixRecord val) throws IOException {
+    if (!factory.next(key, val)) {
+      return false;
+    }
+    for (int len = (null == key ? 0 : key.getSize()) + val.getSize();
+         len > 0; len -= buf.length) {
+      IOUtils.readFully(src, buf, 0, Math.min(buf.length, len));
+    }
+    return true;
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return factory.getProgress();
+  }
+
+  @Override
+  public void close() throws IOException {
+    IOUtils.cleanup(null, src);
+    factory.close();
+  }
+}
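
The loop in next() above discards exactly key-size plus value-size bytes
from the source stream in buffer-sized chunks. An equivalent, more
explicit form (method and parameter names are illustrative):

    import java.io.IOException;
    import java.io.InputStream;
    import org.apache.hadoop.io.IOUtils;

    /** Sketch: consume exactly totalLen bytes from src via a scratch buffer. */
    static void drain(InputStream src, byte[] buf, int totalLen)
        throws IOException {
      int remaining = totalLen;
      while (remaining > 0) {
        final int toRead = Math.min(buf.length, remaining);
        IOUtils.readFully(src, buf, 0, toRead);  // throws on premature EOF
        remaining -= toRead;
      }
    }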

Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RecordFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RecordFactory.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RecordFactory.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RecordFactory.java Fri Mar  4 03:38:20 2011
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Abstract factory producing records used as task inputs and outputs.
+ */
+abstract class RecordFactory implements Closeable {
+
+  /**
+   * Fill in or transform the given key and value to produce the next record.
+   * @return true if a record was produced and should be emitted.
+   */
+  public abstract boolean next(GridmixKey key, GridmixRecord val)
+    throws IOException;
+
+  /**
+   * Estimate of the fraction of record capacity consumed so far, in [0, 1].
+   */
+  public abstract float getProgress() throws IOException;
+
+}
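
For reference, a minimal hypothetical subclass emitting a fixed number of
fixed-size records; it assumes a setSize(int) mutator on GridmixKey and
GridmixRecord, which is not shown in this part of the patch:

    /** Hypothetical RecordFactory: `count` records of fixed key/value size. */
    class FixedRecordFactory extends RecordFactory {
      private final long count;
      private final int keyLen;
      private final int valLen;
      private long emitted = 0;

      FixedRecordFactory(long count, int keyLen, int valLen) {
        this.count = count;
        this.keyLen = keyLen;
        this.valLen = valLen;
      }

      @Override
      public boolean next(GridmixKey key, GridmixRecord val) {
        if (emitted >= count) {
          return false;             // capacity exhausted
        }
        if (key != null) {          // callers may pass a null key
          key.setSize(keyLen);      // assumed mutator
        }
        val.setSize(valLen);        // assumed mutator
        ++emitted;
        return true;
      }

      @Override
      public float getProgress() {
        return count == 0 ? 1.0f : Math.min(1.0f, (float) emitted / count);
      }

      @Override
      public void close() { }
    }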