Posted to common-commits@hadoop.apache.org by am...@apache.org on 2011/10/18 16:45:51 UTC

svn commit: r1185694 [4/7] - in /hadoop/common/branches/branch-0.20-security: ./ src/contrib/ src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/ src/contrib/gridmix/sr...

Added: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java?rev=1185694&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java Tue Oct 18 14:45:48 2011
@@ -0,0 +1,563 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Utils;
+import org.apache.hadoop.mapred.gridmix.CompressionEmulationUtil.RandomTextDataMapper;
+import org.apache.hadoop.mapred.gridmix.GenerateData.GenSplit;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; // used by testFileQueueDecompression; package assumed (may be org.apache.hadoop.mapred.lib on this branch)
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import static org.junit.Assert.*;
+import org.junit.Test;
+
+/**
+ * Test {@link CompressionEmulationUtil}
+ */
+public class TestCompressionEmulationUtils {
+  //TODO Remove this once LocalJobRunner can run Gridmix.
+  static class CustomInputFormat extends GenerateData.GenDataFormat {
+    @Override
+    public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
+      // get the total data to be generated
+      long toGen =
+        jobCtxt.getConfiguration().getLong(GenerateData.GRIDMIX_GEN_BYTES, -1);
+      if (toGen < 0) {
+        throw new IOException("Invalid/missing generation bytes: " + toGen);
+      }
+      // get the total number of mappers configured
+      int totalMappersConfigured =
+        jobCtxt.getConfiguration().getInt("mapred.map.tasks", -1);
+      if (totalMappersConfigured < 0) {
+        throw new IOException("Invalid/missing num mappers: " 
+                              + totalMappersConfigured);
+      }
+      
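+      // Note: the integer division below drops any remainder, so up to
+      // (totalMappersConfigured - 1) bytes may go ungenerated.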
+      final long bytesPerTracker = toGen / totalMappersConfigured;
+      final ArrayList<InputSplit> splits = 
+        new ArrayList<InputSplit>(totalMappersConfigured);
+      for (int i = 0; i < totalMappersConfigured; ++i) {
+        splits.add(new GenSplit(bytesPerTracker, 
+                   new String[] { "tracker_local" }));
+      }
+      return splits;
+    }
+  }
+  
+  /**
+   * Test {@link RandomTextDataMapper} via {@link CompressionEmulationUtil}.
+   */
+  @Test
+  public void testRandomCompressedTextDataGenerator() throws Exception {
+    int wordSize = 10;
+    int listSize = 20;
+    long dataSize = 10*1024*1024;
+    
+    Configuration conf = new Configuration();
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+    CompressionEmulationUtil.setInputCompressionEmulationEnabled(conf, true);
+    
+    // configure the RandomTextDataGenerator to generate desired sized data
+    conf.setInt(RandomTextDataGenerator.GRIDMIX_DATAGEN_RANDOMTEXT_LISTSIZE, 
+                listSize);
+    conf.setInt(RandomTextDataGenerator.GRIDMIX_DATAGEN_RANDOMTEXT_WORDSIZE, 
+                wordSize);
+    conf.setLong(GenerateData.GRIDMIX_GEN_BYTES, dataSize);
+    
+    FileSystem lfs = FileSystem.getLocal(conf);
+    
+    // define the test's root temp directory
+    Path rootTempDir =
+        new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+            lfs.getUri(), lfs.getWorkingDirectory());
+
+    Path tempDir = new Path(rootTempDir, "TestRandomCompressedTextDataGenr");
+    lfs.delete(tempDir, true);
+    
+    runDataGenJob(conf, tempDir);
+    
+    // validate the output data
+    FileStatus[] files = 
+      lfs.listStatus(tempDir, new Utils.OutputFileUtils.OutputFilesFilter());
+    long size = 0;
+    long maxLineSize = 0;
+    
+    for (FileStatus status : files) {
+      InputStream in = 
+        CompressionEmulationUtil
+          .getPossiblyDecompressedInputStream(status.getPath(), conf, 0);
+      BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+      String line = reader.readLine();
+      if (line != null) {
+        long lineSize = line.getBytes().length;
+        if (lineSize > maxLineSize) {
+          maxLineSize = lineSize;
+        }
+        while (line != null) {
+          for (String word : line.split("\\s")) {
+            size += word.getBytes().length;
+          }
+          line = reader.readLine();
+        }
+      }
+      reader.close();
+    }
+
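+    // The generator emits data line by line, so the measured size may
+    // overshoot the requested size by at most one line's worth of bytes.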
+    assertTrue(size >= dataSize);
+    assertTrue(size <= dataSize + maxLineSize);
+  }
+  
+  /**
+   * Runs a GridMix data-generation job.
+   */
+  private static void runDataGenJob(Configuration conf, Path tempDir) 
+  throws IOException, ClassNotFoundException, InterruptedException {
+    JobConf jobConf = new JobConf(conf);
+    JobClient client = new JobClient(jobConf);
+    
+    // force a single map task; the test runs under the local job runner
+    jobConf.setInt("mapred.map.tasks", 1);
+    
+    Job job = new Job(jobConf);
+
+    CompressionEmulationUtil.configure(job);
+    job.setInputFormatClass(CustomInputFormat.class);
+    
+    // set the output path
+    FileOutputFormat.setOutputPath(job, tempDir);
+    
+    // submit and wait for completion
+    job.submit();
+    int ret = job.waitForCompletion(true) ? 0 : 1;
+
+    assertEquals("Job Failed", 0, ret);
+  }
+  
+  /**
+   * Test if {@link RandomTextDataGenerator} can generate random text data 
+   * with the desired compression ratio. This involves
+   *   - using {@link CompressionEmulationUtil} to configure the MR job for 
+   *     generating the random text data with the desired compression ratio
+   *   - running the MR job
+   *   - test {@link RandomTextDataGenerator}'s output and match the output size
+   *     (compressed) with the expected compression ratio.
+   */
+  private void testCompressionRatioConfigure(float ratio)
+  throws Exception {
+    long dataSize = 10*1024*1024;
+    
+    Configuration conf = new Configuration();
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+    CompressionEmulationUtil.setInputCompressionEmulationEnabled(conf, true);
+    
+    conf.setLong(GenerateData.GRIDMIX_GEN_BYTES, dataSize);
+    
+    float expectedRatio = CompressionEmulationUtil.DEFAULT_COMPRESSION_RATIO;
+    if (ratio > 0) {
+      // set the compression ratio in the conf
+      CompressionEmulationUtil.setMapInputCompressionEmulationRatio(conf, ratio);
+      expectedRatio = 
+        CompressionEmulationUtil.standardizeCompressionRatio(ratio);
+    }
+    
+    // invoke the utility to map from ratio to word-size
+    CompressionEmulationUtil.setupDataGeneratorConfig(conf);
+    
+    FileSystem lfs = FileSystem.getLocal(conf);
+    
+    // define the test's root temp directory
+    Path rootTempDir =
+        new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+            lfs.getUri(), lfs.getWorkingDirectory());
+
+    Path tempDir = 
+      new Path(rootTempDir, "TestCustomRandomCompressedTextDataGenr");
+    lfs.delete(tempDir, true);
+    
+    runDataGenJob(conf, tempDir);
+    
+    // validate the output data
+    FileStatus[] files = 
+      lfs.listStatus(tempDir, new Utils.OutputFileUtils.OutputFilesFilter());
+    long size = 0;
+    
+    for (FileStatus status : files) {
+      size += status.getLen();
+    }
+
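+    // observed ratio = compressed output size / uncompressed input size;
+    // standardize it (round to two decimals) before comparing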
+    float compressionRatio = ((float)size)/dataSize;
+    float stdRatio = 
+      CompressionEmulationUtil.standardizeCompressionRatio(compressionRatio);
+    
+    assertEquals(expectedRatio, stdRatio, 0.0D);
+  }
+  
+  /**
+   * Test compression ratio with multiple compression ratios.
+   */
+  @Test
+  public void testCompressionRatios() throws Exception {
+    // test the default compression ratio, i.e. 0.5
+    testCompressionRatioConfigure(0F);
+    // test for a sample compression ratio of 0.2
+    testCompressionRatioConfigure(0.2F);
+    // test for a sample compression ratio of 0.4
+    testCompressionRatioConfigure(0.4F);
+    // test for a sample compression ratio of 0.65
+    testCompressionRatioConfigure(0.65F);
+    // test for a compression ratio of 0.682 which should be standardized
+    // to round(0.682), i.e. 0.68
+    testCompressionRatioConfigure(0.682F);
+    // test for a compression ratio of 0.567 which should be standardized
+    // to round(0.567), i.e. 0.57
+    testCompressionRatioConfigure(0.567F);
+    
+    // test with a compression ratio of 0.01 which is less than the min
+    // supported value of 0.07
+    boolean failed = false;
+    try {
+      testCompressionRatioConfigure(0.01F);
+    } catch (RuntimeException re) {
+      failed = true;
+    }
+    assertTrue("Compression ratio min value (0.07) check failed!", failed);
+    
+    // test with a compression ratio of 0.7 which is greater than the max
+    // supported value of 0.68
+    failed = false;
+    try {
+      testCompressionRatioConfigure(0.7F);
+    } catch (RuntimeException re) {
+      failed = true;
+    }
+    assertTrue("Compression ratio max value (0.68) check failed!", failed);
+  }
+  
+  /**
+   * Test compression ratio standardization.
+   */
+  @Test
+  public void testCompressionRatioStandardization() throws Exception {
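+    // standardizeCompressionRatio is expected to round the ratio to two
+    // decimal places, roughly: Math.round(ratio * 100.0F) / 100.0F
+    // (a sketch inferred from the assertions below, not the actual impl)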
+    assertEquals(0.55F, 
+        CompressionEmulationUtil.standardizeCompressionRatio(0.55F), 0.0D);
+    assertEquals(0.65F, 
+        CompressionEmulationUtil.standardizeCompressionRatio(0.652F), 0.0D);
+    assertEquals(0.78F, 
+        CompressionEmulationUtil.standardizeCompressionRatio(0.777F), 0.0D);
+    assertEquals(0.86F, 
+        CompressionEmulationUtil.standardizeCompressionRatio(0.855F), 0.0D);
+  }
+  
+  /**
+   * Test map input compression ratio configuration utilities.
+   */
+  @Test
+  public void testInputCompressionRatioConfiguration() throws Exception {
+    Configuration conf = new Configuration();
+    float ratio = 0.567F;
+    CompressionEmulationUtil.setMapInputCompressionEmulationRatio(conf, ratio);
+    assertEquals(ratio, 
+        CompressionEmulationUtil.getMapInputCompressionEmulationRatio(conf), 
+        0.0D);
+  }
+  
+  /**
+   * Test map output compression ratio configuration utilities.
+   */
+  @Test
+  public void testIntermediateCompressionRatioConfiguration() 
+  throws Exception {
+    Configuration conf = new Configuration();
+    float ratio = 0.567F;
+    CompressionEmulationUtil.setMapOutputCompressionEmulationRatio(conf, ratio);
+    assertEquals(ratio, 
+        CompressionEmulationUtil.getMapOutputCompressionEmulationRatio(conf), 
+        0.0D);
+  }
+  
+  /**
+   * Test reduce output compression ratio configuration utilities.
+   */
+  @Test
+  public void testOutputCompressionRatioConfiguration() throws Exception {
+    Configuration conf = new Configuration();
+    float ratio = 0.567F;
+    CompressionEmulationUtil.setReduceOutputCompressionEmulationRatio(conf, 
+                                                                      ratio);
+    assertEquals(ratio, 
+        CompressionEmulationUtil.getReduceOutputCompressionEmulationRatio(conf),
+        0.0D);
+  }
+  
+  /**
+   * Test compressible {@link GridmixRecord}.
+   */
+  @Test
+  public void testCompressibleGridmixRecord() throws IOException {
+    JobConf conf = new JobConf();
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+    CompressionEmulationUtil.setInputCompressionEmulationEnabled(conf, true);
+    
+    FileSystem lfs = FileSystem.getLocal(conf);
+    int dataSize = 1024 * 1024 * 10; // 10 MB
+    float ratio = 0.357F;
+    
+    // define the test's root temp directory
+    Path rootTempDir =
+        new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+            lfs.getUri(), lfs.getWorkingDirectory());
+
+    Path tempDir = new Path(rootTempDir, 
+                            "TestPossiblyCompressibleGridmixRecord");
+    lfs.delete(tempDir, true);
+    
+    // define a compressible GridmixRecord
+    GridmixRecord record = new GridmixRecord(dataSize, 0);
+    record.setCompressibility(true, ratio); // enable compression
+    
+    conf.setClass("mapred.output.compression.codec", GzipCodec.class, 
+                  CompressionCodec.class);
+    org.apache.hadoop.mapred.FileOutputFormat.setCompressOutput(conf, true);
+    
+    // write the record to a file
+    Path recordFile = new Path(tempDir, "record");
+    OutputStream outStream = CompressionEmulationUtil
+                               .getPossiblyCompressedOutputStream(recordFile, 
+                                                                  conf);    
+    DataOutputStream out = new DataOutputStream(outStream);
+    record.write(out);
+    out.close();
+    outStream.close();
+    
+    // open the compressed stream for reading
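+    // (getPossiblyCompressedOutputStream appends the codec's default
+    // extension, ".gz" for GzipCodec, to the requested path)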
+    Path actualRecordFile = recordFile.suffix(".gz");
+    InputStream in = 
+      CompressionEmulationUtil
+        .getPossiblyDecompressedInputStream(actualRecordFile, conf, 0);
+    
+    // get the compressed file size
+    long compressedFileSize = lfs.listStatus(actualRecordFile)[0].getLen();
+    
+    GridmixRecord recordRead = new GridmixRecord();
+    recordRead.readFields(new DataInputStream(in));
+    
+    assertEquals("Record size mismatch in a compressible GridmixRecord",
+                 dataSize, recordRead.getSize());
+    assertTrue("Failed to generate a compressible GridmixRecord",
+               recordRead.getSize() > compressedFileSize);
+    
+    // check if the record can generate data with the desired compression ratio
+    float seenRatio = ((float)compressedFileSize)/dataSize;
+    assertEquals(CompressionEmulationUtil.standardizeCompressionRatio(ratio), 
+        CompressionEmulationUtil.standardizeCompressionRatio(seenRatio), 1.0D);
+  }
+  
+  /**
+   * Test 
+   * {@link CompressionEmulationUtil#isCompressionEmulationEnabled(
+   *          org.apache.hadoop.conf.Configuration)}.
+   */
+  @Test
+  public void testIsCompressionEmulationEnabled() {
+    Configuration conf = new Configuration();
+    // Check default values
+    assertTrue(CompressionEmulationUtil.isCompressionEmulationEnabled(conf));
+    
+    // Check disabled
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, false);
+    assertFalse(CompressionEmulationUtil.isCompressionEmulationEnabled(conf));
+    
+    // Check enabled
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+    assertTrue(CompressionEmulationUtil.isCompressionEmulationEnabled(conf));
+  }
+  
+  /**
+   * Test 
+   * {@link CompressionEmulationUtil#getPossiblyDecompressedInputStream(Path, 
+   *                                   Configuration, long)}
+   *  and
+   *  {@link CompressionEmulationUtil#getPossiblyCompressedOutputStream(Path, 
+   *                                    Configuration)}.
+   */
+  @Test
+  public void testPossiblyCompressedDecompressedStreams() throws IOException {
+    JobConf conf = new JobConf();
+    FileSystem lfs = FileSystem.getLocal(conf);
+    String inputLine = "Hi Hello!";
+
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+    CompressionEmulationUtil.setInputCompressionEmulationEnabled(conf, true);
+    conf.setBoolean("mapred.output.compress", true);
+    conf.setClass("mapred.output.compression.codec", GzipCodec.class, 
+                  CompressionCodec.class);
+
+    // define the test's root temp directory
+    Path rootTempDir =
+        new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+            lfs.getUri(), lfs.getWorkingDirectory());
+
+    Path tempDir =
+      new Path(rootTempDir, "TestPossiblyCompressedDecompressedStreams");
+    lfs.delete(tempDir, true);
+
+    // create a compressed file
+    Path compressedFile = new Path(tempDir, "test");
+    OutputStream out = 
+      CompressionEmulationUtil.getPossiblyCompressedOutputStream(compressedFile, 
+                                                                 conf);
+    BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
+    writer.write(inputLine);
+    writer.close();
+    
+    // now read back the data from the compressed stream
+    compressedFile = compressedFile.suffix(".gz");
+    InputStream in = 
+      CompressionEmulationUtil
+        .getPossiblyDecompressedInputStream(compressedFile, conf, 0);
+    BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+    String readLine = reader.readLine();
+    assertEquals("Compression/Decompression error", inputLine, readLine);
+    reader.close();
+  }
+  
+  /**
+   * Test if 
+   * {@link CompressionEmulationUtil#configureCompressionEmulation(
+   *        org.apache.hadoop.mapred.JobConf, org.apache.hadoop.mapred.JobConf)}
+   *  can extract compression related configuration parameters.
+   */
+  @Test
+  public void testExtractCompressionConfigs() {
+    JobConf source = new JobConf();
+    JobConf target = new JobConf();
+    
+    // set the default values
+    source.setBoolean("mapred.output.compress", false);
+    source.set("mapred.output.compression.codec", "MyDefaultCodec");
+    source.set("mapred.output.compression.type", "MyDefaultType");
+    source.setBoolean("mapred.compress.map.output", false); 
+    source.set("mapred.map.output.compression.codec", "MyDefaultCodec2");
+    
+    CompressionEmulationUtil.configureCompressionEmulation(source, target);
+    
+    // check default values
+    assertFalse(target.getBoolean("mapred.output.compress", true));
+    assertEquals("MyDefaultCodec",
+                 target.get("mapred.output.compression.codec"));
+    assertEquals("MyDefaultType", target.get("mapred.output.compression.type"));
+    assertFalse(target.getBoolean("mapred.compress.map.output", true));
+    assertEquals("MyDefaultCodec2", 
+                 target.get("mapred.map.output.compression.codec"));
+    assertFalse(CompressionEmulationUtil
+                .isInputCompressionEmulationEnabled(target));
+    
+    // set new values
+    source.setBoolean("mapred.output.compress", true);
+    source.set("mapred.output.compression.codec", "MyCodec");
+    source.set("mapred.output.compression.type", "MyType");
+    source.setBoolean("mapred.compress.map.output", true);
+    source.set("mapred.map.output.compression.codec", "MyCodec2");
+    org.apache.hadoop.mapred.FileInputFormat.setInputPaths(source, "file.gz");
+    
+    target = new JobConf(); // reset
+    CompressionEmulationUtil.configureCompressionEmulation(source, target);
+    
+    // check new values
+    assertTrue(target.getBoolean("mapred.output.compress", false));
+    assertEquals("MyCodec",
+                 target.get("mapred.output.compression.codec"));
+    assertEquals("MyType", target.get("mapred.output.compression.type"));
+    assertTrue(target.getBoolean("mapred.compress.map.output", false));
+    assertEquals("MyCodec2", 
+                 target.get("mapred.map.output.compression.codec"));
+    assertTrue(CompressionEmulationUtil
+               .isInputCompressionEmulationEnabled(target));
+  }
+  
+  /**
+   * Test that {@link FileQueue} can identify compressed files and provide
+   * readers that extract uncompressed data only if input-compression is
+   * enabled.
+   */
+  @Test
+  public void testFileQueueDecompression() throws IOException {
+    JobConf conf = new JobConf();
+    FileSystem lfs = FileSystem.getLocal(conf);
+    String inputLine = "Hi Hello!";
+    
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+    CompressionEmulationUtil.setInputCompressionEmulationEnabled(conf, true);
+    org.apache.hadoop.mapred.FileOutputFormat.setCompressOutput(conf, true);
+    org.apache.hadoop.mapred.FileOutputFormat.setOutputCompressorClass(conf, 
+                                                GzipCodec.class);
+
+    // define the test's root temp directory
+    Path rootTempDir =
+        new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+            lfs.getUri(), lfs.getWorkingDirectory());
+
+    Path tempDir = new Path(rootTempDir, "TestFileQueueDecompression");
+    lfs.delete(tempDir, true);
+
+    // create a compressed file
+    Path compressedFile = new Path(tempDir, "test");
+    OutputStream out = 
+      CompressionEmulationUtil.getPossiblyCompressedOutputStream(compressedFile, 
+                                                                 conf);
+    BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
+    writer.write(inputLine);
+    writer.close();
+    
+    compressedFile = compressedFile.suffix(".gz");
+    // now read back the data from the compressed stream using FileQueue
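+    // FileQueue should detect the .gz suffix and return uncompressed bytes,
+    // since input compression emulation is enabled above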
+    long fileSize = lfs.listStatus(compressedFile)[0].getLen();
+    CombineFileSplit split = 
+      new CombineFileSplit(new Path[] {compressedFile}, new long[] {fileSize});
+    FileQueue queue = new FileQueue(split, conf);
+    byte[] bytes = new byte[inputLine.getBytes().length];
+    queue.read(bytes);
+    queue.close();
+    String readLine = new String(bytes);
+    assertEquals("Compression/Decompression error", inputLine, readLine);
+  }
+}

Added: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestDistCacheEmulation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestDistCacheEmulation.java?rev=1185694&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestDistCacheEmulation.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestDistCacheEmulation.java Tue Oct 18 14:45:48 2011
@@ -0,0 +1,498 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import static org.junit.Assert.*;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Validate emulation of distributed cache load in gridmix simulated jobs.
+ *
+ */
+public class TestDistCacheEmulation {
+
+  private DistributedCacheEmulator dce = null;
+
+  @BeforeClass
+  public static void init() throws IOException {
+    GridmixTestUtils.initCluster();
+  }
+
+  @AfterClass
+  public static void shutDown() throws IOException {
+    GridmixTestUtils.shutdownCluster();
+  }
+
+  /**
+   * Validate the dist cache files generated by GenerateDistCacheData job.
+   * @param jobConf configuration of GenerateDistCacheData job.
+   * @param sortedFileSizes array of sorted distributed cache file sizes 
+   * @throws IOException 
+   * @throws FileNotFoundException 
+   */
+  private void validateDistCacheData(JobConf jobConf, long[] sortedFileSizes)
+      throws FileNotFoundException, IOException {
+    Path distCachePath = dce.getDistributedCacheDir();
+    String filesListFile =
+        jobConf.get(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_LIST);
+    FileSystem fs = FileSystem.get(jobConf);
+
+    // Validate the existence of Distributed Cache files list file directly
+    // under distributed cache directory
+    Path listFile = new Path(filesListFile);
+    assertTrue("Path of Distributed Cache files list file is wrong.",
+        distCachePath.equals(listFile.getParent().makeQualified(fs)));
+
+    // Delete the dist cache files list file
+    assertTrue("Failed to delete distributed Cache files list file " + listFile,
+               fs.delete(listFile));
+
+    List<Long> fileSizes = new ArrayList<Long>();
+    for (long size : sortedFileSizes) {
+      fileSizes.add(size);
+    }
+    // validate dist cache files after deleting the 'files list file'
+    validateDistCacheFiles(fileSizes, distCachePath);
+  }
+
+  /**
+   * Validate private/public distributed cache files.
+   * @param filesSizesExpected list of sizes of expected dist cache files
+   * @param distCacheDir the distributed cache dir to be validated
+   * @throws IOException 
+   * @throws FileNotFoundException 
+   */
+  private void validateDistCacheFiles(List<Long> filesSizesExpected,
+      Path distCacheDir) throws FileNotFoundException, IOException {
+    FileStatus[] statuses = GridmixTestUtils.dfs.listStatus(distCacheDir);
+    int numFiles = filesSizesExpected.size();
+    assertEquals("Number of files under distributed cache dir is wrong.",
+                 numFiles, statuses.length);
+    for (int i = 0; i < numFiles; i++) {
+      FileStatus stat = statuses[i];
+      assertTrue("File size of distributed cache file "
+          + stat.getPath().toUri().getPath() + " is wrong.",
+          filesSizesExpected.remove(stat.getLen()));
+
+      FsPermission perm = stat.getPermission();
+      assertEquals("Wrong permissions for distributed cache file "
+          + stat.getPath().toUri().getPath(),
+          new FsPermission((short)0644), perm);
+    }
+  }
+
+  /**
+   * Configures 5 HDFS-based dist cache files and 1 local-FS-based dist cache
+   * file in the given Configuration object <code>conf</code>.
+   * @param conf configuration where dist cache config properties are to be set
+   * @return array of sorted HDFS-based distributed cache file sizes
+   * @throws IOException
+   */
+  private long[] configureDummyDistCacheFiles(Configuration conf)
+      throws IOException {
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+    conf.set("user.name", user);
+    // Set some dummy dist cache files in gridmix configuration so that they go
+    // into the configuration of JobStory objects.
+    String[] distCacheFiles = {"hdfs:///tmp/file1.txt",
+                               "/tmp/" + user + "/.staging/job_1/file2.txt",
+                               "hdfs:///user/user1/file3.txt",
+                               "/home/user2/file4.txt",
+                               "subdir1/file5.txt",
+                               "subdir2/file6.gz"};
+    String[] fileSizes = {"400", "2500", "700", "1200", "1500", "500"};
+
+    String[] visibilities = {"true", "false", "false", "true", "true", "false"};
+    String[] timeStamps = {"1234", "2345", "34567", "5434", "125", "134"};
+
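+    // the size, visibility and timestamp arrays are parallel to
+    // distCacheFiles: entries correspond index-by-index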
+    conf.setStrings(DistributedCache.CACHE_FILES, distCacheFiles);
+    conf.setStrings(DistributedCache.CACHE_FILES_SIZES, fileSizes);
+    conf.setStrings(JobContext.CACHE_FILE_VISIBILITIES, visibilities);
+    conf.setStrings(DistributedCache.CACHE_FILES_TIMESTAMPS, timeStamps);
+
+    // local FS based dist cache file whose path contains <user>/.staging is
+    // not created on HDFS. So file size 2500 is not added to sortedFileSizes.
+    long[] sortedFileSizes = new long[] {1500, 1200, 700, 500, 400};
+    return sortedFileSizes;
+  }
+
+  /**
+   * Runs setupGenerateDistCacheData() on a new DistributedCacheEmulator and
+   * returns the jobConf. Fills the array <code>sortedFileSizes</code> that
+   * can be used for validation. Also validates the exit code returned by
+   * setupGenerateDistCacheData().
+   * @param generate true if -generate option is specified
+   * @param sortedFileSizes sorted HDFS-based distributed cache file sizes
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private JobConf runSetupGenerateDistCacheData(boolean generate,
+      long[] sortedFileSizes) throws IOException, InterruptedException {
+    Configuration conf = new Configuration();
+    long[] fileSizes = configureDummyDistCacheFiles(conf);
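+    // copy the sizes into the caller-supplied array so the caller can
+    // validate the generated files later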
+    System.arraycopy(fileSizes, 0, sortedFileSizes, 0, fileSizes.length);
+
+    // Job stories of all 3 jobs will have same dist cache files in their
+    // configurations
+    final int numJobs = 3;
+    DebugJobProducer jobProducer = new DebugJobProducer(numJobs, conf);
+
+    JobConf jobConf =
+        GridmixTestUtils.mrCluster.createJobConf(new JobConf(conf));
+    Path ioPath = new Path("testSetupGenerateDistCacheData")
+                    .makeQualified(GridmixTestUtils.dfs);
+    FileSystem fs = FileSystem.get(jobConf);
+    if (fs.exists(ioPath)) {
+      fs.delete(ioPath, true);
+    }
+    FileSystem.mkdirs(fs, ioPath, new FsPermission((short)0777));
+
+    dce = createDistributedCacheEmulator(jobConf, ioPath, generate);
+    int exitCode = dce.setupGenerateDistCacheData(jobProducer);
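+    // without the -generate option, missing dist cache files should surface
+    // as MISSING_DIST_CACHE_FILES_ERROR instead of success (0)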
+    int expectedExitCode = generate ? 0 : dce.MISSING_DIST_CACHE_FILES_ERROR;
+    assertEquals("setupGenerateDistCacheData failed.",
+                 expectedExitCode, exitCode);
+
+    // reset back
+    resetDistCacheConfigProperties(jobConf);
+    return jobConf;
+  }
+
+  /**
+   * Reset the config properties related to Distributed Cache in the given
+   * job configuration <code>jobConf</code>.
+   * @param jobConf job configuration
+   */
+  private void resetDistCacheConfigProperties(JobConf jobConf) {
+    // reset current/latest property names
+    jobConf.setStrings(DistributedCache.CACHE_FILES, "");
+    jobConf.setStrings(DistributedCache.CACHE_FILES_SIZES, "");
+    jobConf.setStrings(DistributedCache.CACHE_FILES_TIMESTAMPS, "");
+    jobConf.setStrings(JobContext.CACHE_FILE_VISIBILITIES, "");
+    // reset old property names
+    jobConf.setStrings("mapred.cache.files", "");
+    jobConf.setStrings("mapred.cache.files.filesizes", "");
+    jobConf.setStrings("mapred.cache.files.visibilities", "");
+    jobConf.setStrings("mapred.cache.files.timestamps", "");
+  }
+
+  /**
+   * Validate GenerateDistCacheData job if it creates dist cache files properly.
+   * @throws Exception
+   */
+  @Test
+  public void testGenerateDistCacheData() throws Exception {
+    long[] sortedFileSizes = new long[5];
+    JobConf jobConf =
+        runSetupGenerateDistCacheData(true, sortedFileSizes);
+    GridmixJob gridmixJob = new GenerateDistCacheData(jobConf);
+    Job job = gridmixJob.call();
+    assertEquals("Number of reduce tasks in GenerateDistCacheData is not 0.",
+        0, job.getNumReduceTasks());
+    assertTrue("GenerateDistCacheData job failed.",
+        job.waitForCompletion(false));
+    validateDistCacheData(jobConf, sortedFileSizes);
+  }
+
+  /**
+   *  Validate setupGenerateDistCacheData by validating
+   *  <li> permissions of the distributed cache directories and
+   *  <li> content of the generated sequence file. This includes validation of
+   *       dist cache file paths and their file sizes.
+   */
+  private void validateSetupGenDC(JobConf jobConf, long[] sortedFileSizes)
+      throws IOException, InterruptedException {
+    // build things needed for validation
+    long sumOfFileSizes = 0;
+    for (int i = 0; i < sortedFileSizes.length; i++) {
+      sumOfFileSizes += sortedFileSizes[i];
+    }
+
+    FileSystem fs = FileSystem.get(jobConf);
+    assertEquals("Number of distributed cache files to be generated is wrong.",
+        sortedFileSizes.length,
+        jobConf.getInt(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_COUNT, -1));
+    assertEquals("Total size of dist cache files to be generated is wrong.",
+        sumOfFileSizes, jobConf.getLong(
+        GenerateDistCacheData.GRIDMIX_DISTCACHE_BYTE_COUNT, -1));
+    Path filesListFile = new Path(jobConf.get(
+        GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_LIST));
+    FileStatus stat = fs.getFileStatus(filesListFile);
+    assertEquals("Wrong permissions of dist Cache files list file "
+        + filesListFile, new FsPermission((short)0644), stat.getPermission());
+
+    InputSplit split =
+        new FileSplit(filesListFile, 0, stat.getLen(), (String[])null);
+    TaskAttemptContext taskContext =
+        MapReduceTestUtil.createDummyMapTaskAttemptContext(jobConf);
+    RecordReader<LongWritable, BytesWritable> reader =
+      new GenerateDistCacheData.GenDCDataFormat().createRecordReader(
+      split, taskContext);
+    MapContext<LongWritable, BytesWritable, NullWritable, BytesWritable>
+        mapContext = new MapContext<LongWritable, BytesWritable,
+        NullWritable, BytesWritable>(jobConf, taskContext.getTaskAttemptID(),
+        reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
+    reader.initialize(split, mapContext);
+
+    // start validating setupGenerateDistCacheData
+    doValidateSetupGenDC(reader, fs, sortedFileSizes);
+  }
+
+  /**
+   *  Validate setupGenerateDistCacheData by validating
+   *  <li> permissions of the distributed cache directory and
+   *  <li> content of the generated sequence file. This includes validation of
+   *       dist cache file paths and their file sizes.
+   */
+  private void doValidateSetupGenDC(RecordReader<LongWritable, BytesWritable>
+      reader, FileSystem fs, long[] sortedFileSizes)
+      throws IOException, InterruptedException {
+
+    // Validate permissions of dist cache directory
+    Path distCacheDir = dce.getDistributedCacheDir();
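+    // public distributed cache files require the parent dir to be
+    // world-executable so that tasks of all users can localize them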
+    assertEquals("Wrong permissions for distributed cache dir " + distCacheDir,
+        fs.getFileStatus(distCacheDir).getPermission()
+        .getOtherAction().and(FsAction.EXECUTE), FsAction.EXECUTE);
+
+    // Validate the content of the sequence file generated by
+    // dce.setupGenerateDistCacheData().
+    LongWritable key = new LongWritable();
+    BytesWritable val = new BytesWritable();
+    for (int i = 0; i < sortedFileSizes.length; i++) {
+      assertTrue("Number of files written to the sequence file by "
+          + "setupGenerateDistCacheData is less than the expected.",
+          reader.nextKeyValue());
+      key = reader.getCurrentKey();
+      val = reader.getCurrentValue();
+      long fileSize = key.get();
+      String file = new String(val.getBytes(), 0, val.getLength());
+
+      // Dist Cache files should be sorted based on file size.
+      assertEquals("Dist cache file size is wrong.",
+          sortedFileSizes[i], fileSize);
+
+      // Validate dist cache file path.
+
+      // parent dir of dist cache file
+      Path parent = new Path(file).getParent().makeQualified(fs);
+      // should exist in dist cache dir
+      assertTrue("Public dist cache file path is wrong.",
+          distCacheDir.equals(parent));
+    }
+  }
+
+  /**
+   *  Test if DistributedCacheEmulator's setup of GenerateDistCacheData is
+   *  working as expected.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testSetupGenerateDistCacheData()
+      throws IOException, InterruptedException {
+    long[] sortedFileSizes = new long[5];
+    JobConf jobConf = runSetupGenerateDistCacheData(true, sortedFileSizes);
+    validateSetupGenDC(jobConf, sortedFileSizes);
+
+    // Verify if correct exit code is seen when -generate option is missing and
+    // distributed cache files are missing in the expected path.
+    runSetupGenerateDistCacheData(false, sortedFileSizes);
+  }
+
+  /**
+   *  Create a DistributedCacheEmulator object and initialize it by calling
+   *  init() on it with a dummy trace. Also configure the pseudo local FS.
+   */
+  private DistributedCacheEmulator createDistributedCacheEmulator(
+      Configuration conf, Path ioPath, boolean generate) throws IOException {
+    DistributedCacheEmulator dce =
+        new DistributedCacheEmulator(conf, ioPath);
+    JobCreator jobCreator = JobCreator.getPolicy(conf, JobCreator.LOADJOB);
+    jobCreator.setDistCacheEmulator(dce);
+    dce.init("dummytrace", jobCreator, generate);
+    return dce;
+  }
+
+  /**
+   *  Test the configuration property for disabling/enabling emulation of
+   *  distributed cache load.
+   */
+  @Test
+  public void testDistCacheEmulationConfigurability() throws IOException {
+    Configuration conf = new Configuration();
+    JobConf jobConf = GridmixTestUtils.mrCluster.createJobConf(
+        new JobConf(conf));
+    Path ioPath = new Path("testDistCacheEmulationConfigurability")
+        .makeQualified(GridmixTestUtils.dfs);
+    FileSystem fs = FileSystem.get(jobConf);
+    FileSystem.mkdirs(fs, ioPath, new FsPermission((short)0777));
+
+    // default config
+    dce = createDistributedCacheEmulator(jobConf, ioPath, false);
+    assertTrue("Default configuration of "
+        + DistributedCacheEmulator.GRIDMIX_EMULATE_DISTRIBUTEDCACHE
+        + " is wrong.", dce.shouldEmulateDistCacheLoad());
+
+    // config property set to false
+    jobConf.setBoolean(
+        DistributedCacheEmulator.GRIDMIX_EMULATE_DISTRIBUTEDCACHE, false);
+    dce = createDistributedCacheEmulator(jobConf, ioPath, false);
+    assertFalse("Disabling of emulation of distributed cache load by setting "
+        + DistributedCacheEmulator.GRIDMIX_EMULATE_DISTRIBUTEDCACHE
+        + " to false is not working.", dce.shouldEmulateDistCacheLoad());
+  }
+
+  /**
+   * Verify if DistributedCacheEmulator can configure distributed cache files
+   * for simulated job if job conf from trace had no dist cache files.
+   * @param conf configuration for the simulated job to be run
+   * @param jobConf job configuration of original cluster's job, obtained from
+   *                trace
+   * @throws IOException
+   */
+  private void validateJobConfWithOutDCFiles(Configuration conf,
+      JobConf jobConf) throws IOException {
+    // Validate if Gridmix can configure dist cache files properly if there are
+    // no HDFS-based or local-FS-based dist cache files in the trace for a job.
+    dce.configureDistCacheFiles(conf, jobConf);
+    assertNull("Distributed cache files configured by GridMix is wrong.",
+               conf.get(DistributedCache.CACHE_FILES));
+    assertNull("Distributed cache files configured by Gridmix through -files "
+               + "option is wrong.", conf.get("tmpfiles"));
+  }
+
+  /**
+   * Verify if DistributedCacheEmulator can configure distributed cache files
+   * for simulated job if job conf from trace had HDFS-based dist cache files
+   * and local-FS-based dist cache files.
+   * @param conf configuration for the simulated job to be run
+   * @param jobConf job configuration of original cluster's job, obtained from
+   *                trace
+   * @throws IOException
+   */
+  private void validateJobConfWithDCFiles(Configuration conf,
+      JobConf jobConf) throws IOException {
+    long[] sortedFileSizes = configureDummyDistCacheFiles(jobConf);
+
+    // 1 local FS based dist cache file and 5 HDFS based dist cache files. So
+    // total expected dist cache files count is 6.
+    assertEquals("Gridmix is not able to extract dist cache file sizes.",
+                 6, jobConf.getStrings(DistributedCache.CACHE_FILES_SIZES).length);
+    assertEquals("Gridmix is not able to extract dist cache file visibilities.",
+                 6, jobConf.getStrings(
+                      JobContext.CACHE_FILE_VISIBILITIES).length);
+
+    dce.configureDistCacheFiles(conf, jobConf);
+
+    assertEquals("Configuring of HDFS-based dist cache files by gridmix is "
+                 + "wrong.", sortedFileSizes.length,
+                 conf.getStrings(DistributedCache.CACHE_FILES).length);
+    assertEquals("Configuring of local-FS-based dist cache files by gridmix is "
+                 + "wrong.", 1, conf.getStrings("tmpfiles").length);
+  }
+
+  /**
+   * Verify if configureDistCacheFiles() works fine when there are distributed
+   * cache files set but visibilities are not set. This is to handle history
+   * traces of older hadoop versions where there were no private/public
+   * Distributed Caches.
+   * @throws IOException
+   */
+  private void validateWithOutVisibilities() throws IOException {
+    Configuration conf = new Configuration(); // configuration for simulated job
+    JobConf jobConf = new JobConf();
+    String user = "user1";
+    jobConf.setUser(user);
+    String[] files = {"/tmp/hdfs1.txt", "/tmp/"+ user + "/.staging/file1"};
+    jobConf.setStrings(DistributedCache.CACHE_FILES, files);
+    jobConf.setStrings(DistributedCache.CACHE_FILES_SIZES, "12,200");
+    jobConf.setStrings(DistributedCache.CACHE_FILES_TIMESTAMPS, "56789,98345");
+    dce.configureDistCacheFiles(conf, jobConf);
+    assertEquals("Configuring of HDFS-based dist cache files by gridmix is "
+                 + "wrong.", files.length,
+                 conf.getStrings(DistributedCache.CACHE_FILES).length);
+    assertNull("Configuring of local-FS-based dist cache files by gridmix is "
+               + "wrong.", conf.get("tmpfiles"));
+  }
+
+  /**
+   * Test if Gridmix can configure config properties related to Distributed
+   * Cache properly.
+   * @throws IOException
+   */
+  @Test
+  public void testDistCacheFilesConfiguration() throws IOException {
+    Configuration conf = new Configuration();
+    JobConf jobConf = GridmixTestUtils.mrCluster.createJobConf(
+                        new JobConf(conf));
+    Path ioPath = new Path("testDistCacheEmulationConfigurability")
+                    .makeQualified(GridmixTestUtils.dfs);
+    FileSystem fs = FileSystem.get(jobConf);
+    FileSystem.mkdirs(fs, ioPath, new FsPermission((short)0777));
+
+    // default config
+    dce = createDistributedCacheEmulator(jobConf, ioPath, false);
+    assertTrue("Default configuration of "
+               + DistributedCacheEmulator.GRIDMIX_EMULATE_DISTRIBUTEDCACHE
+               + " is wrong.", dce.shouldEmulateDistCacheLoad());
+
+    // Validate if DistributedCacheEmulator can handle a JobStory without
+    // Distributed Cache files properly.
+    validateJobConfWithOutDCFiles(conf, jobConf);
+
+    // Validate if Gridmix can configure dist cache files properly if there are
+    // HDFS-based and local-FS-based dist cache files in the trace for a job.
+    validateJobConfWithDCFiles(conf, jobConf);
+    
+    // Use new JobConf as JobStory conf and check if configureDistCacheFiles()
+    // doesn't throw NPE when there are dist cache files set but visibilities
+    // are not set.
+    validateWithOutVisibilities();
+  }
+}

Added: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixMemoryEmulation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixMemoryEmulation.java?rev=1185694&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixMemoryEmulation.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixMemoryEmulation.java Tue Oct 18 14:45:48 2011
@@ -0,0 +1,453 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.DummyResourceCalculatorPlugin;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.gridmix.DebugJobProducer.MockJob;
+import org.apache.hadoop.mapred.gridmix.TestHighRamJob.DummyGridmixJob;
+import org.apache.hadoop.mapred.gridmix.TestResourceUsageEmulators.FakeProgressive;
+import org.apache.hadoop.mapred.gridmix.emulators.resourceusage.TotalHeapUsageEmulatorPlugin;
+import org.apache.hadoop.mapred.gridmix.emulators.resourceusage.TotalHeapUsageEmulatorPlugin.DefaultHeapUsageEmulator;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.ResourceCalculatorPlugin;
+import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
+
+/**
+ * Test Gridmix memory emulation.
+ */
+public class TestGridmixMemoryEmulation {
+  /**
+   * This is a dummy class that fakes heap usage.
+   */
+  private static class FakeHeapUsageEmulatorCore 
+  extends DefaultHeapUsageEmulator {
+    private int numCalls = 0;
+    
+    @Override
+    public void load(long sizeInMB) {
+      ++numCalls;
+      super.load(sizeInMB);
+    }
+    
+    // Get the total number of times load() was invoked
+    int getNumCalls() {
+      return numCalls;
+    }
+    
+    // Get the total number of 1mb objects stored within
+    long getHeapUsageInMB() {
+      return heapSpace.size();
+    }
+    
+    @Override
+    public void reset() {
+      // no op to stop emulate() from resetting
+    }
+    
+    /**
+     * For re-testing purposes.
+     */
+    void resetFake() {
+      numCalls = 0;
+      super.reset();
+    }
+  }
+
+  /**
+   * This is a dummy class that fakes the heap usage emulator plugin.
+   */
+  private static class FakeHeapUsageEmulatorPlugin 
+  extends TotalHeapUsageEmulatorPlugin {
+    private FakeHeapUsageEmulatorCore core;
+    
+    public FakeHeapUsageEmulatorPlugin(FakeHeapUsageEmulatorCore core) {
+      super(core);
+      this.core = core;
+    }
+    
+    @Override
+    protected long getMaxHeapUsageInMB() {
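+      // report an effectively unbounded heap limit so that emulation is
+      // never capped during the test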
+      return Long.MAX_VALUE / ONE_MB;
+    }
+    
+    @Override
+    protected long getTotalHeapUsageInMB() {
+      return core.getHeapUsageInMB();
+    }
+  }
+  
+  /**
+   * Test {@link TotalHeapUsageEmulatorPlugin}'s core heap usage emulation 
+   * engine.
+   */
+  @Test
+  public void testHeapUsageEmulator() throws IOException {
+    FakeHeapUsageEmulatorCore heapEmulator = new FakeHeapUsageEmulatorCore();
+    
+    long testSizeInMB = 10; // 10 mb
+    long previousHeap = heapEmulator.getHeapUsageInMB();
+    heapEmulator.load(testSizeInMB);
+    long currentHeap = heapEmulator.getHeapUsageInMB();
+    
+    // check if the heap has increased by expected value
+    assertEquals("Default heap emulator failed to load 10mb", 
+                 previousHeap + testSizeInMB, currentHeap);
+    
+    // test reset
+    heapEmulator.resetFake();
+    assertEquals("Default heap emulator failed to reset", 
+                 0, heapEmulator.getHeapUsageInMB());
+  }
+
+  /**
+   * Test {@link TotalHeapUsageEmulatorPlugin}.
+   */
+  @Test
+  public void testTotalHeapUsageEmulatorPlugin() throws Exception {
+    Configuration conf = new Configuration();
+    // set the dummy resource calculator for testing
+    ResourceCalculatorPlugin monitor = new DummyResourceCalculatorPlugin();
+    long maxHeapUsage = 1024 * TotalHeapUsageEmulatorPlugin.ONE_MB; // 1GB
+    conf.setLong(DummyResourceCalculatorPlugin.MAXPMEM_TESTING_PROPERTY, 
+                 maxHeapUsage);
+    monitor.setConf(conf);
+    
+    // no buffer to be reserved
+    conf.setFloat(TotalHeapUsageEmulatorPlugin.MIN_HEAP_FREE_RATIO, 0F);
+    // only 1 call to be made per cycle
+    conf.setFloat(TotalHeapUsageEmulatorPlugin.HEAP_LOAD_RATIO, 1F);
+    long targetHeapUsageInMB = 200; // 200mb
+    
+    // fake progress indicator
+    FakeProgressive fakeProgress = new FakeProgressive();
+    
+    // fake heap usage generator
+    FakeHeapUsageEmulatorCore fakeCore = new FakeHeapUsageEmulatorCore();
+    
+    // a heap usage emulator with fake core
+    FakeHeapUsageEmulatorPlugin heapPlugin = 
+      new FakeHeapUsageEmulatorPlugin(fakeCore);
+    
+    // test with invalid or missing resource usage value
+    ResourceUsageMetrics invalidUsage = 
+      TestResourceUsageEmulators.createMetrics(0);
+    heapPlugin.initialize(conf, invalidUsage, null, null);
+    
+    // test if disabled heap emulation plugin's emulate() call is a no-operation
+    // this will test if the emulation plugin is disabled or not
+    int numCallsPre = fakeCore.getNumCalls();
+    long heapUsagePre = fakeCore.getHeapUsageInMB();
+    heapPlugin.emulate();
+    int numCallsPost = fakeCore.getNumCalls();
+    long heapUsagePost = fakeCore.getHeapUsageInMB();
+    
+    //  test that no calls were made to the heap usage emulator core
+    assertEquals("Disabled heap usage emulation plugin works!", 
+                 numCallsPre, numCallsPost);
+    //  test that no heap was loaded by the heap usage emulator core
+    assertEquals("Disabled heap usage emulation plugin works!", 
+                 heapUsagePre, heapUsagePost);
+    
+    // test with wrong/invalid configuration
+    Boolean failed = null;
+    invalidUsage = 
+      TestResourceUsageEmulators.createMetrics(maxHeapUsage 
+                                   + TotalHeapUsageEmulatorPlugin.ONE_MB);
+    try {
+      heapPlugin.initialize(conf, invalidUsage, monitor, null);
+      failed = false;
+    } catch (Exception e) {
+      failed = true;
+    }
+    assertNotNull("Fail case failure!", failed);
+    assertTrue("Expected failure!", failed); 
+    
+    // test with valid resource usage value
+    ResourceUsageMetrics metrics = 
+      TestResourceUsageEmulators.createMetrics(targetHeapUsageInMB 
+                                   * TotalHeapUsageEmulatorPlugin.ONE_MB);
+    
+    // test with default emulation interval
+    // in every interval, the emulator will add 100% of the expected usage 
+    // (since gridmix.emulators.resource-usage.heap.load-ratio=1)
+    // so at 10%, emulator will add 10% (difference), at 20% it will add 10% ...
+    // So to emulate 200MB, it will add
+    //   20mb + 20mb + 20mb + 20mb + .. = 200mb 
+    testEmulationAccuracy(conf, fakeCore, monitor, metrics, heapPlugin, 200, 
+                          10);
+    
+    // test with custom value for emulation interval of 20%
+    conf.setFloat(TotalHeapUsageEmulatorPlugin.HEAP_EMULATION_PROGRESS_INTERVAL,
+                  0.2F);
+    //  40mb + 40mb + 40mb + 40mb + 40mb = 200mb
+    testEmulationAccuracy(conf, fakeCore, monitor, metrics, heapPlugin, 200, 5);
+    
+    // test with custom value of free heap ratio and load ratio = 1
+    conf.setFloat(TotalHeapUsageEmulatorPlugin.HEAP_LOAD_RATIO, 1F);
+    conf.setFloat(TotalHeapUsageEmulatorPlugin.MIN_HEAP_FREE_RATIO, 0.5F);
+    //  40mb + 0mb + 80mb + 0mb + 0mb = 120mb
+    testEmulationAccuracy(conf, fakeCore, monitor, metrics, heapPlugin, 120, 2);
+    
+    // test with custom value of heap load ratio and min free heap ratio = 0
+    conf.setFloat(TotalHeapUsageEmulatorPlugin.HEAP_LOAD_RATIO, 0.5F);
+    conf.setFloat(TotalHeapUsageEmulatorPlugin.MIN_HEAP_FREE_RATIO, 0F);
+    // 20mb (call#1) + 20mb (call#1) + 20mb (call#2) + 20mb (call#2) +.. = 200mb
+    testEmulationAccuracy(conf, fakeCore, monitor, metrics, heapPlugin, 200, 
+                          10);
+    
+    // test with custom value of free heap ratio = 0.25 and load ratio = 0.5
+    conf.setFloat(TotalHeapUsageEmulatorPlugin.MIN_HEAP_FREE_RATIO, 0.25F);
+    conf.setFloat(TotalHeapUsageEmulatorPlugin.HEAP_LOAD_RATIO, 0.5F);
+    // 20mb (call#1) + 20mb (call#1) + 30mb (call#2) + 0mb (call#2) 
+    // + 30mb (call#3) + 0mb (call#3) + 35mb (call#4) + 0mb (call#4)
+    // + 37mb (call#5) + 0mb (call#5) = 162mb
+    testEmulationAccuracy(conf, fakeCore, monitor, metrics, heapPlugin, 162, 6);
+    
+    // test if emulation interval boundary is respected
+    fakeProgress = new FakeProgressive(); // initialize
+    conf.setFloat(TotalHeapUsageEmulatorPlugin.MIN_HEAP_FREE_RATIO, 0F);
+    conf.setFloat(TotalHeapUsageEmulatorPlugin.HEAP_LOAD_RATIO, 1F);
+    conf.setFloat(TotalHeapUsageEmulatorPlugin.HEAP_EMULATION_PROGRESS_INTERVAL,
+                  0.25F);
+    heapPlugin.initialize(conf, metrics, monitor, fakeProgress);
+    fakeCore.resetFake();
+    // take a snapshot after the initialization
+    long initHeapUsage = fakeCore.getHeapUsageInMB();
+    long initNumCallsUsage = fakeCore.getNumCalls();
+    // test with 0 progress
+    testEmulationBoundary(0F, fakeCore, fakeProgress, heapPlugin, initHeapUsage, 
+                          initNumCallsUsage, "[no-op, 0 progress]");
+    // test with 24% progress
+    testEmulationBoundary(0.24F, fakeCore, fakeProgress, heapPlugin, 
+                          initHeapUsage, initNumCallsUsage, 
+                          "[no-op, 24% progress]");
+    // test with 25% progress
+    testEmulationBoundary(0.25F, fakeCore, fakeProgress, heapPlugin, 
+        targetHeapUsageInMB / 4, 1, "[op, 25% progress]");
+    // test with 80% progress
+    testEmulationBoundary(0.80F, fakeCore, fakeProgress, heapPlugin, 
+        (targetHeapUsageInMB * 4) / 5, 2, "[op, 80% progress]");
+    
+    // now test if the final call with 100% progress ramps up the heap usage
+    testEmulationBoundary(1F, fakeCore, fakeProgress, heapPlugin, 
+        targetHeapUsageInMB, 3, "[op, 100% progress]");
+  }
+
+  // test whether the heap usage emulator achieves the desired target using
+  // the expected number of calls to the underlying core engine.
+  private static void testEmulationAccuracy(Configuration conf, 
+                        FakeHeapUsageEmulatorCore fakeCore,
+                        ResourceCalculatorPlugin monitor,
+                        ResourceUsageMetrics metrics,
+                        TotalHeapUsageEmulatorPlugin heapPlugin,
+                        long expectedTotalHeapUsageInMB,
+                        long expectedTotalNumCalls)
+  throws Exception {
+    FakeProgressive fakeProgress = new FakeProgressive();
+    fakeCore.resetFake();
+    heapPlugin.initialize(conf, metrics, monitor, fakeProgress);
+    int numLoops = 0;
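+    // drive the fake progress from 1% to 100% in 1% steps, letting the
+    // plugin decide at each step whether an emulation interval was crossed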
+    while (fakeProgress.getProgress() < 1) {
+      ++numLoops;
+      float progress = numLoops / 100.0F;
+      fakeProgress.setProgress(progress);
+      heapPlugin.emulate();
+    }
+    
+    // test if the resource plugin shows the expected usage
+    assertEquals("Cumulative heap usage emulator plugin failed (total usage)!", 
+                 expectedTotalHeapUsageInMB, fakeCore.getHeapUsageInMB(), 1L);
+    // test if the resource plugin shows the expected num calls
+    assertEquals("Cumulative heap usage emulator plugin failed (num calls)!", 
+                 expectedTotalNumCalls, fakeCore.getNumCalls(), 0L);
+  }
+
+  // tests if the heap usage emulation plugin emulates only at the expected
+  // progress gaps
+  private static void testEmulationBoundary(float progress, 
+      FakeHeapUsageEmulatorCore fakeCore, FakeProgressive fakeProgress, 
+      TotalHeapUsageEmulatorPlugin heapPlugin, long expectedTotalHeapUsageInMB, 
+      long expectedTotalNumCalls, String info) throws Exception {
+    fakeProgress.setProgress(progress);
+    heapPlugin.emulate();
+    // test heap usage
+    assertEquals("Emulation interval test for heap usage failed " + info + "!", 
+                 expectedTotalHeapUsageInMB, fakeCore.getHeapUsageInMB(), 0L);
+    // test num calls
+    assertEquals("Emulation interval test for heap usage failed " + info + "!", 
+                 expectedTotalNumCalls, fakeCore.getNumCalls(), 0L);
+  }
+  
+  /**
+   * Tests {@link GridmixJob#configureTaskJVMOptions} for the given original
+   * map/reduce/task java options, the simulated job's default options, and
+   * the expected merged map/reduce/task options.
+   */
+  @SuppressWarnings("deprecation")
+  private void testJavaHeapOptions(String mapOptions, 
+      String reduceOptions, String taskOptions, String defaultMapOptions, 
+      String defaultReduceOptions, String defaultTaskOptions, 
+      String expectedMapOptions, String expectedReduceOptions, 
+      String expectedTaskOptions) throws Exception {
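+    // build the conf with loadDefaults=false so that no *-default.xml
+    // resources are pulled in and only the options set below are visible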
+    Configuration simulatedConf = new Configuration(false);
+    
+    // set the default map task options
+    if (defaultMapOptions != null) {
+      simulatedConf.set(JobConf.MAPRED_MAP_TASK_JAVA_OPTS, defaultMapOptions);
+    }
+    // set the default reduce task options
+    if (defaultReduceOptions != null) {
+      simulatedConf.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS,
+                        defaultReduceOptions);
+    }
+    // set the default task options
+    if (defaultTaskOptions != null) {
+      simulatedConf.set(JobConf.MAPRED_TASK_JAVA_OPTS, defaultTaskOptions);
+    }
+    
+    Configuration originalConf = new Configuration(false);
+    
+    // set the map task options
+    if (mapOptions != null) {
+      originalConf.set(JobConf.MAPRED_MAP_TASK_JAVA_OPTS, mapOptions);
+    }
+    // set the reduce task options
+    if (reduceOptions != null) {
+      originalConf.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, reduceOptions);
+    }
+    // set the task options
+    if (taskOptions != null) {
+      originalConf.set(JobConf.MAPRED_TASK_JAVA_OPTS, taskOptions);
+    }
+    
+    // configure the task jvm's heap options
+    GridmixJob.configureTaskJVMOptions(originalConf, simulatedConf);
+    
+    assertEquals("Map heap options mismatch!", expectedMapOptions, 
+                 simulatedConf.get(JobConf.MAPRED_MAP_TASK_JAVA_OPTS));
+    assertEquals("Reduce heap options mismatch!", expectedReduceOptions, 
+                 simulatedConf.get(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS));
+    assertEquals("Task heap options mismatch!", expectedTaskOptions, 
+                 simulatedConf.get(JobConf.MAPRED_TASK_JAVA_OPTS));
+  }
+  
+  /**
+   * Test task-level java heap options configuration in {@link GridmixJob}.
+   */
+  @Test
+  public void testJavaHeapOptions() throws Exception {
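+    // The cases below are consistent with the following merge rule (inferred
+    // from the expected values, not asserted about the implementation): the
+    // simulated job keeps its default JVM options, except that any -Xmx
+    // value is replaced by the original job's -Xmx value; all other options
+    // of the original job are dropped.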
+    // test missing opts
+    testJavaHeapOptions(null, null, null, null, null, null, null, null, 
+                        null);
+    
+    // test original heap opts and missing default opts
+    testJavaHeapOptions("-Xms10m", "-Xms20m", "-Xms30m", null, null, null,
+                        null, null, null);
+    
+    // test missing opts with default opts
+    testJavaHeapOptions(null, null, null, "-Xms10m", "-Xms20m", "-Xms30m",
+                        "-Xms10m", "-Xms20m", "-Xms30m");
+    
+    // test empty option
+    testJavaHeapOptions("", "", "", null, null, null, null, null, null);
+    
+    // test empty default option and no original heap options
+    testJavaHeapOptions(null, null, null, "", "", "", "", "", "");
+    
+    // test empty opts and default opts
+    testJavaHeapOptions("", "", "", "-Xmx10m -Xms1m", "-Xmx50m -Xms2m", 
+                        "-Xms2m -Xmx100m", "-Xmx10m -Xms1m", "-Xmx50m -Xms2m", 
+                        "-Xms2m -Xmx100m");
+    
+    // test custom heap opts with no default opts
+    testJavaHeapOptions("-Xmx10m", "-Xmx20m", "-Xmx30m", null, null, null,
+                        "-Xmx10m", "-Xmx20m", "-Xmx30m");
+    
+    // test heap opts with default opts (multiple value)
+    testJavaHeapOptions("-Xms5m -Xmx200m", "-Xms15m -Xmx300m", 
+                        "-Xms25m -Xmx50m", "-XXabc", "-XXxyz", "-XXdef", 
+                        "-XXabc -Xmx200m", "-XXxyz -Xmx300m", "-XXdef -Xmx50m");
+    
+    // test heap opts with default opts (duplication of -Xmx)
+    testJavaHeapOptions("-Xms5m -Xmx200m", "-Xms15m -Xmx300m", 
+                        "-Xms25m -Xmx50m", "-XXabc -Xmx500m", "-XXxyz -Xmx600m",
+                        "-XXdef -Xmx700m", "-XXabc -Xmx200m", "-XXxyz -Xmx300m",
+                        "-XXdef -Xmx50m");
+    
+    // test heap opts with default opts (single value)
+    testJavaHeapOptions("-Xmx10m", "-Xmx20m", "-Xmx50m", "-Xms2m", 
+                        "-Xms3m", "-Xms5m", "-Xms2m -Xmx10m", "-Xms3m -Xmx20m",
+                        "-Xms5m -Xmx50m");
+    
+    // test heap opts with default opts (duplication of -Xmx)
+    testJavaHeapOptions("-Xmx10m", "-Xmx20m", "-Xmx50m", "-Xmx2m", 
+                        "-Xmx3m", "-Xmx5m", "-Xmx10m", "-Xmx20m", "-Xmx50m");
+  }
+  
+  /**
+   * Test disabled task heap options configuration in {@link GridmixJob}.
+   */
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testJavaHeapOptionsDisabled() throws Exception {
+    Configuration gridmixConf = new Configuration();
+    gridmixConf.setBoolean(GridmixJob.GRIDMIX_TASK_JVM_OPTIONS_ENABLE, false);
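+    // with JVM options emulation disabled, the simulated job must keep the
+    // -Xmx values set below and ignore those of the original job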
+    
+    // set the default values of simulated job
+    gridmixConf.set(JobConf.MAPRED_MAP_TASK_JAVA_OPTS, "-Xmx1m");
+    gridmixConf.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, "-Xmx2m");
+    gridmixConf.set(JobConf.MAPRED_TASK_JAVA_OPTS, "-Xmx3m");
+    
+    // set the default map and reduce task options for original job
+    final JobConf originalConf = new JobConf();
+    originalConf.set(JobConf.MAPRED_MAP_TASK_JAVA_OPTS, "-Xmx10m");
+    originalConf.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, "-Xmx20m");
+    originalConf.set(JobConf.MAPRED_TASK_JAVA_OPTS, "-Xmx30m");
+    
+    // define a mock job
+    MockJob story = new MockJob(originalConf) {
+      public JobConf getJobConf() {
+        return originalConf;
+      }
+    };
+    
+    GridmixJob job = new DummyGridmixJob(gridmixConf, story);
+    Job simulatedJob = job.getJob();
+    Configuration simulatedConf = simulatedJob.getConfiguration();
+    
+    assertEquals("Map heap options works when disabled!", "-Xmx1m", 
+                 simulatedConf.get(JobConf.MAPRED_MAP_TASK_JAVA_OPTS));
+    assertEquals("Reduce heap options works when disabled!", "-Xmx2m", 
+                 simulatedConf.get(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS));
+    assertEquals("Task heap options works when disabled!", "-Xmx3m", 
+                 simulatedConf.get(JobConf.MAPRED_TASK_JAVA_OPTS));
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java Tue Oct 18 14:45:48 2011
@@ -176,7 +176,8 @@ public class TestGridmixRecord {
       a.setReduceOutputBytes(out_bytes);
       final int min = WritableUtils.getVIntSize(in_rec)
                     + WritableUtils.getVIntSize(out_rec)
-                    + WritableUtils.getVIntSize(out_bytes);
+                    + WritableUtils.getVIntSize(out_bytes)
+                    + WritableUtils.getVIntSize(0);
       assertEquals(min + 2, a.fixedBytes()); // meta + vint min
       final int size = r.nextInt(1024) + a.fixedBytes() + 1;
       setSerialize(a, r.nextLong(), size, out);
@@ -207,7 +208,7 @@ public class TestGridmixRecord {
 
   @Test
   public void testKeySpec() throws Exception {
-    final int min = 5;
+    final int min = 6;
     final int max = 300;
     final GridmixKey a = new GridmixKey(GridmixKey.REDUCE_SPEC, 1, 0L);
     final GridmixKey b = new GridmixKey(GridmixKey.REDUCE_SPEC, 1, 0L);

Modified: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java Tue Oct 18 14:45:48 2011
@@ -23,6 +23,8 @@ import org.apache.commons.logging.impl.L
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.Counters;
@@ -34,6 +36,7 @@ import org.apache.hadoop.mapred.TaskRepo
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.JobStoryProducer;
 import org.apache.hadoop.tools.rumen.TaskInfo;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.log4j.Level;
@@ -41,13 +44,16 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.io.InputStream;
 import java.io.IOException;
+import java.text.DecimalFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.zip.GZIPInputStream;
 
 import static org.apache.hadoop.mapred.Task.Counter.MAP_INPUT_RECORDS;
 import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_BYTES;
@@ -103,17 +109,10 @@ public class TestGridmixSubmission {
         GridmixTestUtils.mrCluster.createJobConf());
       for (Job job : succeeded) {
         final String jobname = job.getJobName();
-        if ("GRIDMIX_GENDATA".equals(jobname)) {
-          if (!job.getConfiguration().getBoolean(
-            GridmixJob.GRIDMIX_USE_QUEUE_IN_TRACE, true)) {
-            assertEquals(
-              " Improper queue for " + job.getJobName(),
-              job.getConfiguration().get("mapred.job.queue.name"), "q1");
-          } else {
-            assertEquals(
-              " Improper queue for " + job.getJobName(),
-              job.getConfiguration().get("mapred.job.queue.name"), "default");
-          }
+        final String jobName = job.getJobName();
+        Configuration conf = job.getConfiguration();
+        if (GenerateData.JOB_NAME.equals(jobName)) {
+          verifyQueue(conf, jobName);
           final Path in = new Path("foo").makeQualified(GridmixTestUtils.dfs);
           final Path out = new Path("/gridmix").makeQualified(GridmixTestUtils.dfs);
           final ContentSummary generated = GridmixTestUtils.dfs.getContentSummary(in);
@@ -123,37 +122,55 @@ public class TestGridmixSubmission {
           FileStatus[] outstat = GridmixTestUtils.dfs.listStatus(out);
           assertEquals("Mismatched job count", NJOBS, outstat.length);
           continue;
+        } else if (GenerateDistCacheData.JOB_NAME.equals(jobName)) {
+          verifyQueue(conf, jobName);
+          continue;
         }
-        
-        if (!job.getConfiguration().getBoolean(
-          GridmixJob.GRIDMIX_USE_QUEUE_IN_TRACE, true)) {
-          assertEquals(" Improper queue for  " + job.getJobName() + " " ,
-          job.getConfiguration().get("mapred.job.queue.name"),"q1" );
+
+        if (!conf.getBoolean(
+            GridmixJob.GRIDMIX_USE_QUEUE_IN_TRACE, true)) {
+          assertEquals(" Improper queue for  " + jobName + " " ,
+              conf.get("mapred.queue.name"), "q1" );
         } else {
-          assertEquals(
-            " Improper queue for  " + job.getJobName() + " ",
-            job.getConfiguration().get("mapred.job.queue.name"), sub.get(
-              job.getConfiguration().get(GridmixJob.ORIGNAME)).getQueueName());
+          assertEquals(" Improper queue for  " + jobName + " ",
+              conf.get("mapred.queue.name"),
+              sub.get(conf.get(Gridmix.ORIGINAL_JOB_ID)).getQueueName());
         }
 
-        final JobStory spec =
-          sub.get(job.getConfiguration().get(GridmixJob.ORIGNAME));
-        assertNotNull("No spec for " + job.getJobName(), spec);
-        assertNotNull("No counters for " + job.getJobName(), job.getCounters());
-        final String specname = spec.getName();
-        final FileStatus stat = GridmixTestUtils.dfs.getFileStatus(new Path(
-          GridmixTestUtils.DEST, "" +
-              Integer.valueOf(specname.substring(specname.length() - 5))));
-        assertEquals("Wrong owner for " + job.getJobName(), spec.getUser(),
-            stat.getOwner());
-
+        final String originalJobId = conf.get(Gridmix.ORIGINAL_JOB_ID);
+        final JobStory spec = sub.get(originalJobId);
+        assertNotNull("No spec for " + jobName, spec);
+        assertNotNull("No counters for " + jobName, job.getCounters());
+        final String originalJobName = spec.getName();
+        System.out.println("originalJobName=" + originalJobName
+            + ";GridmixJobName=" + jobName + ";originalJobID=" + originalJobId);
+        assertTrue("Original job name is wrong.", originalJobName.equals(
+            conf.get(Gridmix.ORIGINAL_JOB_NAME)));
+
+        // Gridmix job seqNum contains 6 digits
+        int seqNumLength = 6;
+        String jobSeqNum = new DecimalFormat("000000").format(
+            conf.getInt(GridmixJob.GRIDMIX_JOB_SEQ, -1));
+        // Original job name is of the format MOCKJOB<6 digit sequence number>
+        // because MockJob jobNames are of this format.
+        assertEquals("Wrong seq number suffix in original job name.", jobSeqNum,
+            originalJobName.substring(originalJobName.length() - seqNumLength));
+
+        assertTrue("Gridmix job name is not in the expected format.",
+            jobName.equals(
+                GridmixJob.JOB_NAME_PREFIX + jobSeqNum));
+        final FileStatus stat =
+          GridmixTestUtils.dfs.getFileStatus(
+            new Path(GridmixTestUtils.DEST, "" + Integer.valueOf(jobSeqNum)));
+        assertEquals("Wrong owner for " + jobName, spec.getUser(),
+                     stat.getOwner());
         final int nMaps = spec.getNumberMaps();
         final int nReds = spec.getNumberReduces();
 
         // TODO Blocked by MAPREDUCE-118
         if (true) return;
         // TODO
-        System.out.println(jobname + ": " + nMaps + "/" + nReds);
+        System.out.println(jobName + ": " + nMaps + "/" + nReds);
         final TaskReport[] mReports =
           client.getMapTaskReports(JobID.downgrade(job.getJobID()));
         assertEquals("Mismatched map count", nMaps, mReports.length);
@@ -168,6 +185,18 @@ public class TestGridmixSubmission {
       }
     }
 
+    // Verify if correct job queue is used
+    private void verifyQueue(Configuration conf, String jobName) {
+      if (!conf.getBoolean(
+          GridmixJob.GRIDMIX_USE_QUEUE_IN_TRACE, true)) {
+        assertEquals(" Improper queue for " + jobName,
+            conf.get("mapred.job.queue.name"), "q1");
+      } else {
+        assertEquals(" Improper queue for " + jobName,
+            conf.get("mapred.job.queue.name"), "default");
+      }
+    }
+
     public void check(final TaskType type, Job job, JobStory spec,
           final TaskReport[] runTasks,
           long extraInputBytes, int extraInputRecords,
@@ -325,19 +354,118 @@ public class TestGridmixSubmission {
     }
   }
 
+  /**
+   * Verifies that the given {@code JobStory} corresponds to the checked-in
+   * WordCount {@code JobStory}. The verification is effected via JUnit
+   * assertions.
+   *
+   * @param js the candidate JobStory.
+   */
+  private void verifyWordCountJobStory(JobStory js) {
+    assertNotNull("Null JobStory", js);
+    String expectedJobStory = "WordCount:johndoe:default:1285322645148:3:1";
+    String actualJobStory = js.getName() + ":" + js.getUser() + ":"
+      + js.getQueueName() + ":" + js.getSubmissionTime() + ":"
+      + js.getNumberMaps() + ":" + js.getNumberReduces();
+    assertEquals("Unexpected JobStory", expectedJobStory, actualJobStory);
+  }
+
+  /**
+   * Expands a file compressed using {@code gzip}.
+   *
+   * @param fs the {@code FileSystem} corresponding to the given
+   * file.
+   *
+   * @param in the path to the compressed file.
+   *
+   * @param out the path to the uncompressed output.
+   *
+   * @throws Exception if there was an error during the operation.
+   */
+  private void expandGzippedTrace(FileSystem fs, Path in, Path out)
+    throws Exception {
+    byte[] buff = new byte[4096];
+    GZIPInputStream gis = new GZIPInputStream(fs.open(in));
+    FSDataOutputStream fsdos = fs.create(out);
+    int numRead;
+    while ((numRead = gis.read(buff, 0, buff.length)) != -1) {
+      fsdos.write(buff, 0, numRead);
+    }
+    gis.close();
+    fsdos.close();
+  }
+
+  /**
+   * Tests the reading of traces in GridMix3. These traces are generated
+   * by Rumen and are in the JSON format. The traces can optionally be
+   * compressed and uncompressed traces can also be passed to GridMix3 via
+   * its standard input stream. The testing is effected via JUnit assertions.
+   *
+   * @throws Exception if there was an error.
+   */
+  @Test
+  public void testTraceReader() throws Exception {
+    Configuration conf = new Configuration();
+    FileSystem lfs = FileSystem.getLocal(conf);
+    Path rootInputDir = new Path(System.getProperty("src.test.data"));
+    rootInputDir
+      = rootInputDir.makeQualified(lfs.getUri(), lfs.getWorkingDirectory());
+    Path rootTempDir
+      = new Path(System.getProperty("test.build.data",
+        System.getProperty("java.io.tmpdir")), "testTraceReader");
+    rootTempDir
+      = rootTempDir.makeQualified(lfs.getUri(), lfs.getWorkingDirectory());
+    Path inputFile = new Path(rootInputDir, "wordcount.json.gz");
+    Path tempFile = new Path(rootTempDir, "gridmix3-wc.json");
+
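+    // save the real standard input; it is restored in the finally block
+    // after the '-' (read-trace-from-stdin) case has been verified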
+    InputStream origStdIn = System.in;
+    InputStream tmpIs = null;
+    try {
+      DebugGridmix dgm = new DebugGridmix();
+      JobStoryProducer jsp
+        = dgm.createJobStoryProducer(inputFile.toString(), conf);
+
+      System.out.println("Verifying JobStory from compressed trace...");
+      verifyWordCountJobStory(jsp.getNextJob());
+
+      expandGzippedTrace(lfs, inputFile, tempFile);
+      jsp = dgm.createJobStoryProducer(tempFile.toString(), conf);
+      System.out.println("Verifying JobStory from uncompressed trace...");
+      verifyWordCountJobStory(jsp.getNextJob());
+
+      tmpIs = lfs.open(tempFile);
+      System.setIn(tmpIs);
+      System.out.println("Verifying JobStory from trace in standard input...");
+      jsp = dgm.createJobStoryProducer("-", conf);
+      verifyWordCountJobStory(jsp.getNextJob());
+    } finally {
+      System.setIn(origStdIn);
+      if (tmpIs != null) {
+        tmpIs.close();
+      }
+      lfs.delete(rootTempDir, true);
+    }
+  }
+
   @Test
   public void testReplaySubmit() throws Exception {
     policy = GridmixJobSubmissionPolicy.REPLAY;
     System.out.println(" Replay started at " + System.currentTimeMillis());
-    doSubmission(false);
+    doSubmission(false, false);
     System.out.println(" Replay ended at " + System.currentTimeMillis());
+
+    System.out.println(" Replay started with default output path at time "
+        + System.currentTimeMillis());
+    doSubmission(false, true);
+    System.out.println(" Replay ended with default output path at time "
+        + System.currentTimeMillis());
   }
   
   @Test
   public void testStressSubmit() throws Exception {
     policy = GridmixJobSubmissionPolicy.STRESS;
     System.out.println(" Stress started at " + System.currentTimeMillis());
-    doSubmission(false);
+    doSubmission(false, false);
     System.out.println(" Stress ended at " + System.currentTimeMillis());
   }
 
@@ -346,7 +474,7 @@ public class TestGridmixSubmission {
     policy = GridmixJobSubmissionPolicy.STRESS;
     System.out.println(
       " Stress with default q started at " + System.currentTimeMillis());
-    doSubmission(true);
+    doSubmission(true, false);
     System.out.println(
       " Stress with default q ended at " + System.currentTimeMillis());
   }
@@ -355,26 +483,39 @@ public class TestGridmixSubmission {
   public void testSerialSubmit() throws Exception {
     policy = GridmixJobSubmissionPolicy.SERIAL;
     System.out.println("Serial started at " + System.currentTimeMillis());
-    doSubmission(false);
+    doSubmission(false, false);
     System.out.println("Serial ended at " + System.currentTimeMillis());
   }
 
-  private void doSubmission(boolean useDefaultQueue) throws Exception {
+  private void doSubmission(boolean useDefaultQueue,
+      boolean defaultOutputPath) throws Exception {
     final Path in = new Path("foo").makeQualified(GridmixTestUtils.dfs);
     final Path out = GridmixTestUtils.DEST.makeQualified(GridmixTestUtils.dfs);
     final Path root = new Path("/user");
     Configuration conf = null;
+
     try{
-    final String[] argv = {
-      "-D" + FilePool.GRIDMIX_MIN_FILE + "=0",
-      "-D" + Gridmix.GRIDMIX_OUT_DIR + "=" + out,
-      "-D" + Gridmix.GRIDMIX_USR_RSV + "=" + EchoUserResolver.class.getName(),
-      "-generate", String.valueOf(GENDATA) + "m",
-      in.toString(),
-      "-" // ignored by DebugGridmix
-    };
-    DebugGridmix client = new DebugGridmix();
-    conf = new Configuration();
+      ArrayList<String> argsList = new ArrayList<String>();
+
+      argsList.add("-D" + FilePool.GRIDMIX_MIN_FILE + "=0");
+      argsList.add("-D" + Gridmix.GRIDMIX_USR_RSV + "="
+          + EchoUserResolver.class.getName());
+
+      // Set the config property gridmix.output.directory only if
+      // defaultOutputPath is false; otherwise let gridmix fall back to
+      // its default output dir (foo/gridmix/).
+      if (!defaultOutputPath) {
+        argsList.add("-D" + Gridmix.GRIDMIX_OUT_DIR + "=" + out);
+      }
+      argsList.add("-generate");
+      argsList.add(String.valueOf(GENDATA) + "m");
+      argsList.add(in.toString());
+      argsList.add("-"); // ignored by DebugGridmix
+
+      String[] argv = argsList.toArray(new String[argsList.size()]);
+
+      DebugGridmix client = new DebugGridmix();
+      conf = new Configuration();
       conf.setEnum(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY,policy);
       if (useDefaultQueue) {
         conf.setBoolean(GridmixJob.GRIDMIX_USE_QUEUE_IN_TRACE, false);
@@ -382,13 +523,13 @@ public class TestGridmixSubmission {
       } else {
         conf.setBoolean(GridmixJob.GRIDMIX_USE_QUEUE_IN_TRACE, true);
       }
-    conf = GridmixTestUtils.mrCluster.createJobConf(new JobConf(conf));
-    // allow synthetic users to create home directories
-    GridmixTestUtils.dfs.mkdirs(root, new FsPermission((short)0777));
-    GridmixTestUtils.dfs.setPermission(root, new FsPermission((short)0777));
-    int res = ToolRunner.run(conf, client, argv);
-    assertEquals("Client exited with nonzero status", 0, res);
-    client.checkMonitor();
+      conf = GridmixTestUtils.mrCluster.createJobConf(new JobConf(conf));
+      // allow synthetic users to create home directories
+      GridmixTestUtils.dfs.mkdirs(root, new FsPermission((short)0777));
+      GridmixTestUtils.dfs.setPermission(root, new FsPermission((short)0777));
+      int res = ToolRunner.run(conf, client, argv);
+      assertEquals("Client exited with nonzero status", 0, res);
+      client.checkMonitor();
      } catch (Exception e) {
        e.printStackTrace();
      } finally {