Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:38:23 UTC

svn commit: r1077079 [3/11] - in /hadoop/common/branches/branch-0.20-security-patches: ./ src/contrib/ src/contrib/gridmix/ src/contrib/gridmix/ivy/ src/contrib/gridmix/src/ src/contrib/gridmix/src/java/ src/contrib/gridmix/src/java/org/ src/contrib/gr...

Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java Fri Mar  4 03:38:20 2011
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.JobStoryProducer;
+import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
+import org.apache.hadoop.tools.rumen.TaskInfo;
+import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
+
+
+/**
+ * Component generating random job traces for testing on a single node.
+ */
+class DebugJobFactory extends JobFactory {
+
+  public DebugJobFactory(JobSubmitter submitter, Path scratch, int numJobs,
+      Configuration conf, CountDownLatch startFlag) throws IOException {
+    super(submitter, new DebugJobProducer(numJobs, conf), scratch, conf,
+        startFlag);
+  }
+
+  ArrayList<JobStory> getSubmitted() {
+    return ((DebugJobProducer)jobProducer).submitted;
+  }
+
+  private static class DebugJobProducer implements JobStoryProducer {
+    final ArrayList<JobStory> submitted;
+    private final Configuration conf;
+    private final AtomicInteger numJobs;
+
+    public DebugJobProducer(int numJobs, Configuration conf) {
+      super();
+      this.conf = conf;
+      this.numJobs = new AtomicInteger(numJobs);
+      this.submitted = new ArrayList<JobStory>();
+    }
+
+    @Override
+    public JobStory getNextJob() throws IOException {
+      if (numJobs.getAndDecrement() > 0) {
+        final MockJob ret = new MockJob(conf);
+        submitted.add(ret);
+        return ret;
+      }
+      return null;
+    }
+
+    @Override
+    public void close() { }
+  }
+
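+  /**
+   * Generate {@code size} random fractions that sum to 1.0, each at least
+   * {@code mindist / size}.
+   */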
+  static double[] getDistr(Random r, double mindist, int size) {
+    assert 0.0 <= mindist && mindist <= 1.0;
+    final double min = mindist / size;
+    final double rem = 1.0 - min * size;
+    final double[] tmp = new double[size];
+    for (int i = 0; i < tmp.length - 1; ++i) {
+      tmp[i] = r.nextDouble() * rem;
+    }
+    tmp[tmp.length - 1] = rem;
+    Arrays.sort(tmp);
+
+    final double[] ret = new double[size];
+    ret[0] = tmp[0] + min;
+    for (int i = 1; i < size; ++i) {
+      ret[i] = tmp[i] - tmp[i-1] + min;
+    }
+    return ret;
+  }
+
+  /**
+   * Generate random task data for a synthetic job.
+   */
+  static class MockJob implements JobStory {
+
+    static final int MIN_REC = 1 << 14;
+    static final int MIN_BYTES = 1 << 20;
+    static final int VAR_REC = 1 << 14;
+    static final int VAR_BYTES = 4 << 20;
+    static final int MAX_MAP = 5;
+    static final int MAX_RED = 3;
+
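+    /**
+     * Randomly partition {@code tot_recs} records and {@code tot_bytes} bytes
+     * across the per-task arrays; rounding error is folded into element 0.
+     */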
+    static void initDist(Random r, double min, int[] recs, long[] bytes,
+        long tot_recs, long tot_bytes) {
+      final double[] recs_dist = getDistr(r, min, recs.length);
+      final double[] bytes_dist = getDistr(r, min, recs.length);
+      long totalbytes = 0L;
+      int totalrecs = 0;
+      for (int i = 0; i < recs.length; ++i) {
+        recs[i] = (int) Math.round(tot_recs * recs_dist[i]);
+        bytes[i] = Math.round(tot_bytes * bytes_dist[i]);
+        totalrecs += recs[i];
+        totalbytes += bytes[i];
+      }
+      // Add/remove excess
+      recs[0] += totalrecs - tot_recs;
+      bytes[0] += totalbytes - tot_bytes;
+      if (LOG.isInfoEnabled()) {
+        LOG.info("DIST: " + Arrays.toString(recs) + " " +
+            tot_recs + "/" + totalrecs + " " +
+            Arrays.toString(bytes) + " " + tot_bytes + "/" + totalbytes);
+      }
+    }
+
+    private static final AtomicInteger seq = new AtomicInteger(0);
+    // set timestamps in the past
+    private static final AtomicLong timestamp =
+      new AtomicLong(System.currentTimeMillis() -
+        TimeUnit.MILLISECONDS.convert(60, TimeUnit.DAYS));
+
+    private final int id;
+    private final String name;
+    private final int[] m_recsIn, m_recsOut, r_recsIn, r_recsOut;
+    private final long[] m_bytesIn, m_bytesOut, r_bytesIn, r_bytesOut;
+    private final long submitTime;
+
+    public MockJob(Configuration conf) {
+      final Random r = new Random();
+      final long seed = r.nextLong();
+      r.setSeed(seed);
+      id = seq.getAndIncrement();
+      name = String.format("MOCKJOB%05d", id);
+      LOG.info(name + " (" + seed + ")");
+      submitTime = timestamp.addAndGet(TimeUnit.MILLISECONDS.convert(
+            r.nextInt(10), TimeUnit.SECONDS));
+
+      m_recsIn = new int[r.nextInt(MAX_MAP) + 1];
+      m_bytesIn = new long[m_recsIn.length];
+      m_recsOut = new int[m_recsIn.length];
+      m_bytesOut = new long[m_recsIn.length];
+
+      r_recsIn = new int[r.nextInt(MAX_RED) + 1];
+      r_bytesIn = new long[r_recsIn.length];
+      r_recsOut = new int[r_recsIn.length];
+      r_bytesOut = new long[r_recsIn.length];
+
+      // map input
+      final long map_recs = r.nextInt(VAR_REC) + MIN_REC;
+      final long map_bytes = r.nextInt(VAR_BYTES) + MIN_BYTES;
+      initDist(r, 0.5, m_recsIn, m_bytesIn, map_recs, map_bytes);
+
+      // shuffle
+      final long shuffle_recs = r.nextInt(VAR_REC) + MIN_REC;
+      final long shuffle_bytes = r.nextInt(VAR_BYTES) + MIN_BYTES;
+      initDist(r, 0.4, m_recsOut, m_bytesOut, shuffle_recs, shuffle_bytes);
+      initDist(r, 0.8, r_recsIn, r_bytesIn, shuffle_recs, shuffle_bytes);
+
+      // reduce output
+      final long red_recs = r.nextInt(VAR_REC) + MIN_REC;
+      final long red_bytes = r.nextInt(VAR_BYTES) + MIN_BYTES;
+      initDist(r, 0.4, r_recsOut, r_bytesOut, red_recs, red_bytes);
+
+      if (LOG.isDebugEnabled()) {
+        int iMapBTotal = 0, oMapBTotal = 0, iRedBTotal = 0, oRedBTotal = 0;
+        int iMapRTotal = 0, oMapRTotal = 0, iRedRTotal = 0, oRedRTotal = 0;
+        for (int i = 0; i < m_recsIn.length; ++i) {
+          iMapRTotal += m_recsIn[i];
+          iMapBTotal += m_bytesIn[i];
+          oMapRTotal += m_recsOut[i];
+          oMapBTotal += m_bytesOut[i];
+        }
+        for (int i = 0; i < r_recsIn.length; ++i) {
+          iRedRTotal += r_recsIn[i];
+          iRedBTotal += r_bytesIn[i];
+          oRedRTotal += r_recsOut[i];
+          oRedBTotal += r_bytesOut[i];
+        }
+        LOG.debug(String.format("%s: M (%03d) %6d/%10d -> %6d/%10d" +
+                                   " R (%03d) %6d/%10d -> %6d/%10d @%d", name,
+            m_bytesIn.length, iMapRTotal, iMapBTotal, oMapRTotal, oMapBTotal,
+            r_bytesIn.length, iRedRTotal, iRedBTotal, oRedRTotal, oRedBTotal,
+            submitTime));
+      }
+    }
+
+    @Override
+    public String getName() {
+      return name;
+    }
+
+    @Override
+    public String getUser() {
+      return "FOOBAR";
+    }
+
+    @Override
+    public JobID getJobID() {
+      return new JobID("job_mock_" + name, id);
+    }
+
+    @Override
+    public Values getOutcome() {
+      return Values.SUCCESS;
+    }
+
+    @Override
+    public long getSubmissionTime() {
+      return submitTime;
+    }
+
+    @Override
+    public int getNumberMaps() {
+      return m_bytesIn.length;
+    }
+
+    @Override
+    public int getNumberReduces() {
+      return r_bytesIn.length;
+    }
+
+    @Override
+    public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
+      switch (taskType) {
+        case MAP:
+          return new TaskInfo(m_bytesIn[taskNumber], m_recsIn[taskNumber],
+              m_bytesOut[taskNumber], m_recsOut[taskNumber], -1);
+        case REDUCE:
+          return new TaskInfo(r_bytesIn[taskNumber], r_recsIn[taskNumber],
+              r_bytesOut[taskNumber], r_recsOut[taskNumber], -1);
+        default:
+          throw new IllegalArgumentException("Not interested");
+      }
+    }
+
+    @Override
+    public InputSplit[] getInputSplits() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public TaskAttemptInfo getTaskAttemptInfo(TaskType taskType,
+        int taskNumber, int taskAttemptNumber) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(int taskNumber,
+        int taskAttemptNumber, int locality) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public org.apache.hadoop.mapred.JobConf getJobConf() {
+      throw new UnsupportedOperationException();
+    }
+  }
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestFilePool.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestFilePool.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestFilePool.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestFilePool.java Fri Mar  4 03:38:20 2011
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Random;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class TestFilePool {
+
+  static final Log LOG = LogFactory.getLog(TestFilePool.class);
+  static final int NFILES = 26;
+  static final Path base = getBaseDir();
+
+  static Path getBaseDir() {
+    try {
+      final Configuration conf = new Configuration();
+      final FileSystem fs = FileSystem.getLocal(conf).getRaw();
+      return new Path(System.getProperty("test.build.data", "/tmp"),
+          "testFilePool").makeQualified(fs);
+    } catch (IOException e) {
+      fail();
+    }
+    return null;
+  }
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    final Configuration conf = new Configuration();
+    final FileSystem fs = FileSystem.getLocal(conf).getRaw();
+    fs.delete(base, true);
+    final Random r = new Random();
+    final long seed = r.nextLong();
+    r.setSeed(seed);
+    LOG.info("seed: " + seed);
+    fs.mkdirs(base);
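+    // create NFILES files of 1-13 KiB each, placed either in the base
+    // directory or in one of the subdirectories 0, 1, 2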
+    for (int i = 0; i < NFILES; ++i) {
+      Path file = base;
+      for (double d = 0.6; d > 0.0; d *= 0.8) {
+        if (r.nextDouble() < d) {
+          file = new Path(base, Integer.toString(r.nextInt(3)));
+          continue;
+        }
+        break;
+      }
+      OutputStream out = null;
+      try {
+        out = fs.create(new Path(file, "" + (char)('A' + i)));
+        final byte[] b = new byte[1024];
+        Arrays.fill(b, (byte)('A' + i));
+        for (int len = ((i % 13) + 1) * 1024; len > 0; len -= 1024) {
+          out.write(b);
+        }
+      } finally {
+        if (out != null) {
+          out.close();
+        }
+      }
+    }
+  }
+
+  @AfterClass
+  public static void cleanup() throws IOException {
+    final Configuration conf = new Configuration();
+    final FileSystem fs = FileSystem.getLocal(conf).getRaw();
+    fs.delete(base, true);
+  }
+
+  @Test
+  public void testUnsuitable() throws Exception {
+    try {
+      final Configuration conf = new Configuration();
+      // all files 13k or less
+      conf.setLong(FilePool.GRIDMIX_MIN_FILE, 14 * 1024);
+      final FilePool pool = new FilePool(conf, base);
+      pool.refresh();
+    } catch (IOException e) {
+      return;
+    }
+    fail();
+  }
+
+  @Test
+  public void testPool() throws Exception {
+    final Random r = new Random();
+    final Configuration conf = new Configuration();
+    conf.setLong(FilePool.GRIDMIX_MIN_FILE, 3 * 1024);
+    final FilePool pool = new FilePool(conf, base);
+    pool.refresh();
+    final ArrayList<FileStatus> files = new ArrayList<FileStatus>();
+
+    // ensure 1k, 2k files excluded
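+    // file sizes cycle 1..13 KiB twice (182 KiB total); dropping the two
+    // 1 KiB and two 2 KiB files removes 6 KiB and 4 entries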
+    final int expectedPoolSize = (NFILES / 2 * (NFILES / 2 + 1) - 6) * 1024;
+    assertEquals(expectedPoolSize, pool.getInputFiles(Long.MAX_VALUE, files));
+    assertEquals(NFILES - 4, files.size());
+
+    // exact match
+    files.clear();
+    assertEquals(expectedPoolSize, pool.getInputFiles(expectedPoolSize, files));
+
+    // match random within 12k
+    files.clear();
+    final long rand = r.nextInt(expectedPoolSize);
+    assertTrue("Missed: " + rand,
+        (NFILES / 2) * 1024 > rand - pool.getInputFiles(rand, files));
+
+    // all files
+    conf.setLong(FilePool.GRIDMIX_MIN_FILE, 0);
+    pool.refresh();
+    files.clear();
+    assertEquals((NFILES / 2 * (NFILES / 2 + 1)) * 1024,
+        pool.getInputFiles(Long.MAX_VALUE, files));
+  }
+
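+  /**
+   * Verify that the split covers exactly {@code bytes} bytes, references each
+   * path at most once, and never exceeds the length of any file.
+   */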
+  void checkSplitEq(FileSystem fs, CombineFileSplit split, long bytes)
+      throws Exception {
+    long splitBytes = 0L;
+    HashSet<Path> uniq = new HashSet<Path>();
+    for (int i = 0; i < split.getNumPaths(); ++i) {
+      splitBytes += split.getLength(i);
+      assertTrue(
+          split.getLength(i) <= fs.getFileStatus(split.getPath(i)).getLen());
+      assertFalse(uniq.contains(split.getPath(i)));
+      uniq.add(split.getPath(i));
+    }
+    assertEquals(bytes, splitBytes);
+  }
+
+  @Test
+  public void testStriper() throws Exception {
+    final Random r = new Random();
+    final Configuration conf = new Configuration();
+    final FileSystem fs = FileSystem.getLocal(conf).getRaw();
+    conf.setLong(FilePool.GRIDMIX_MIN_FILE, 3 * 1024);
+    final FilePool pool = new FilePool(conf, base) {
+      @Override
+      public BlockLocation[] locationsFor(FileStatus stat, long start, long len)
+          throws IOException {
+        return new BlockLocation[] { new BlockLocation() };
+      }
+    };
+    pool.refresh();
+
+    final int expectedPoolSize = (NFILES / 2 * (NFILES / 2 + 1) - 6) * 1024;
+    final InputStriper striper = new InputStriper(pool, expectedPoolSize);
+    int last = 0;
+    for (int i = 0; i < expectedPoolSize;
+        last = Math.min(expectedPoolSize - i, r.nextInt(expectedPoolSize))) {
+      checkSplitEq(fs, striper.splitFor(pool, last, 0), last);
+      i += last;
+    }
+    final InputStriper striper2 = new InputStriper(pool, expectedPoolSize);
+    checkSplitEq(fs, striper2.splitFor(pool, expectedPoolSize, 0),
+        expectedPoolSize);
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestFileQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestFileQueue.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestFileQueue.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestFileQueue.java Fri Mar  4 03:38:20 2011
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class TestFileQueue {
+
+  static final Log LOG = LogFactory.getLog(TestFileQueue.class);
+  static final int NFILES = 4;
+  static final int BLOCK = 256;
+  static final Path[] paths = new Path[NFILES];
+  static final String[] loc = new String[NFILES];
+  static final long[] start = new long[NFILES];
+  static final long[] len = new long[NFILES];
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    final Configuration conf = new Configuration();
+    final FileSystem fs = FileSystem.getLocal(conf).getRaw();
+    final Path p = new Path(System.getProperty("test.build.data", "/tmp"),
+        "testFileQueue").makeQualified(fs);
+    fs.delete(p, true);
+    final byte[] b = new byte[BLOCK];
+    for (int i = 0; i < NFILES; ++i) {
+      Arrays.fill(b, (byte)('A' + i));
+      paths[i] = new Path(p, "" + (char)('A' + i));
+      OutputStream f = null;
+      try {
+        f = fs.create(paths[i]);
+        f.write(b);
+      } finally {
+        if (f != null) {
+          f.close();
+        }
+      }
+    }
+  }
+
+  @AfterClass
+  public static void cleanup() throws IOException {
+    final Configuration conf = new Configuration();
+    final FileSystem fs = FileSystem.getLocal(conf).getRaw();
+    final Path p = new Path(System.getProperty("test.build.data", "/tmp"),
+        "testFileQueue").makeQualified(fs);
+    fs.delete(p, true);
+  }
+
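+  /**
+   * Build the expected byte stream: {@code len[i]} bytes of each file's fill
+   * character, in file order.
+   */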
+  static ByteArrayOutputStream fillVerif() throws IOException {
+    final byte[] b = new byte[BLOCK];
+    final ByteArrayOutputStream out = new ByteArrayOutputStream();
+    for (int i = 0; i < NFILES; ++i) {
+      Arrays.fill(b, (byte)('A' + i));
+      out.write(b, 0, (int)len[i]);
+    }
+    return out;
+  }
+
+  @Test
+  public void testRepeat() throws Exception {
+    final Configuration conf = new Configuration();
+    Arrays.fill(loc, "");
+    Arrays.fill(start, 0L);
+    Arrays.fill(len, BLOCK);
+
+    final ByteArrayOutputStream out = fillVerif();
+    final FileQueue q =
+      new FileQueue(new CombineFileSplit(paths, start, len, loc), conf);
+    final byte[] verif = out.toByteArray();
+    final byte[] check = new byte[2 * NFILES * BLOCK];
+    q.read(check, 0, NFILES * BLOCK);
+    assertArrayEquals(verif, Arrays.copyOf(check, NFILES * BLOCK));
+
+    final byte[] verif2 = new byte[2 * NFILES * BLOCK];
+    System.arraycopy(verif, 0, verif2, 0, verif.length);
+    System.arraycopy(verif, 0, verif2, verif.length, verif.length);
+    q.read(check, 0, 2 * NFILES * BLOCK);
+    assertArrayEquals(verif2, check);
+
+  }
+
+  @Test
+  public void testUneven() throws Exception {
+    final Configuration conf = new Configuration();
+    Arrays.fill(loc, "");
+    Arrays.fill(start, 0L);
+    Arrays.fill(len, BLOCK);
+
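+    // start every other split mid-file so lengths alternate between
+    // BLOCK / 2 and BLOCK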
+    final int B2 = BLOCK / 2;
+    for (int i = 0; i < NFILES; i += 2) {
+      start[i] += B2;
+      len[i] -= B2;
+    }
+    final FileQueue q =
+      new FileQueue(new CombineFileSplit(paths, start, len, loc), conf);
+    final ByteArrayOutputStream out = fillVerif();
+    final byte[] verif = out.toByteArray();
+    final byte[] check = new byte[NFILES / 2 * BLOCK + NFILES / 2 * B2];
+    q.read(check, 0, verif.length);
+    assertArrayEquals(verif, Arrays.copyOf(check, verif.length));
+    q.read(check, 0, verif.length);
+    assertArrayEquals(verif, Arrays.copyOf(check, verif.length));
+  }
+
+  @Test
+  public void testEmpty() throws Exception {
+    final Configuration conf = new Configuration();
+    // verify OK if unused
+    final FileQueue q = new FileQueue(new CombineFileSplit(
+          new Path[0], new long[0], new long[0], new String[0]), conf);
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java Fri Mar  4 03:38:20 2011
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+
+public class TestGridmixRecord {
+  private static final Log LOG = LogFactory.getLog(TestGridmixRecord.class);
+
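+  /**
+   * Check that records serialize to exactly the requested length and report
+   * that length after deserialization.
+   */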
+  static void lengthTest(GridmixRecord x, GridmixRecord y, int min,
+      int max) throws Exception {
+    final Random r = new Random();
+    final long seed = r.nextLong();
+    r.setSeed(seed);
+    LOG.info("length: " + seed);
+    final DataInputBuffer in = new DataInputBuffer();
+    final DataOutputBuffer out1 = new DataOutputBuffer();
+    final DataOutputBuffer out2 = new DataOutputBuffer();
+    for (int i = min; i < max; ++i) {
+      setSerialize(x, r.nextLong(), i, out1);
+      // check write
+      assertEquals(i, out1.getLength());
+      // write to stream
+      x.write(out2);
+      // check read
+      in.reset(out1.getData(), 0, out1.getLength());
+      y.readFields(in);
+      assertEquals(i, x.getSize());
+      assertEquals(i, y.getSize());
+    }
+    // check stream read
+    in.reset(out2.getData(), 0, out2.getLength());
+    for (int i = min; i < max; ++i) {
+      y.readFields(in);
+      assertEquals(i, y.getSize());
+    }
+  }
+
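+  /**
+   * Check that deserialized records re-serialize to byte-identical output.
+   */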
+  static void randomReplayTest(GridmixRecord x, GridmixRecord y, int min,
+      int max) throws Exception {
+    final Random r = new Random();
+    final long seed = r.nextLong();
+    r.setSeed(seed);
+    LOG.info("randReplay: " + seed);
+    final DataOutputBuffer out1 = new DataOutputBuffer();
+    for (int i = min; i < max; ++i) {
+      final int s = out1.getLength();
+      x.setSeed(r.nextLong());
+      x.setSize(i);
+      x.write(out1);
+      assertEquals(i, out1.getLength() - s);
+    }
+    final DataInputBuffer in = new DataInputBuffer();
+    in.reset(out1.getData(), 0, out1.getLength());
+    final DataOutputBuffer out2 = new DataOutputBuffer();
+    // deserialize written records, write to separate buffer
+    for (int i = min; i < max; ++i) {
+      final int s = in.getPosition();
+      y.readFields(in);
+      assertEquals(i, in.getPosition() - s);
+      y.write(out2);
+    }
+    // verify written contents match
+    assertEquals(out1.getLength(), out2.getLength());
+    // assumes that writes will grow buffer deterministically
+    assertEquals("Bad test", out1.getData().length, out2.getData().length);
+    assertArrayEquals(out1.getData(), out2.getData());
+  }
+
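+  /**
+   * Check that records built from the same seed and size compare equal and
+   * serialize to identical bytes.
+   */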
+  static void eqSeedTest(GridmixRecord x, GridmixRecord y, int max)
+      throws Exception {
+    final Random r = new Random();
+    final long s = r.nextLong();
+    r.setSeed(s);
+    LOG.info("eqSeed: " + s);
+    assertEquals(x.fixedBytes(), y.fixedBytes());
+    final int min = x.fixedBytes() + 1;
+    final DataOutputBuffer out1 = new DataOutputBuffer();
+    final DataOutputBuffer out2 = new DataOutputBuffer();
+    for (int i = min; i < max; ++i) {
+      final long seed = r.nextLong();
+      setSerialize(x, seed, i, out1);
+      setSerialize(y, seed, i, out2);
+      assertEquals(x, y);
+      assertEquals(x.hashCode(), y.hashCode());
+
+      // verify written contents match
+      assertEquals(out1.getLength(), out2.getLength());
+      // assumes that writes will grow buffer deterministically
+      assertEquals("Bad test", out1.getData().length, out2.getData().length);
+      assertArrayEquals(out1.getData(), out2.getData());
+    }
+  }
+
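+  /**
+   * Check that in-memory compareTo agrees with the raw-byte comparator over
+   * the serialized forms.
+   */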
+  static void binSortTest(GridmixRecord x, GridmixRecord y, int min,
+      int max, WritableComparator cmp) throws Exception {
+    final Random r = new Random();
+    final long s = r.nextLong();
+    r.setSeed(s);
+    LOG.info("sort: " + s);
+    final DataOutputBuffer out1 = new DataOutputBuffer();
+    final DataOutputBuffer out2 = new DataOutputBuffer();
+    for (int i = min; i < max; ++i) {
+      final long seed1 = r.nextLong();
+      setSerialize(x, seed1, i, out1);
+      assertEquals(0, x.compareSeed(seed1, Math.max(0, i - x.fixedBytes())));
+
+      final long seed2 = r.nextLong();
+      setSerialize(y, seed2, i, out2);
+      assertEquals(0, y.compareSeed(seed2, Math.max(0, i - x.fixedBytes())));
+
+      // for eq sized records, ensure byte cmp where req
+      final int chk = WritableComparator.compareBytes(
+          out1.getData(), 0, out1.getLength(),
+          out2.getData(), 0, out2.getLength());
+      assertEquals(chk, x.compareTo(y));
+      assertEquals(chk, cmp.compare(
+            out1.getData(), 0, out1.getLength(),
+            out2.getData(), 0, out2.getLength()));
+      // write second copy, compare eq
+      final int s1 = out1.getLength();
+      x.write(out1);
+      assertEquals(0, cmp.compare(out1.getData(), 0, s1,
+            out1.getData(), s1, out1.getLength() - s1));
+      final int s2 = out2.getLength();
+      y.write(out2);
+      assertEquals(0, cmp.compare(out2.getData(), 0, s2,
+            out2.getData(), s2, out2.getLength() - s2));
+      assertEquals(chk, cmp.compare(out1.getData(), 0, s1,
+            out2.getData(), s2, out2.getLength() - s2));
+    }
+  }
+
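+  /**
+   * Check that reduce spec values (input/output records, output bytes)
+   * survive a serialization round trip.
+   */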
+  static void checkSpec(GridmixKey a, GridmixKey b) throws Exception {
+    final Random r = new Random();
+    final long s = r.nextLong();
+    r.setSeed(s);
+    LOG.info("spec: " + s);
+    final DataInputBuffer in = new DataInputBuffer();
+    final DataOutputBuffer out = new DataOutputBuffer();
+    a.setType(GridmixKey.REDUCE_SPEC);
+    b.setType(GridmixKey.REDUCE_SPEC);
+    for (int i = 0; i < 100; ++i) {
+      final int in_rec = r.nextInt(Integer.MAX_VALUE);
+      a.setReduceInputRecords(in_rec);
+      final int out_rec = r.nextInt(Integer.MAX_VALUE);
+      a.setReduceOutputRecords(out_rec);
+      final int out_bytes = r.nextInt(Integer.MAX_VALUE);
+      a.setReduceOutputBytes(out_bytes);
+      final int min = WritableUtils.getVIntSize(in_rec)
+                    + WritableUtils.getVIntSize(out_rec)
+                    + WritableUtils.getVIntSize(out_bytes);
+      assertEquals(min + 2, a.fixedBytes()); // meta + vint min
+      final int size = r.nextInt(1024) + a.fixedBytes() + 1;
+      setSerialize(a, r.nextLong(), size, out);
+      assertEquals(size, out.getLength());
+      assertTrue(a.equals(a));
+      assertEquals(0, a.compareTo(a));
+
+      in.reset(out.getData(), 0, out.getLength());
+
+      b.readFields(in);
+      assertEquals(size, b.getSize());
+      assertEquals(in_rec, b.getReduceInputRecords());
+      assertEquals(out_rec, b.getReduceOutputRecords());
+      assertEquals(out_bytes, b.getReduceOutputBytes());
+      assertTrue(a.equals(b));
+      assertEquals(0, a.compareTo(b));
+      assertEquals(a.hashCode(), b.hashCode());
+    }
+  }
+
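+  /** Set seed and size on {@code x}, then serialize it into a reset buffer. */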
+  static void setSerialize(GridmixRecord x, long seed, int size,
+      DataOutputBuffer out) throws IOException {
+    x.setSeed(seed);
+    x.setSize(size);
+    out.reset();
+    x.write(out);
+  }
+
+  @Test
+  public void testKeySpec() throws Exception {
+    final int min = 5;
+    final int max = 300;
+    final GridmixKey a = new GridmixKey(GridmixKey.REDUCE_SPEC, 1, 0L);
+    final GridmixKey b = new GridmixKey(GridmixKey.REDUCE_SPEC, 1, 0L);
+    lengthTest(a, b, min, max);
+    randomReplayTest(a, b, min, max);
+    binSortTest(a, b, min, max, new GridmixKey.Comparator());
+    // 2 fixed GR bytes, 1 type, 3 spec
+    eqSeedTest(a, b, max);
+    checkSpec(a, b);
+  }
+
+  @Test
+  public void testKeyData() throws Exception {
+    final int min = 2;
+    final int max = 300;
+    final GridmixKey a = new GridmixKey(GridmixKey.DATA, 1, 0L);
+    final GridmixKey b = new GridmixKey(GridmixKey.DATA, 1, 0L);
+    lengthTest(a, b, min, max);
+    randomReplayTest(a, b, min, max);
+    binSortTest(a, b, min, max, new GridmixKey.Comparator());
+    // 2 fixed GR bytes, 1 type
+    eqSeedTest(a, b, 300);
+  }
+
+  @Test
+  public void testBaseRecord() throws Exception {
+    final int min = 1;
+    final int max = 300;
+    final GridmixRecord a = new GridmixRecord();
+    final GridmixRecord b = new GridmixRecord();
+    lengthTest(a, b, min, max);
+    randomReplayTest(a, b, min, max);
+    binSortTest(a, b, min, max, new GridmixRecord.Comparator());
+    // 2 fixed GR bytes
+    eqSeedTest(a, b, 300);
+  }
+
+  public static void main(String[] argv) throws Exception {
+    boolean fail = false;
+    final TestGridmixRecord test = new TestGridmixRecord();
+    try { test.testKeySpec(); } catch (Exception e) {
+      fail = true;
+      e.printStackTrace();
+    }
+    try {test.testKeyData(); } catch (Exception e) {
+      fail = true;
+      e.printStackTrace();
+    }
+    try {test.testBaseRecord(); } catch (Exception e) {
+      fail = true;
+      e.printStackTrace();
+    }
+    System.exit(fail ? -1 : 0);
+  }
+
+  static void printDebug(GridmixRecord a, GridmixRecord b) throws IOException {
+    DataOutputBuffer out = new DataOutputBuffer();
+    a.write(out);
+    System.out.println("A " +
+        Arrays.toString(Arrays.copyOf(out.getData(), out.getLength())));
+    out.reset();
+    b.write(out);
+    System.out.println("B " +
+        Arrays.toString(Arrays.copyOf(out.getData(), out.getLength())));
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java Fri Mar  4 03:38:20 2011
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.TaskReport;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.TaskInfo;
+import org.apache.hadoop.util.ToolRunner;
+import static org.apache.hadoop.mapred.Task.Counter.*;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.log4j.Level;
+
+public class TestGridmixSubmission {
+  {
+    ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.mapred.gridmix")
+        ).getLogger().setLevel(Level.DEBUG);
+  }
+
+  private static FileSystem dfs = null;
+  private static MiniDFSCluster dfsCluster = null;
+  private static MiniMRCluster mrCluster = null;
+
+  private static final int NJOBS = 2;
+  private static final long GENDATA = 50; // in megabytes
+  private static final int GENSLOP = 100 * 1024; // +/- 100k for logs
+
+  @BeforeClass
+  public static void initCluster() throws IOException {
+    Configuration conf = new Configuration();
+    dfsCluster = new MiniDFSCluster(conf, 3, true, null);
+    dfs = dfsCluster.getFileSystem();
+    mrCluster = new MiniMRCluster(3, dfs.getUri().toString(), 1, null, null,
+        new JobConf(conf));
+  }
+
+  @AfterClass
+  public static void shutdownCluster() throws IOException {
+    if (mrCluster != null) {
+      mrCluster.shutdown();
+    }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+    }
+  }
+
+  static class TestMonitor extends JobMonitor {
+
+    static final long SLOPBYTES = 1024;
+    private final int expected;
+    private final BlockingQueue<Job> retiredJobs;
+
+    public TestMonitor(int expected) {
+      super();
+      this.expected = expected;
+      retiredJobs = new LinkedBlockingQueue<Job>();
+    }
+
+    public void verify(ArrayList<JobStory> submitted) throws Exception {
+      final ArrayList<Job> succeeded = new ArrayList<Job>();
+      assertEquals("Bad job count", expected, retiredJobs.drainTo(succeeded));
+      final HashMap<String,JobStory> sub = new HashMap<String,JobStory>();
+      for (JobStory spec : submitted) {
+        sub.put(spec.getName(), spec);
+      }
+      final JobClient client = new JobClient(mrCluster.createJobConf());
+      for (Job job : succeeded) {
+        final String jobname = job.getJobName();
+        if ("GRIDMIX_GENDATA".equals(jobname)) {
+          final Path in = new Path("foo").makeQualified(dfs);
+          final Path out = new Path("/gridmix").makeQualified(dfs);
+          final ContentSummary generated = dfs.getContentSummary(in);
+          assertTrue("Mismatched data gen", // +/- 100k for logs
+              (GENDATA << 20) < generated.getLength() + GENSLOP &&
+              (GENDATA << 20) > generated.getLength() - GENSLOP);
+          FileStatus[] outstat = dfs.listStatus(out);
+          assertEquals("Mismatched job count", NJOBS, outstat.length);
+          continue;
+        }
+        final JobStory spec =
+          sub.get(job.getJobName().replace("GRIDMIX", "MOCKJOB"));
+        assertNotNull("No spec for " + job.getJobName(), spec);
+        assertNotNull("No counters for " + job.getJobName(), job.getCounters());
+
+        final int nMaps = spec.getNumberMaps();
+        final int nReds = spec.getNumberReduces();
+
+        // TODO Blocked by MAPREDUCE-118
+        if (true) return;
+        // TODO
+        System.out.println(jobname + ": " + nMaps + "/" + nReds);
+        final TaskReport[] mReports =
+          client.getMapTaskReports(JobID.downgrade(job.getJobID()));
+        assertEquals("Mismatched map count", nMaps, mReports.length);
+        check(TaskType.MAP, job, spec, mReports,
+            0, 0, SLOPBYTES, nReds);
+
+        final TaskReport[] rReports =
+          client.getReduceTaskReports(JobID.downgrade(job.getJobID()));
+        assertEquals("Mismatched reduce count", nReds, rReports.length);
+        check(TaskType.REDUCE, job, spec, rReports,
+            nMaps * SLOPBYTES, 2 * nMaps, 0, 0);
+      }
+    }
+
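+    /**
+     * Compare sorted per-task counter totals from the run against the spec,
+     * allowing the given slop in bytes and records.
+     */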
+    public void check(final TaskType type, Job job, JobStory spec,
+          final TaskReport[] runTasks,
+          long extraInputBytes, int extraInputRecords,
+          long extraOutputBytes, int extraOutputRecords) throws Exception {
+
+      long[] runInputRecords = new long[runTasks.length];
+      long[] runInputBytes = new long[runTasks.length];
+      long[] runOutputRecords = new long[runTasks.length];
+      long[] runOutputBytes = new long[runTasks.length];
+      long[] specInputRecords = new long[runTasks.length];
+      long[] specInputBytes = new long[runTasks.length];
+      long[] specOutputRecords = new long[runTasks.length];
+      long[] specOutputBytes = new long[runTasks.length];
+
+      for (int i = 0; i < runTasks.length; ++i) {
+        final TaskInfo specInfo;
+        final Counters counters = runTasks[i].getCounters();
+        switch (type) {
+          case MAP:
+             runInputBytes[i] = counters.findCounter("FileSystemCounters",
+                 "HDFS_BYTES_READ").getValue();
+             runInputRecords[i] =
+               (int)counters.findCounter(MAP_INPUT_RECORDS).getValue();
+             runOutputBytes[i] =
+               counters.findCounter(MAP_OUTPUT_BYTES).getValue();
+             runOutputRecords[i] =
+               (int)counters.findCounter(MAP_OUTPUT_RECORDS).getValue();
+
+            specInfo = spec.getTaskInfo(TaskType.MAP, i);
+            specInputRecords[i] = specInfo.getInputRecords();
+            specInputBytes[i] = specInfo.getInputBytes();
+            specOutputRecords[i] = specInfo.getOutputRecords();
+            specOutputBytes[i] = specInfo.getOutputBytes();
+            System.out.printf(type + " SPEC: %9d -> %9d :: %5d -> %5d\n",
+                 specInputBytes[i], specOutputBytes[i],
+                 specInputRecords[i], specOutputRecords[i]);
+            System.out.printf(type + " RUN:  %9d -> %9d :: %5d -> %5d\n",
+                 runInputBytes[i], runOutputBytes[i],
+                 runInputRecords[i], runOutputRecords[i]);
+            break;
+          case REDUCE:
+            runInputBytes[i] = 0;
+            runInputRecords[i] =
+              (int)counters.findCounter(REDUCE_INPUT_RECORDS).getValue();
+            runOutputBytes[i] =
+              counters.findCounter("FileSystemCounters",
+                  "HDFS_BYTES_WRITTEN").getValue();
+            runOutputRecords[i] =
+              (int)counters.findCounter(REDUCE_OUTPUT_RECORDS).getValue();
+
+
+            specInfo = spec.getTaskInfo(TaskType.REDUCE, i);
+            // There is no reliable counter for reduce input bytes. The
+            // variable-length encoding of intermediate records and other noise
+            // make this quantity difficult to estimate. The shuffle and spec
+            // input bytes are included in debug output for reference, but are
+            // not checked
+            specInputBytes[i] = 0;
+            specInputRecords[i] = specInfo.getInputRecords();
+            specOutputRecords[i] = specInfo.getOutputRecords();
+            specOutputBytes[i] = specInfo.getOutputBytes();
+            System.out.printf(type + " SPEC: (%9d) -> %9d :: %5d -> %5d\n",
+                 specInfo.getInputBytes(), specOutputBytes[i],
+                 specInputRecords[i], specOutputRecords[i]);
+            System.out.printf(type + " RUN:  (%9d) -> %9d :: %5d -> %5d\n",
+                 counters.findCounter(REDUCE_SHUFFLE_BYTES).getValue(),
+                 runOutputBytes[i], runInputRecords[i], runOutputRecords[i]);
+            break;
+          default:
+            specInfo = null;
+            fail("Unexpected type: " + type);
+        }
+      }
+
+      // Check input bytes
+      Arrays.sort(specInputBytes);
+      Arrays.sort(runInputBytes);
+      for (int i = 0; i < runTasks.length; ++i) {
+        assertTrue("Mismatched " + type + " input bytes " +
+            specInputBytes[i] + "/" + runInputBytes[i],
+            eqPlusMinus(runInputBytes[i], specInputBytes[i], extraInputBytes));
+      }
+
+      // Check input records
+      Arrays.sort(specInputRecords);
+      Arrays.sort(runInputRecords);
+      for (int i = 0; i < runTasks.length; ++i) {
+        assertTrue("Mismatched " + type + " input records " +
+            specInputRecords[i] + "/" + runInputRecords[i],
+            eqPlusMinus(runInputRecords[i], specInputRecords[i],
+              extraInputRecords));
+      }
+
+      // Check output bytes
+      Arrays.sort(specOutputBytes);
+      Arrays.sort(runOutputBytes);
+      for (int i = 0; i < runTasks.length; ++i) {
+        assertTrue("Mismatched " + type + " output bytes " +
+            specOutputBytes[i] + "/" + runOutputBytes[i],
+            eqPlusMinus(runOutputBytes[i], specOutputBytes[i],
+              extraOutputBytes));
+      }
+
+      // Check output records
+      Arrays.sort(specOutputRecords);
+      Arrays.sort(runOutputRecords);
+      for (int i = 0; i < runTasks.length; ++i) {
+        assertTrue("Mismatched " + type + " output records " +
+            specOutputRecords[i] + "/" + runOutputRecords[i],
+            eqPlusMinus(runOutputRecords[i], specOutputRecords[i],
+              extraOutputRecords));
+      }
+
+    }
+
+    private static boolean eqPlusMinus(long a, long b, long x) {
+      final long diff = Math.abs(a - b);
+      return diff <= x;
+    }
+
+    @Override
+    protected void onSuccess(Job job) {
+      retiredJobs.add(job);
+    }
+    @Override
+    protected void onFailure(Job job) {
+      fail("Job failure: " + job);
+    }
+  }
+
+  static class DebugGridmix extends Gridmix {
+
+    private DebugJobFactory factory;
+    private TestMonitor monitor;
+
+    public void checkMonitor() throws Exception {
+      monitor.verify(factory.getSubmitted());
+    }
+
+    @Override
+    protected JobMonitor createJobMonitor() {
+      monitor = new TestMonitor(NJOBS + 1); // include data generation job
+      return monitor;
+    }
+
+    @Override
+    protected JobFactory createJobFactory(JobSubmitter submitter,
+        String traceIn, Path scratchDir, Configuration conf,
+        CountDownLatch startFlag) throws IOException {
+      factory =
+        new DebugJobFactory(submitter, scratchDir, NJOBS, conf, startFlag);
+      return factory;
+    }
+  }
+
+  @Test
+  public void testSubmit() throws Exception {
+    final Path in = new Path("foo").makeQualified(dfs);
+    final Path out = new Path("/gridmix").makeQualified(dfs);
+    final String[] argv = {
+      "-D" + FilePool.GRIDMIX_MIN_FILE + "=0",
+      "-D" + Gridmix.GRIDMIX_OUT_DIR + "=" + out,
+      "-generate", String.valueOf(GENDATA) + "m",
+      in.toString(),
+      "-" // ignored by DebugGridmix
+    };
+    DebugGridmix client = new DebugGridmix();
+    final Configuration conf = mrCluster.createJobConf();
+    //conf.setInt(Gridmix.GRIDMIX_KEY_LEN, 2);
+    int res = ToolRunner.run(conf, client, argv);
+    assertEquals("Client exited with nonzero status", 0, res);
+    client.checkMonitor();
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRecordFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRecordFactory.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRecordFactory.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRecordFactory.java Fri Mar  4 03:38:20 2011
@@ -0,0 +1,79 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.util.Random;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputBuffer;
+
+public class TestRecordFactory {
+  private static final Log LOG = LogFactory.getLog(TestRecordFactory.class);
+
+  public static void testFactory(long targetBytes, long targetRecs)
+      throws Exception {
+    final Configuration conf = new Configuration();
+    final GridmixKey key = new GridmixKey();
+    final GridmixRecord val = new GridmixRecord();
+    LOG.info("Target bytes/records: " + targetBytes + "/" + targetRecs);
+    final RecordFactory f = new AvgRecordFactory(targetBytes, targetRecs, conf);
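+    // if no record count is requested, derive the expected count from the
+    // target bytes and the configured average record size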
+    targetRecs = targetRecs <= 0 && targetBytes >= 0
+      ? Math.max(1,
+          targetBytes / conf.getInt("gridmix.missing.rec.size", 64 * 1024))
+      : targetRecs;
+
+    long records = 0L;
+    final DataOutputBuffer out = new DataOutputBuffer();
+    while (f.next(key, val)) {
+      ++records;
+      key.write(out);
+      val.write(out);
+    }
+    assertEquals(targetRecs, records);
+    assertEquals(targetBytes, out.getLength());
+  }
+
+  @Test
+  public void testRandom() throws Exception {
+    final Random r = new Random();
+    final long targetBytes = r.nextInt(1 << 20) + 3 * (1 << 14);
+    final long targetRecs = r.nextInt(1 << 14);
+    testFactory(targetBytes, targetRecs);
+  }
+
+  @Test
+  public void testAvg() throws Exception {
+    final Random r = new Random();
+    final long avgsize = r.nextInt(1 << 10) + 1;
+    final long targetRecs = r.nextInt(1 << 14);
+    testFactory(targetRecs * avgsize, targetRecs);
+  }
+
+  @Test
+  public void testZero() throws Exception {
+    final Random r = new Random();
+    final long targetBytes = r.nextInt(1 << 20);
+    testFactory(targetBytes, 0);
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java?rev=1077079&r1=1077078&r2=1077079&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java Fri Mar  4 03:38:20 2011
@@ -63,7 +63,7 @@ abstract public class Task implements Wr
     LogFactory.getLog(Task.class);
 
   // Counters used by Task subclasses
-  protected static enum Counter { 
+  public static enum Counter { 
     MAP_INPUT_RECORDS, 
     MAP_OUTPUT_RECORDS,
     MAP_SKIPPED_RECORDS,

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskStatus.java?rev=1077079&r1=1077078&r2=1077079&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskStatus.java Fri Mar  4 03:38:20 2011
@@ -33,7 +33,7 @@ import org.apache.hadoop.util.StringUtil
  * not intended to be a comprehensive piece of data.
  *
  **************************************************/
-abstract class TaskStatus implements Writable, Cloneable {
+public abstract class TaskStatus implements Writable, Cloneable {
   static final Log LOG =
     LogFactory.getLog(TaskStatus.class.getName());
   

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/HistogramRawTestData.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/HistogramRawTestData.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/HistogramRawTestData.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/HistogramRawTestData.java Fri Mar  4 03:38:20 2011
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import java.util.ArrayList;
+import java.util.List;
+
+class HistogramRawTestData {
+  List<Long> data = new ArrayList<Long>();
+
+  List<Integer> percentiles = new ArrayList<Integer>();
+
+  int scale;
+
+  public List<Integer> getPercentiles() {
+    return percentiles;
+  }
+
+  public void setPercentiles(List<Integer> percentiles) {
+    this.percentiles = percentiles;
+  }
+
+  public int getScale() {
+    return scale;
+  }
+
+  public void setScale(int scale) {
+    this.scale = scale;
+  }
+
+  public List<Long> getData() {
+    return data;
+  }
+
+  public void setData(List<Long> data) {
+    this.data = data;
+  }
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestHistograms.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestHistograms.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestHistograms.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestHistograms.java Fri Mar  4 03:38:20 2011
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+import java.io.IOException;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.codehaus.jackson.JsonEncoding;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestHistograms {
+
+  /**
+   * @throws IOException
+   * 
+   *           There should be files in the directory named by
+   *           ${test.tools.input.dir}/rumen/histogram-tests .
+   * 
+   *           There will be pairs of files, inputXxx.json and goldXxx.json .
+   * 
+   *           We read the input file as a HistogramRawTestData in json. Then we
+   *           create a Histogram using the data field, and then a
+   *           LoggedDiscreteCDF using the percentiles and scale field. Finally,
+   *           we read the corresponding goldXxx.json as a LoggedDiscreteCDF and
+   *           deepCompare them.
+   */
+  @Test
+  public void testHistograms() throws IOException {
+    final Configuration conf = new Configuration();
+    final FileSystem lfs = FileSystem.getLocal(conf);
+    final Path rootInputDir = new Path(
+        System.getProperty("test.tools.input.dir", "")).makeQualified(lfs);
+    final Path rootInputFile = new Path(rootInputDir, "rumen/histogram-tests");
+
+
+    FileStatus[] tests = lfs.listStatus(rootInputFile);
+
+    for (int i = 0; i < tests.length; ++i) {
+      Path filePath = tests[i].getPath();
+      String fileName = filePath.getName();
+      if (fileName.startsWith("input")) {
+        String testName = fileName.substring("input".length());
+        Path goldFilePath = new Path(rootInputFile, "gold"+testName);
+        assertTrue("Gold file dies not exist", lfs.exists(goldFilePath));
+        LoggedDiscreteCDF newResult = histogramFileToCDF(filePath, lfs);
+        System.out.println("Testing a Histogram for " + fileName);
+        FSDataInputStream goldStream = lfs.open(goldFilePath);
+        JsonObjectMapperParser<LoggedDiscreteCDF> parser = new JsonObjectMapperParser<LoggedDiscreteCDF>(
+            goldStream, LoggedDiscreteCDF.class); 
+        try {
+          LoggedDiscreteCDF dcdf = parser.getNext();
+          dcdf.deepCompare(newResult, new TreePath(null, "<root>"));
+        } catch (DeepInequalityException e) {
+          fail(e.path.toString());
+        }
+        finally {
+          parser.close();
+        }
+      }
+    }
+  }
+
+  private static LoggedDiscreteCDF histogramFileToCDF(Path path, FileSystem fs)
+      throws IOException {
+    FSDataInputStream dataStream = fs.open(path);
+    JsonObjectMapperParser<HistogramRawTestData> parser = new JsonObjectMapperParser<HistogramRawTestData>(
+        dataStream, HistogramRawTestData.class);
+    HistogramRawTestData data;
+    try {
+      data = parser.getNext();
+    } finally {
+      parser.close();
+    }
+    
+    Histogram hist = new Histogram();
+    List<Long> measurements = data.getData();
+    List<Long> typeProbeData = new HistogramRawTestData().getData();
+
+    assertTrue(
+        "The data attribute of a jackson-reconstructed HistogramRawTestData "
+            + " should be a " + typeProbeData.getClass().getName()
+            + ", like a virgin HistogramRawTestData, but it's a "
+            + measurements.getClass().getName(),
+        measurements.getClass() == typeProbeData.getClass());
+
+    for (int j = 0; j < measurements.size(); ++j) {
+      hist.enter(measurements.get(j));
+    }
+
+    LoggedDiscreteCDF result = new LoggedDiscreteCDF();
+    int[] percentiles = new int[data.getPercentiles().size()];
+
+    for (int j = 0; j < data.getPercentiles().size(); ++j) {
+      percentiles[j] = data.getPercentiles().get(j);
+    }
+
+    result.setCDF(hist, percentiles, data.getScale());
+    return result;
+  }
+  
+  public static void main(String[] args) throws IOException {
+    final Configuration conf = new Configuration();
+    final FileSystem lfs = FileSystem.getLocal(conf);
+
+    for (String arg : args) {
+      Path filePath = new Path(arg).makeQualified(lfs);
+      String fileName = filePath.getName();
+      if (fileName.startsWith("input")) {
+        LoggedDiscreteCDF newResult = histogramFileToCDF(filePath, lfs);
+        String testName = fileName.substring("input".length());
+        Path goldFilePath = new Path(filePath.getParent(), "gold"+testName);
+
+        ObjectMapper mapper = new ObjectMapper();
+        JsonFactory factory = mapper.getJsonFactory();
+        FSDataOutputStream ostream = lfs.create(goldFilePath, true);
+        JsonGenerator gen = factory.createJsonGenerator(ostream,
+            JsonEncoding.UTF8);
+        gen.useDefaultPrettyPrinter();
+        
+        gen.writeObject(newResult);
+        
+        gen.close();
+      } else {
+        System.err.println("Input file not started with \"input\". File "+fileName+" skipped.");
+      }
+    }
+  }
+}

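A note on the input format: the inputXxx.json files consumed by testHistograms are Jackson serializations of HistogramRawTestData, so they carry that bean's three properties. The sketch below only illustrates the expected shape; the values are invented rather than taken from the real test data. With scale 100, the percentiles 25, 50 and 75 would correspond to the relativeRanking values 0.25, 0.5 and 0.75 seen in the gold files further down.

    {
      "data" : [ 12345, 2345678901, 2345678902, 23456789012, 23456789012 ],
      "percentiles" : [ 25, 50, 75 ],
      "scale" : 100
    }
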
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestPiecewiseLinearInterpolation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestPiecewiseLinearInterpolation.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestPiecewiseLinearInterpolation.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestPiecewiseLinearInterpolation.java Fri Mar  4 03:38:20 2011
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import java.util.ArrayList;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestPiecewiseLinearInterpolation {
+
+  static private double maximumRelativeError = 0.002D;
+
+  static private LoggedSingleRelativeRanking makeRR(double ranking, long datum) {
+    LoggedSingleRelativeRanking result = new LoggedSingleRelativeRanking();
+
+    result.setDatum(datum);
+    result.setRelativeRanking(ranking);
+
+    return result;
+  }
+
+  @Test
+  public void testOneRun() {
+    LoggedDiscreteCDF input = new LoggedDiscreteCDF();
+
+    input.setMinimum(100000L);
+    input.setMaximum(1100000L);
+
+    ArrayList<LoggedSingleRelativeRanking> rankings = new ArrayList<LoggedSingleRelativeRanking>();
+
+    rankings.add(makeRR(0.1, 200000L));
+    rankings.add(makeRR(0.5, 800000L));
+    rankings.add(makeRR(0.9, 1000000L));
+
+    input.setRankings(rankings);
+    input.setNumberValues(3);
+
+    CDFRandomGenerator gen = new CDFPiecewiseLinearRandomGenerator(input);
+    Histogram values = new Histogram();
+
+    for (int i = 0; i < 1000000; ++i) {
+      long value = gen.randomValue();
+      values.enter(value);
+    }
+
+    /*
+     * Now we build a percentiles CDF and compute the sum of the squared
+     * errors between the actual percentiles and the predicted percentiles.
+     */
+    int[] percentiles = new int[99];
+
+    for (int i = 0; i < 99; ++i) {
+      percentiles[i] = i + 1;
+    }
+
+    long[] result = values.getCDF(100, percentiles);
+    long sumErrorSquares = 0L;
+
+    for (int i = 0; i < 10; ++i) {
+      long error = result[i] - (10000L * i + 100000L);
+      System.out.println("element " + i + ", got " + result[i] + ", expected "
+          + (10000L * i + 100000L) + ", error = " + error);
+      sumErrorSquares += error * error;
+    }
+
+    for (int i = 10; i < 50; ++i) {
+      long error = result[i] - (15000L * i + 50000L);
+      System.out.println("element " + i + ", got " + result[i] + ", expected "
+          + (15000L * i + 50000L) + ", error = " + error);
+      sumErrorSquares += error * error;
+    }
+
+    for (int i = 50; i < 90; ++i) {
+      long error = result[i] - (5000L * i + 550000L);
+      System.out.println("element " + i + ", got " + result[i] + ", expected "
+          + (5000L * i + 550000L) + ", error = " + error);
+      sumErrorSquares += error * error;
+    }
+
+    for (int i = 90; i <= 100; ++i) {
+      long error = result[i] - (10000L * i + 100000L);
+      System.out.println("element " + i + ", got " + result[i] + ", expected "
+          + (10000L * i + 100000L) + ", error = " + error);
+      sumErrorSquares += error * error;
+    }
+
+    // normalize the error
+    double realSumErrorSquares = (double) sumErrorSquares;
+
+    double normalizedError = realSumErrorSquares / 100
+        / rankings.get(1).getDatum() / rankings.get(1).getDatum();
+    double RMSNormalizedError = Math.sqrt(normalizedError);
+
+    System.out.println("sumErrorSquares = " + sumErrorSquares);
+
+    System.out.println("normalizedError: " + normalizedError
+        + ", RMSNormalizedError: " + RMSNormalizedError);
+
+    System.out.println("Cumulative error is " + RMSNormalizedError);
+
+    assertTrue("The RMS relative error per bucket, " + RMSNormalizedError
+        + ", exceeds our tolerance of " + maximumRelativeError,
+        RMSNormalizedError <= maximumRelativeError);
+
+  }
+}

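The expected values asserted in testOneRun follow from straight-line interpolation between the CDF knots (0.0, 100000), (0.1, 200000), (0.5, 800000), (0.9, 1000000) and (1.0, 1100000); for instance, 15000 * i + 50000 for 10 <= i < 50 is the segment from (0.1, 200000) to (0.5, 800000) evaluated at rank i/100. The standalone sketch below is purely illustrative (the class name is a placeholder) and reproduces those expectations.

    // Illustrative only: recompute the expected percentile data asserted in
    // testOneRun by interpolating linearly between the CDF knots of the input.
    public class ExpectedPercentileSketch {
      public static void main(String[] args) {
        double[] ranks = { 0.0, 0.1, 0.5, 0.9, 1.0 };
        long[] data = { 100000L, 200000L, 800000L, 1000000L, 1100000L };
        for (int i = 0; i <= 100; ++i) {
          double rank = i / 100.0;
          int k = 1;
          while (k < ranks.length - 1 && rank > ranks[k]) {
            ++k;
          }
          // Linear interpolation between knot k-1 and knot k.
          double expected = data[k - 1] + (rank - ranks[k - 1])
              * (data[k] - data[k - 1]) / (ranks[k] - ranks[k - 1]);
          System.out.println("percentile " + i + " -> " + Math.round(expected));
        }
      }
    }
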
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java Fri Mar  4 03:38:20 2011
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestRumenJobTraces {
+  @Test
+  public void testSmallTrace() throws Exception {
+    performSingleTest("sample-job-tracker-logs.gz",
+        "job-tracker-logs-topology-output", "job-tracker-logs-trace-output.gz");
+  }
+
+  @Test
+  public void testTruncatedTask() throws Exception {
+    performSingleTest("truncated-job-tracker-log", "truncated-topology-output",
+        "truncated-trace-output");
+  }
+
+  private void performSingleTest(String jtLogName, String goldTopology,
+      String goldTrace) throws Exception {
+    final Configuration conf = new Configuration();
+    final FileSystem lfs = FileSystem.getLocal(conf);
+
+    final Path rootInputDir =
+        new Path(System.getProperty("test.tools.input.dir", ""))
+            .makeQualified(lfs);
+    final Path rootTempDir =
+        new Path(System.getProperty("test.build.data", "/tmp"))
+            .makeQualified(lfs);
+
+    final Path rootInputFile = new Path(rootInputDir, "rumen/small-trace-test");
+    final Path tempDir = new Path(rootTempDir, "TestRumenJobTraces");
+    lfs.delete(tempDir, true);
+
+    final Path topologyFile = new Path(tempDir, jtLogName + "-topology.json");
+    final Path traceFile = new Path(tempDir, jtLogName + "-trace.json");
+
+    final Path inputFile = new Path(rootInputFile, jtLogName);
+
+    System.out.println("topology result file = " + topologyFile);
+    System.out.println("trace result file = " + traceFile);
+
+    String[] args = new String[6];
+
+    args[0] = "-v1";
+
+    args[1] = "-write-topology";
+    args[2] = topologyFile.toString();
+
+    args[3] = "-write-job-trace";
+    args[4] = traceFile.toString();
+
+    args[5] = inputFile.toString();
+
+    final Path topologyGoldFile = new Path(rootInputFile, goldTopology);
+    final Path traceGoldFile = new Path(rootInputFile, goldTrace);
+
+    HadoopLogsAnalyzer analyzer = new HadoopLogsAnalyzer();
+    int result = ToolRunner.run(analyzer, args);
+    assertEquals("Non-zero exit", 0, result);
+
+    TestRumenJobTraces
+        .<LoggedNetworkTopology> jsonFileMatchesGold(lfs, topologyFile,
+            topologyGoldFile, LoggedNetworkTopology.class, "topology");
+    TestRumenJobTraces.<LoggedJob> jsonFileMatchesGold(lfs, traceFile,
+        traceGoldFile, LoggedJob.class, "trace");
+  }
+
+  static private <T extends DeepCompare> void jsonFileMatchesGold(
+      FileSystem lfs, Path result, Path gold, Class<? extends T> clazz,
+      String fileDescription) throws IOException {
+    JsonObjectMapperParser<T> goldParser =
+        new JsonObjectMapperParser<T>(gold, clazz, new Configuration());
+    InputStream resultStream = lfs.open(result);
+    JsonObjectMapperParser<T> resultParser =
+        new JsonObjectMapperParser<T>(resultStream, clazz);
+    try {
+      while (true) {
+        DeepCompare goldJob = goldParser.getNext();
+        DeepCompare resultJob = resultParser.getNext();
+        if ((goldJob == null) || (resultJob == null)) {
+          assertTrue(goldJob == resultJob);
+          break;
+        }
+
+        try {
+          resultJob.deepCompare(goldJob, new TreePath(null, "<root>"));
+        } catch (DeepInequalityException e) {
+          String error = e.path.toString();
+
+          fail(fileDescription + " mismatches: " + error);
+        }
+      }
+    } finally {
+      IOUtils.cleanup(null, goldParser, resultParser);
+    }
+  }
+}

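performSingleTest drives HadoopLogsAnalyzer through ToolRunner with the -v1, -write-topology and -write-job-trace options. A minimal sketch of the same invocation outside JUnit is shown below; the class name and the concrete paths are placeholders only.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.tools.rumen.HadoopLogsAnalyzer;
    import org.apache.hadoop.util.ToolRunner;

    public class RumenTraceSketch {
      public static void main(String[] args) throws Exception {
        // Same option set as performSingleTest; the paths here are illustrative.
        int exitCode = ToolRunner.run(new Configuration(), new HadoopLogsAnalyzer(),
            new String[] {
                "-v1",
                "-write-topology", "/tmp/rumen/topology.json",
                "-write-job-trace", "/tmp/rumen/trace.json",
                "/tmp/rumen/sample-job-tracker-logs.gz" });
        System.exit(exitCode);
      }
    }
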
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestZombieJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestZombieJob.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestZombieJob.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestZombieJob.java Fri Mar  4 03:38:20 2011
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import java.util.List;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskStatus.State;
+import org.apache.hadoop.mapreduce.TaskType;
+
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestZombieJob {
+  final double epsilon = 0.01;
+  private final int[] attemptTimesPercentiles = new int[] { 10, 50, 90 };
+  private long[] succeededCDF = new long[] { 5268, 5268, 5268, 5268, 5268 };
+  private long[] failedCDF = new long[] { 18592, 18592, 18592, 18592, 18592 };
+  private double[] expectedPs = new double[] { 0.000001, 0.18707660239708182,
+      0.0013027618551328818, 2.605523710265763E-4 };
+
+  private final long[] mapTaskCounts = new long[] { 7838525L, 342277L, 100228L,
+      1564L, 1234L };
+  private final long[] reduceTaskCounts = new long[] { 4405338L, 139391L,
+      1514383L, 139391, 1234L };
+
+  List<LoggedJob> loggedJobs = new ArrayList<LoggedJob>();
+  List<JobStory> jobStories = new ArrayList<JobStory>();
+
+  @Before
+  public void setUp() throws Exception {
+    final Configuration conf = new Configuration();
+    final FileSystem lfs = FileSystem.getLocal(conf);
+
+    final Path rootInputDir = new Path(
+        System.getProperty("test.tools.input.dir", "")).makeQualified(lfs);
+    final Path rootInputFile = new Path(rootInputDir, "rumen/zombie");
+
+    ZombieJobProducer parser = new ZombieJobProducer(new Path(rootInputFile,
+        "input-trace.json"), new ZombieCluster(new Path(rootInputFile,
+        "input-topology.json"), null, conf), conf);
+
+    JobStory job = null;
+    for (int i = 0; i < 4; i++) {
+      job = parser.getNextJob();
+      ZombieJob zJob = (ZombieJob) job;
+      LoggedJob loggedJob = zJob.getLoggedJob();
+      System.out.println(i + ":" + job.getNumberMaps() + "m, "
+          + job.getNumberReduces() + "r");
+      System.out
+          .println(loggedJob.getOutcome() + ", " + loggedJob.getJobtype());
+
+      System.out.println("Input Splits -- " + job.getInputSplits().length
+          + ", " + job.getNumberMaps());
+
+      System.out.println("Successful Map CDF -------");
+      for (LoggedDiscreteCDF cdf : loggedJob.getSuccessfulMapAttemptCDFs()) {
+        System.out.println(cdf.getNumberValues() + ": " + cdf.getMinimum()
+            + "--" + cdf.getMaximum());
+        for (LoggedSingleRelativeRanking ranking : cdf.getRankings()) {
+          System.out.println("   " + ranking.getRelativeRanking() + ":"
+              + ranking.getDatum());
+        }
+      }
+      System.out.println("Failed Map CDF -----------");
+      for (LoggedDiscreteCDF cdf : loggedJob.getFailedMapAttemptCDFs()) {
+        System.out.println(cdf.getNumberValues() + ": " + cdf.getMinimum()
+            + "--" + cdf.getMaximum());
+        for (LoggedSingleRelativeRanking ranking : cdf.getRankings()) {
+          System.out.println("   " + ranking.getRelativeRanking() + ":"
+              + ranking.getDatum());
+        }
+      }
+      System.out.println("Successful Reduce CDF ----");
+      LoggedDiscreteCDF cdf = loggedJob.getSuccessfulReduceAttemptCDF();
+      System.out.println(cdf.getNumberValues() + ": " + cdf.getMinimum() + "--"
+          + cdf.getMaximum());
+      for (LoggedSingleRelativeRanking ranking : cdf.getRankings()) {
+        System.out.println("   " + ranking.getRelativeRanking() + ":"
+            + ranking.getDatum());
+      }
+      System.out.println("Failed Reduce CDF --------");
+      cdf = loggedJob.getFailedReduceAttemptCDF();
+      System.out.println(cdf.getNumberValues() + ": " + cdf.getMinimum() + "--"
+          + cdf.getMaximum());
+      for (LoggedSingleRelativeRanking ranking : cdf.getRankings()) {
+        System.out.println("   " + ranking.getRelativeRanking() + ":"
+            + ranking.getDatum());
+      }
+      System.out.print("map attempts to success -- ");
+      for (double p : loggedJob.getMapperTriesToSucceed()) {
+        System.out.print(p + ", ");
+      }
+      System.out.println();
+      System.out.println("===============");
+
+      loggedJobs.add(loggedJob);
+      jobStories.add(job);
+    }
+  }
+
+  @Test
+  public void testFirstJob() {
+    // 20th job seems reasonable: "totalMaps":329,"totalReduces":101
+    // successful map: 80 node-local, 196 rack-local, 53 rack-remote, 2 unknown
+    // failed map: 0-0-0-1
+    // successful reduce: 99 failed reduce: 13
+    // map attempts to success -- 0.9969879518072289, 0.0030120481927710845,
+    JobStory job = jobStories.get(0);
+    assertEquals(1, job.getNumberMaps());
+    assertEquals(1, job.getNumberReduces());
+
+    // get splits
+
+    TaskAttemptInfo taInfo = null;
+    long expectedRuntime = 2423;
+    // get a succeeded map task attempt, expect the exact same task attempt
+    taInfo = job.getMapTaskAttemptInfoAdjusted(14, 0, 1);
+    assertEquals(expectedRuntime, taInfo.getRuntime());
+    assertEquals(State.SUCCEEDED, taInfo.getRunState());
+
+    // get a succeeded map attempt, but reschedule with different locality.
+    taInfo = job.getMapTaskAttemptInfoAdjusted(14, 0, 2);
+    assertEquals(State.SUCCEEDED, taInfo.getRunState());
+    taInfo = job.getMapTaskAttemptInfoAdjusted(14, 0, 0);
+    assertEquals(State.SUCCEEDED, taInfo.getRunState());
+
+    expectedRuntime = 97502;
+    // get a succeeded reduce task attempt, expect the exact same task attempt
+    taInfo = job.getTaskAttemptInfo(TaskType.REDUCE, 14, 0);
+    assertEquals(State.SUCCEEDED, taInfo.getRunState());
+
+    // get a failed reduce task attempt, expect the exact same task attempt
+    taInfo = job.getTaskAttemptInfo(TaskType.REDUCE, 14, 0);
+    assertEquals(State.SUCCEEDED, taInfo.getRunState());
+
+    // get a non-existent reduce task attempt, expect a made-up task attempt
+    // TODO fill in test case
+  }
+
+  @Test
+  public void testSecondJob() {
+    // 7th job has many failed tasks.
+    // 3204 m, 0 r
+    // successful maps 497-586-23-1, failed maps 0-0-0-2714
+    // map attempts to success -- 0.8113600833767587, 0.18707660239708182,
+    // 0.0013027618551328818, 2.605523710265763E-4,
+    JobStory job = jobStories.get(1);
+    assertEquals(20, job.getNumberMaps());
+    assertEquals(1, job.getNumberReduces());
+
+    TaskAttemptInfo taInfo = null;
+    // get a succeeded map task attempt
+    taInfo = job.getMapTaskAttemptInfoAdjusted(17, 1, 1);
+    assertEquals(State.SUCCEEDED, taInfo.getRunState());
+
+    // get a succeeded map task attempt, with different locality
+    taInfo = job.getMapTaskAttemptInfoAdjusted(17, 1, 2);
+    assertEquals(State.SUCCEEDED, taInfo.getRunState());
+    taInfo = job.getMapTaskAttemptInfoAdjusted(17, 1, 0);
+    assertEquals(State.SUCCEEDED, taInfo.getRunState());
+
+    // get a failed map task attempt
+    taInfo = job.getMapTaskAttemptInfoAdjusted(14, 0, 1);
+    assertEquals(1927, taInfo.getRuntime());
+    assertEquals(State.SUCCEEDED, taInfo.getRunState());
+
+    // get a failed map task attempt, with different locality
+    // TODO: this test does not make sense here, because no suitable
+    // data set is available.
+  }
+
+  @Test
+  public void testFourthJob() {
+    // 7th job has many failed tasks.
+    // 3204 m, 0 r
+    // successful maps 497-586-23-1, failed maps 0-0-0-2714
+    // map attempts to success -- 0.8113600833767587, 0.18707660239708182,
+    // 0.0013027618551328818, 2.605523710265763E-4,
+    JobStory job = jobStories.get(3);
+    assertEquals(131, job.getNumberMaps());
+    assertEquals(47, job.getNumberReduces());
+
+    TaskAttemptInfo taInfo = null;
+    // get a succeeded map task attempt
+    long runtime = 5268;
+    taInfo = job.getMapTaskAttemptInfoAdjusted(113, 1, 1);
+    assertEquals(State.SUCCEEDED, taInfo.getRunState());
+    assertEquals(runtime, taInfo.getRuntime());
+
+    // get a succeeded map task attempt, with different locality
+    taInfo = job.getMapTaskAttemptInfoAdjusted(113, 1, 2);
+    assertEquals(State.SUCCEEDED, taInfo.getRunState());
+    assertEquals(runtime, taInfo.getRuntime() / 2);
+    taInfo = job.getMapTaskAttemptInfoAdjusted(113, 1, 0);
+    assertEquals(State.SUCCEEDED, taInfo.getRunState());
+    assertEquals((long) (runtime / 1.5), taInfo.getRuntime());
+
+    // get a failed map task attempt
+    taInfo = job.getMapTaskAttemptInfoAdjusted(113, 0, 1);
+    assertEquals(18592, taInfo.getRuntime());
+    assertEquals(State.FAILED, taInfo.getRunState());
+  }
+
+  @Test
+  public void testRecordIOInfo() {
+    JobStory job = jobStories.get(3);
+
+    TaskInfo mapTask = job.getTaskInfo(TaskType.MAP, 113);
+
+    TaskInfo reduceTask = job.getTaskInfo(TaskType.REDUCE, 0);
+
+    assertEquals(mapTaskCounts[0], mapTask.getInputBytes());
+    assertEquals(mapTaskCounts[1], mapTask.getInputRecords());
+    assertEquals(mapTaskCounts[2], mapTask.getOutputBytes());
+    assertEquals(mapTaskCounts[3], mapTask.getOutputRecords());
+    assertEquals(mapTaskCounts[4], mapTask.getTaskMemory());
+
+    assertEquals(reduceTaskCounts[0], reduceTask.getInputBytes());
+    assertEquals(reduceTaskCounts[1], reduceTask.getInputRecords());
+    assertEquals(reduceTaskCounts[2], reduceTask.getOutputBytes());
+    assertEquals(reduceTaskCounts[3], reduceTask.getOutputRecords());
+    assertEquals(reduceTaskCounts[4], reduceTask.getTaskMemory());
+  }
+
+  @Test
+  public void testMakeUpInfo() {
+    // get many non-existent tasks
+    // total 3204 map tasks; 3300 is a non-existent task.
+    checkMakeUpTask(jobStories.get(3), 113, 1);
+  }
+
+  private void checkMakeUpTask(JobStory job, int taskNumber, int locality) {
+    TaskAttemptInfo taInfo = null;
+
+    Histogram sampleSucceeded = new Histogram();
+    Histogram sampleFailed = new Histogram();
+    List<Integer> sampleAttempts = new ArrayList<Integer>();
+    for (int i = 0; i < 100000; i++) {
+      int attemptId = 0;
+      while (true) {
+        taInfo = job.getMapTaskAttemptInfoAdjusted(taskNumber, attemptId, 1);
+        if (taInfo.getRunState() == State.SUCCEEDED) {
+          sampleSucceeded.enter(taInfo.getRuntime());
+          break;
+        }
+        sampleFailed.enter(taInfo.getRuntime());
+        attemptId++;
+      }
+      sampleAttempts.add(attemptId);
+    }
+
+    // check state distribution
+    int[] countTries = new int[] { 0, 0, 0, 0 };
+    for (int attempts : sampleAttempts) {
+      assertTrue(attempts < 4);
+      countTries[attempts]++;
+    }
+    /*
+     * System.out.print("Generated map attempts to success -- "); for (int
+     * count: countTries) { System.out.print((double)count/sampleAttempts.size()
+     * + ", "); } System.out.println(); System.out.println("===============");
+     */
+    for (int i = 0; i < 4; i++) {
+      int count = countTries[i];
+      double p = (double) count / sampleAttempts.size();
+      assertTrue(expectedPs[i] - p < epsilon);
+    }
+
+    // check succeeded attempts runtime distribution
+    long[] expectedCDF = succeededCDF;
+    LoggedDiscreteCDF cdf = new LoggedDiscreteCDF();
+    cdf.setCDF(sampleSucceeded, attemptTimesPercentiles, 100);
+    /*
+     * System.out.println("generated succeeded map runtime distribution");
+     * System.out.println(cdf.getNumberValues() + ": " + cdf.getMinimum() + "--"
+     * + cdf.getMaximum()); for (LoggedSingleRelativeRanking ranking:
+     * cdf.getRankings()) { System.out.println("   " +
+     * ranking.getRelativeRanking() + ":" + ranking.getDatum()); }
+     */
+    assertRuntimeEqual(cdf.getMinimum(), expectedCDF[0]);
+    assertRuntimeEqual(cdf.getMaximum(), expectedCDF[4]);
+    for (int i = 0; i < 3; i++) {
+      LoggedSingleRelativeRanking ranking = cdf.getRankings().get(i);
+      assertRuntimeEqual(expectedCDF[i + 1], ranking.getDatum());
+    }
+
+    // check failed attempts runtime distribution
+    expectedCDF = failedCDF;
+    cdf = new LoggedDiscreteCDF();
+    cdf.setCDF(sampleFailed, attemptTimesPercentiles, 100);
+
+    System.out.println("generated failed map runtime distribution");
+    System.out.println(cdf.getNumberValues() + ": " + cdf.getMinimum() + "--"
+        + cdf.getMaximum());
+    for (LoggedSingleRelativeRanking ranking : cdf.getRankings()) {
+      System.out.println("   " + ranking.getRelativeRanking() + ":"
+          + ranking.getDatum());
+    }
+    assertRuntimeEqual(cdf.getMinimum(), expectedCDF[0]);
+    assertRuntimeEqual(cdf.getMaximum(), expectedCDF[4]);
+    for (int i = 0; i < 3; i++) {
+      LoggedSingleRelativeRanking ranking = cdf.getRankings().get(i);
+      assertRuntimeEqual(expectedCDF[i + 1], ranking.getDatum());
+    }
+  }
+
+  private void assertRuntimeEqual(long expected, long generated) {
+    if (expected == 0) {
+      assertTrue(generated > -1000 && generated < 1000);
+    } else {
+      long epsilon = Math.max(expected / 10, 5000);
+      assertTrue(expected - generated > -epsilon);
+      assertTrue(expected - generated < epsilon);
+    }
+  }
+
+}

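setUp in TestZombieJob pulls four jobs out of a ZombieJobProducer built over the rumen/zombie input trace and topology. A minimal sketch of the same replay loop, using only the constructors and accessors exercised by the test, might look like the following; the trace location is illustrative, and the loop assumes getNextJob() returns null once the trace is exhausted.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.tools.rumen.JobStory;
    import org.apache.hadoop.tools.rumen.ZombieCluster;
    import org.apache.hadoop.tools.rumen.ZombieJobProducer;

    public class ZombieReplaySketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Illustrative location; the test resolves this under test.tools.input.dir.
        Path root = new Path("/tmp/rumen/zombie");

        ZombieCluster cluster =
            new ZombieCluster(new Path(root, "input-topology.json"), null, conf);
        ZombieJobProducer producer = new ZombieJobProducer(
            new Path(root, "input-trace.json"), cluster, conf);

        JobStory job;
        // Assumption: getNextJob() returns null at the end of the trace.
        while ((job = producer.getNextJob()) != null) {
          System.out.println(job.getNumberMaps() + " maps, "
              + job.getNumberReduces() + " reduces");
        }
      }
    }
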
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/histogram-tests/gold-minimal.json
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/histogram-tests/gold-minimal.json?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/histogram-tests/gold-minimal.json (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/histogram-tests/gold-minimal.json Fri Mar  4 03:38:20 2011
@@ -0,0 +1,15 @@
+{
+  "minimum" : 12345,
+  "rankings" : [ {
+    "relativeRanking" : 0.25,
+    "datum" : 12345
+  }, {
+    "relativeRanking" : 0.5,
+    "datum" : 2345678901
+  }, {
+    "relativeRanking" : 0.75,
+    "datum" : 2345678902
+  } ],
+  "maximum" : 23456789012,
+  "numberValues" : 5
+}
\ No newline at end of file

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/histogram-tests/gold-one-value-many-repeats.json
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/histogram-tests/gold-one-value-many-repeats.json?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/histogram-tests/gold-one-value-many-repeats.json (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/histogram-tests/gold-one-value-many-repeats.json Fri Mar  4 03:38:20 2011
@@ -0,0 +1,15 @@
+{
+  "minimum" : 23456789012,
+  "rankings" : [ {
+    "relativeRanking" : 0.25,
+    "datum" : 23456789012
+  }, {
+    "relativeRanking" : 0.5,
+    "datum" : 23456789012
+  }, {
+    "relativeRanking" : 0.75,
+    "datum" : 23456789012
+  } ],
+  "maximum" : 23456789012,
+  "numberValues" : 64
+}
\ No newline at end of file

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/histogram-tests/gold-only-one-value.json
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/histogram-tests/gold-only-one-value.json?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/histogram-tests/gold-only-one-value.json (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/histogram-tests/gold-only-one-value.json Fri Mar  4 03:38:20 2011
@@ -0,0 +1,15 @@
+{
+  "minimum" : 23456789012,
+  "rankings" : [ {
+    "relativeRanking" : 0.25,
+    "datum" : 23456789012
+  }, {
+    "relativeRanking" : 0.5,
+    "datum" : 23456789012
+  }, {
+    "relativeRanking" : 0.75,
+    "datum" : 23456789012
+  } ],
+  "maximum" : 23456789012,
+  "numberValues" : 1
+}
\ No newline at end of file