Posted to common-commits@hadoop.apache.org by am...@apache.org on 2011/10/18 16:45:51 UTC
svn commit: r1185694 [4/7] - in
/hadoop/common/branches/branch-0.20-security: ./ src/contrib/
src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/
src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/
src/contrib/gridmix/sr...
Added: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java?rev=1185694&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java Tue Oct 18 14:45:48 2011
@@ -0,0 +1,563 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Utils;
+import org.apache.hadoop.mapred.gridmix.CompressionEmulationUtil.RandomTextDataMapper;
+import org.apache.hadoop.mapred.gridmix.GenerateData.GenSplit;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import static org.junit.Assert.*;
+import org.junit.Test;
+
+/**
+ * Test {@link CompressionEmulationUtil}
+ */
+public class TestCompressionEmulationUtils {
+ //TODO Remove this once LocalJobRunner can run Gridmix.
+ static class CustomInputFormat extends GenerateData.GenDataFormat {
+ @Override
+ public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
+ // get the total data to be generated
+ long toGen =
+ jobCtxt.getConfiguration().getLong(GenerateData.GRIDMIX_GEN_BYTES, -1);
+ if (toGen < 0) {
+ throw new IOException("Invalid/missing generation bytes: " + toGen);
+ }
+ // get the total number of mappers configured
+ int totalMappersConfigured =
+ jobCtxt.getConfiguration().getInt("mapred.map.tasks", -1);
+ if (totalMappersConfigured < 0) {
+ throw new IOException("Invalid/missing num mappers: "
+ + totalMappersConfigured);
+ }
+
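+ // note: integer division here drops any remainder, so up to
+ // (totalMappersConfigured - 1) bytes may not be generated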
+ final long bytesPerTracker = toGen / totalMappersConfigured;
+ final ArrayList<InputSplit> splits =
+ new ArrayList<InputSplit>(totalMappersConfigured);
+ for (int i = 0; i < totalMappersConfigured; ++i) {
+ splits.add(new GenSplit(bytesPerTracker,
+ new String[] { "tracker_local" }));
+ }
+ return splits;
+ }
+ }
+
+ /**
+ * Test {@link RandomTextDataMapper} via {@link CompressionEmulationUtil}.
+ */
+ @Test
+ public void testRandomCompressedTextDataGenerator() throws Exception {
+ int wordSize = 10;
+ int listSize = 20;
+ long dataSize = 10*1024*1024;
+
+ Configuration conf = new Configuration();
+ CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+ CompressionEmulationUtil.setInputCompressionEmulationEnabled(conf, true);
+
+ // configure the RandomTextDataGenerator to generate desired sized data
+ conf.setInt(RandomTextDataGenerator.GRIDMIX_DATAGEN_RANDOMTEXT_LISTSIZE,
+ listSize);
+ conf.setInt(RandomTextDataGenerator.GRIDMIX_DATAGEN_RANDOMTEXT_WORDSIZE,
+ wordSize);
+ conf.setLong(GenerateData.GRIDMIX_GEN_BYTES, dataSize);
+
+ FileSystem lfs = FileSystem.getLocal(conf);
+
+ // define the test's root temp directory
+ Path rootTempDir =
+ new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+ lfs.getUri(), lfs.getWorkingDirectory());
+
+ Path tempDir = new Path(rootTempDir, "TestRandomCompressedTextDataGenr");
+ lfs.delete(tempDir, true);
+
+ runDataGenJob(conf, tempDir);
+
+ // validate the output data
+ FileStatus[] files =
+ lfs.listStatus(tempDir, new Utils.OutputFileUtils.OutputFilesFilter());
+ long size = 0;
+ long maxLineSize = 0;
+
+ for (FileStatus status : files) {
+ InputStream in =
+ CompressionEmulationUtil
+ .getPossiblyDecompressedInputStream(status.getPath(), conf, 0);
+ BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+ String line = reader.readLine();
+ if (line != null) {
+ long lineSize = line.getBytes().length;
+ if (lineSize > maxLineSize) {
+ maxLineSize = lineSize;
+ }
+ while (line != null) {
+ for (String word : line.split("\\s")) {
+ size += word.getBytes().length;
+ }
+ line = reader.readLine();
+ }
+ }
+ reader.close();
+ }
+
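+ // the generated size should reach the target, overshooting it by at
+ // most one line (see the asserts below)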
+ assertTrue(size >= dataSize);
+ assertTrue(size <= dataSize + maxLineSize);
+ }
+
+ /**
+ * Runs a GridMix data-generation job.
+ */
+ private static void runDataGenJob(Configuration conf, Path tempDir)
+ throws IOException, ClassNotFoundException, InterruptedException {
+ JobConf jobConf = new JobConf(conf);
+ JobClient client = new JobClient(jobConf);
+
+ // get the local job runner
+ jobConf.setInt("mapred.map.tasks", 1);
+
+ Job job = new Job(jobConf);
+
+ CompressionEmulationUtil.configure(job);
+ job.setInputFormatClass(CustomInputFormat.class);
+
+ // set the output path
+ FileOutputFormat.setOutputPath(job, tempDir);
+
+ // submit and wait for completion
+ job.submit();
+ int ret = job.waitForCompletion(true) ? 0 : 1;
+
+ assertEquals("Job Failed", 0, ret);
+ }
+
+ /**
+ * Test if {@link RandomTextDataGenerator} can generate random text data
+ * with the desired compression ratio. This involves
+ * - using {@link CompressionEmulationUtil} to configure the MR job for
+ * generating the random text data with the desired compression ratio
+ * - running the MR job
+ * - testing {@link RandomTextDataGenerator}'s output and matching the
+ * compressed output size against the expected compression ratio.
+ */
+ private void testCompressionRatioConfigure(float ratio)
+ throws Exception {
+ long dataSize = 10*1024*1024;
+
+ Configuration conf = new Configuration();
+ CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+ CompressionEmulationUtil.setInputCompressionEmulationEnabled(conf, true);
+
+ conf.setLong(GenerateData.GRIDMIX_GEN_BYTES, dataSize);
+
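+ // a non-positive ratio argument means: test with the default
+ // compression ratio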
+ float expectedRatio = CompressionEmulationUtil.DEFAULT_COMPRESSION_RATIO;
+ if (ratio > 0) {
+ // set the compression ratio in the conf
+ CompressionEmulationUtil.setMapInputCompressionEmulationRatio(conf, ratio);
+ expectedRatio =
+ CompressionEmulationUtil.standardizeCompressionRatio(ratio);
+ }
+
+ // invoke the utility to map from ratio to word-size
+ CompressionEmulationUtil.setupDataGeneratorConfig(conf);
+
+ FileSystem lfs = FileSystem.getLocal(conf);
+
+ // define the test's root temp directory
+ Path rootTempDir =
+ new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+ lfs.getUri(), lfs.getWorkingDirectory());
+
+ Path tempDir =
+ new Path(rootTempDir, "TestCustomRandomCompressedTextDataGenr");
+ lfs.delete(tempDir, true);
+
+ runDataGenJob(conf, tempDir);
+
+ // validate the output data
+ FileStatus[] files =
+ lfs.listStatus(tempDir, new Utils.OutputFileUtils.OutputFilesFilter());
+ long size = 0;
+
+ for (FileStatus status : files) {
+ size += status.getLen();
+ }
+
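+ // compare the achieved compression ratio (standardized) with the
+ // expected ratio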
+ float compressionRatio = ((float)size)/dataSize;
+ float stdRatio =
+ CompressionEmulationUtil.standardizeCompressionRatio(compressionRatio);
+
+ assertEquals(expectedRatio, stdRatio, 0.0D);
+ }
+
+ /**
+ * Test compression emulation with multiple compression ratios.
+ */
+ @Test
+ public void testCompressionRatios() throws Exception {
+ // test the default compression ratio, i.e. 0.5
+ testCompressionRatioConfigure(0F);
+ // test for a sample compression ratio of 0.2
+ testCompressionRatioConfigure(0.2F);
+ // test for a sample compression ratio of 0.4
+ testCompressionRatioConfigure(0.4F);
+ // test for a sample compression ratio of 0.65
+ testCompressionRatioConfigure(0.65F);
+ // test for a compression ratio of 0.682 which should be standardized
+ // to round(0.682), i.e. 0.68
+ testCompressionRatioConfigure(0.682F);
+ // test for a compression ratio of 0.567 which should be standardized
+ // to round(0.567), i.e. 0.57
+ testCompressionRatioConfigure(0.567F);
+
+ // test with a compression ratio of 0.01 which is less than the min
+ // supported value of 0.07
+ boolean failed = false;
+ try {
+ testCompressionRatioConfigure(0.01F);
+ } catch (RuntimeException re) {
+ failed = true;
+ }
+ assertTrue("Compression ratio min value (0.07) check failed!", failed);
+
+ // test with a compression ratio of 0.7 which is greater than the max
+ // supported value of 0.68
+ failed = false;
+ try {
+ testCompressionRatioConfigure(0.7F);
+ } catch (RuntimeException re) {
+ failed = true;
+ }
+ assertTrue("Compression ratio max value (0.68) check failed!", failed);
+ }
+
+ /**
+ * Test compression ratio standardization.
+ */
+ @Test
+ public void testCompressionRatioStandardization() throws Exception {
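+ // standardizeCompressionRatio() rounds the ratio to two decimal places,
+ // as the expected values below illustrate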
+ assertEquals(0.55F,
+ CompressionEmulationUtil.standardizeCompressionRatio(0.55F), 0.0D);
+ assertEquals(0.65F,
+ CompressionEmulationUtil.standardizeCompressionRatio(0.652F), 0.0D);
+ assertEquals(0.78F,
+ CompressionEmulationUtil.standardizeCompressionRatio(0.777F), 0.0D);
+ assertEquals(0.86F,
+ CompressionEmulationUtil.standardizeCompressionRatio(0.855F), 0.0D);
+ }
+
+ /**
+ * Test map input compression ratio configuration utilities.
+ */
+ @Test
+ public void testInputCompressionRatioConfiguration() throws Exception {
+ Configuration conf = new Configuration();
+ float ratio = 0.567F;
+ CompressionEmulationUtil.setMapInputCompressionEmulationRatio(conf, ratio);
+ assertEquals(ratio,
+ CompressionEmulationUtil.getMapInputCompressionEmulationRatio(conf),
+ 0.0D);
+ }
+
+ /**
+ * Test map output compression ratio configuration utilities.
+ */
+ @Test
+ public void testIntermediateCompressionRatioConfiguration()
+ throws Exception {
+ Configuration conf = new Configuration();
+ float ratio = 0.567F;
+ CompressionEmulationUtil.setMapOutputCompressionEmulationRatio(conf, ratio);
+ assertEquals(ratio,
+ CompressionEmulationUtil.getMapOutputCompressionEmulationRatio(conf),
+ 0.0D);
+ }
+
+ /**
+ * Test reduce output compression ratio configuration utilities.
+ */
+ @Test
+ public void testOutputCompressionRatioConfiguration() throws Exception {
+ Configuration conf = new Configuration();
+ float ratio = 0.567F;
+ CompressionEmulationUtil.setReduceOutputCompressionEmulationRatio(conf,
+ ratio);
+ assertEquals(ratio,
+ CompressionEmulationUtil.getReduceOutputCompressionEmulationRatio(conf),
+ 0.0D);
+ }
+
+ /**
+ * Test compressible {@link GridmixRecord}.
+ */
+ @Test
+ public void testCompressibleGridmixRecord() throws IOException {
+ JobConf conf = new JobConf();
+ CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+ CompressionEmulationUtil.setInputCompressionEmulationEnabled(conf, true);
+
+ FileSystem lfs = FileSystem.getLocal(conf);
+ int dataSize = 1024 * 1024 * 10; // 10 MB
+ float ratio = 0.357F;
+
+ // define the test's root temp directory
+ Path rootTempDir =
+ new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+ lfs.getUri(), lfs.getWorkingDirectory());
+
+ Path tempDir = new Path(rootTempDir,
+ "TestPossiblyCompressibleGridmixRecord");
+ lfs.delete(tempDir, true);
+
+ // define a compressible GridmixRecord
+ GridmixRecord record = new GridmixRecord(dataSize, 0);
+ record.setCompressibility(true, ratio); // enable compression
+
+ conf.setClass("mapred.output.compression.codec", GzipCodec.class,
+ CompressionCodec.class);
+ org.apache.hadoop.mapred.FileOutputFormat.setCompressOutput(conf, true);
+
+ // write the record to a file
+ Path recordFile = new Path(tempDir, "record");
+ OutputStream outStream = CompressionEmulationUtil
+ .getPossiblyCompressedOutputStream(recordFile,
+ conf);
+ DataOutputStream out = new DataOutputStream(outStream);
+ record.write(out);
+ out.close();
+ outStream.close();
+
+ // open the compressed stream for reading
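+ // (the compressed output stream wrote to the requested path with the
+ // codec's '.gz' extension appended)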
+ Path actualRecordFile = recordFile.suffix(".gz");
+ InputStream in =
+ CompressionEmulationUtil
+ .getPossiblyDecompressedInputStream(actualRecordFile, conf, 0);
+
+ // get the compressed file size
+ long compressedFileSize = lfs.listStatus(actualRecordFile)[0].getLen();
+
+ GridmixRecord recordRead = new GridmixRecord();
+ recordRead.readFields(new DataInputStream(in));
+
+ assertEquals("Record size mismatch in a compressible GridmixRecord",
+ dataSize, recordRead.getSize());
+ assertTrue("Failed to generate a compressible GridmixRecord",
+ recordRead.getSize() > compressedFileSize);
+
+ // check if the record can generate data with the desired compression ratio
+ float seenRatio = ((float)compressedFileSize)/dataSize;
+ assertEquals(CompressionEmulationUtil.standardizeCompressionRatio(ratio),
+ CompressionEmulationUtil.standardizeCompressionRatio(seenRatio), 1.0D);
+ }
+
+ /**
+ * Test
+ * {@link CompressionEmulationUtil#isCompressionEmulationEnabled(
+ * org.apache.hadoop.conf.Configuration)}.
+ */
+ @Test
+ public void testIsCompressionEmulationEnabled() {
+ Configuration conf = new Configuration();
+ // Check default values
+ assertTrue(CompressionEmulationUtil.isCompressionEmulationEnabled(conf));
+
+ // Check disabled
+ CompressionEmulationUtil.setCompressionEmulationEnabled(conf, false);
+ assertFalse(CompressionEmulationUtil.isCompressionEmulationEnabled(conf));
+
+ // Check enabled
+ CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+ assertTrue(CompressionEmulationUtil.isCompressionEmulationEnabled(conf));
+ }
+
+ /**
+ * Test
+ * {@link CompressionEmulationUtil#getPossiblyDecompressedInputStream(Path,
+ * Configuration, long)}
+ * and
+ * {@link CompressionEmulationUtil#getPossiblyCompressedOutputStream(Path,
+ * Configuration)}.
+ */
+ @Test
+ public void testPossiblyCompressedDecompressedStreams() throws IOException {
+ JobConf conf = new JobConf();
+ FileSystem lfs = FileSystem.getLocal(conf);
+ String inputLine = "Hi Hello!";
+
+ CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+ CompressionEmulationUtil.setInputCompressionEmulationEnabled(conf, true);
+ conf.setBoolean("mapred.output.compress", true);
+ conf.setClass("mapred.output.compression.codec", GzipCodec.class,
+ CompressionCodec.class);
+
+ // define the test's root temp directory
+ Path rootTempDir =
+ new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+ lfs.getUri(), lfs.getWorkingDirectory());
+
+ Path tempDir =
+ new Path(rootTempDir, "TestPossiblyCompressedDecompressedStreams");
+ lfs.delete(tempDir, true);
+
+ // create a compressed file
+ Path compressedFile = new Path(tempDir, "test");
+ OutputStream out =
+ CompressionEmulationUtil.getPossiblyCompressedOutputStream(compressedFile,
+ conf);
+ BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
+ writer.write(inputLine);
+ writer.close();
+
+ // now read back the data from the compressed stream
+ compressedFile = compressedFile.suffix(".gz");
+ InputStream in =
+ CompressionEmulationUtil
+ .getPossiblyDecompressedInputStream(compressedFile, conf, 0);
+ BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+ String readLine = reader.readLine();
+ assertEquals("Compression/Decompression error", inputLine, readLine);
+ reader.close();
+ }
+
+ /**
+ * Test if
+ * {@link CompressionEmulationUtil#configureCompressionEmulation(
+ * org.apache.hadoop.mapred.JobConf, org.apache.hadoop.mapred.JobConf)}
+ * can extract compression related configuration parameters.
+ */
+ @Test
+ public void testExtractCompressionConfigs() {
+ JobConf source = new JobConf();
+ JobConf target = new JobConf();
+
+ // set the default values
+ source.setBoolean("mapred.output.compress", false);
+ source.set("mapred.output.compression.codec", "MyDefaultCodec");
+ source.set("mapred.output.compression.type", "MyDefaultType");
+ source.setBoolean("mapred.compress.map.output", false);
+ source.set("mapred.map.output.compression.codec", "MyDefaultCodec2");
+
+ CompressionEmulationUtil.configureCompressionEmulation(source, target);
+
+ // check default values
+ assertFalse(target.getBoolean("mapred.output.compress", true));
+ assertEquals("MyDefaultCodec",
+ target.get("mapred.output.compression.codec"));
+ assertEquals("MyDefaultType", target.get("mapred.output.compression.type"));
+ assertFalse(target.getBoolean("mapred.compress.map.output", true));
+ assertEquals("MyDefaultCodec2",
+ target.get("mapred.map.output.compression.codec"));
+ assertFalse(CompressionEmulationUtil
+ .isInputCompressionEmulationEnabled(target));
+
+ // set new values
+ source.setBoolean("mapred.output.compress", true);
+ source.set("mapred.output.compression.codec", "MyCodec");
+ source.set("mapred.output.compression.type", "MyType");
+ source.setBoolean("mapred.compress.map.output", true);
+ source.set("mapred.map.output.compression.codec", "MyCodec2");
+ org.apache.hadoop.mapred.FileInputFormat.setInputPaths(source, "file.gz");
+
+ target = new JobConf(); // reset
+ CompressionEmulationUtil.configureCompressionEmulation(source, target);
+
+ // check new values
+ assertTrue(target.getBoolean("mapred.output.compress", false));
+ assertEquals("MyCodec",
+ target.get("mapred.output.compression.codec"));
+ assertEquals("MyType", target.get("mapred.output.compression.type"));
+ assertTrue(target.getBoolean("mapred.compress.map.output", false));
+ assertEquals("MyCodec2",
+ target.get("mapred.map.output.compression.codec"));
+ assertTrue(CompressionEmulationUtil
+ .isInputCompressionEmulationEnabled(target));
+ }
+
+ /**
+ * Test if {@link FileQueue} can identify compressed files and provide
+ * readers that extract uncompressed data when input-compression is enabled.
+ */
+ @Test
+ public void testFileQueueDecompression() throws IOException {
+ JobConf conf = new JobConf();
+ FileSystem lfs = FileSystem.getLocal(conf);
+ String inputLine = "Hi Hello!";
+
+ CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+ CompressionEmulationUtil.setInputCompressionEmulationEnabled(conf, true);
+ org.apache.hadoop.mapred.FileOutputFormat.setCompressOutput(conf, true);
+ org.apache.hadoop.mapred.FileOutputFormat.setOutputCompressorClass(conf,
+ GzipCodec.class);
+
+ // define the test's root temp directory
+ Path rootTempDir =
+ new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+ lfs.getUri(), lfs.getWorkingDirectory());
+
+ Path tempDir = new Path(rootTempDir, "TestFileQueueDecompression");
+ lfs.delete(tempDir, true);
+
+ // create a compressed file
+ Path compressedFile = new Path(tempDir, "test");
+ OutputStream out =
+ CompressionEmulationUtil.getPossiblyCompressedOutputStream(compressedFile,
+ conf);
+ BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
+ writer.write(inputLine);
+ writer.close();
+
+ compressedFile = compressedFile.suffix(".gz");
+ // now read back the data from the compressed stream using FileQueue
+ long fileSize = lfs.listStatus(compressedFile)[0].getLen();
+ CombineFileSplit split =
+ new CombineFileSplit(new Path[] {compressedFile}, new long[] {fileSize});
+ FileQueue queue = new FileQueue(split, conf);
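+ // FileQueue should hand back the uncompressed content, not the raw
+ // gzipped bytes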
+ byte[] bytes = new byte[inputLine.getBytes().length];
+ queue.read(bytes);
+ queue.close();
+ String readLine = new String(bytes);
+ assertEquals("Compression/Decompression error", inputLine, readLine);
+ }
+}
Added: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestDistCacheEmulation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestDistCacheEmulation.java?rev=1185694&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestDistCacheEmulation.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestDistCacheEmulation.java Tue Oct 18 14:45:48 2011
@@ -0,0 +1,498 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import static org.junit.Assert.*;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Validate emulation of distributed cache load in gridmix simulated jobs.
+ */
+public class TestDistCacheEmulation {
+
+ private DistributedCacheEmulator dce = null;
+
+ @BeforeClass
+ public static void init() throws IOException {
+ GridmixTestUtils.initCluster();
+ }
+
+ @AfterClass
+ public static void shutDown() throws IOException {
+ GridmixTestUtils.shutdownCluster();
+ }
+
+ /**
+ * Validate the dist cache files generated by GenerateDistCacheData job.
+ * @param jobConf configuration of GenerateDistCacheData job.
+ * @param sortedFileSizes array of sorted distributed cache file sizes
+ * @throws IOException
+ * @throws FileNotFoundException
+ */
+ private void validateDistCacheData(JobConf jobConf, long[] sortedFileSizes)
+ throws FileNotFoundException, IOException {
+ Path distCachePath = dce.getDistributedCacheDir();
+ String filesListFile =
+ jobConf.get(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_LIST);
+ FileSystem fs = FileSystem.get(jobConf);
+
+ // Validate the existence of Distributed Cache files list file directly
+ // under distributed cache directory
+ Path listFile = new Path(filesListFile);
+ assertTrue("Path of Distributed Cache files list file is wrong.",
+ distCachePath.equals(listFile.getParent().makeQualified(fs)));
+
+ // Delete the dist cache files list file
+ assertTrue("Failed to delete distributed Cache files list file " + listFile,
+ fs.delete(listFile));
+
+ List<Long> fileSizes = new ArrayList<Long>();
+ for (long size : sortedFileSizes) {
+ fileSizes.add(size);
+ }
+ // validate dist cache files after deleting the 'files list file'
+ validateDistCacheFiles(fileSizes, distCachePath);
+ }
+
+ /**
+ * Validate private/public distributed cache files.
+ * @param filesSizesExpected list of sizes of expected dist cache files
+ * @param distCacheDir the distributed cache dir to be validated
+ * @throws IOException
+ * @throws FileNotFoundException
+ */
+ private void validateDistCacheFiles(List<Long> filesSizesExpected,
+ Path distCacheDir) throws FileNotFoundException, IOException {
+ FileStatus[] statuses = GridmixTestUtils.dfs.listStatus(distCacheDir);
+ int numFiles = filesSizesExpected.size();
+ assertEquals("Number of files under distributed cache dir is wrong.",
+ numFiles, statuses.length);
+ for (int i = 0; i < numFiles; i++) {
+ FileStatus stat = statuses[i];
+ assertTrue("File size of distributed cache file "
+ + stat.getPath().toUri().getPath() + " is wrong.",
+ filesSizesExpected.remove(stat.getLen()));
+
+ FsPermission perm = stat.getPermission();
+ assertEquals("Wrong permissions for distributed cache file "
+ + stat.getPath().toUri().getPath(),
+ new FsPermission((short)0644), perm);
+ }
+ }
+
+ /**
+ * Configures 5 HDFS-based dist cache files and 1 local-FS-based dist cache
+ * file in the given Configuration object <code>conf</code>.
+ * @param conf configuration where dist cache config properties are to be set
+ * @return array of sorted HDFS-based distributed cache file sizes
+ * @throws IOException
+ */
+ private long[] configureDummyDistCacheFiles(Configuration conf)
+ throws IOException {
+ String user = UserGroupInformation.getCurrentUser().getShortUserName();
+ conf.set("user.name", user);
+ // Set some dummy dist cache files in gridmix configuration so that they go
+ // into the configuration of JobStory objects.
+ String[] distCacheFiles = {"hdfs:///tmp/file1.txt",
+ "/tmp/" + user + "/.staging/job_1/file2.txt",
+ "hdfs:///user/user1/file3.txt",
+ "/home/user2/file4.txt",
+ "subdir1/file5.txt",
+ "subdir2/file6.gz"};
+ String[] fileSizes = {"400", "2500", "700", "1200", "1500", "500"};
+
+ String[] visibilities = {"true", "false", "false", "true", "true", "false"};
+ String[] timeStamps = {"1234", "2345", "34567", "5434", "125", "134"};
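+ // the i-th entries of the arrays above together describe the i-th
+ // dist cache file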
+
+ conf.setStrings(DistributedCache.CACHE_FILES, distCacheFiles);
+ conf.setStrings(DistributedCache.CACHE_FILES_SIZES, fileSizes);
+ conf.setStrings(JobContext.CACHE_FILE_VISIBILITIES, visibilities);
+ conf.setStrings(DistributedCache.CACHE_FILES_TIMESTAMPS, timeStamps);
+
+ // local FS based dist cache file whose path contains <user>/.staging is
+ // not created on HDFS. So file size 2500 is not added to sortedFileSizes.
+ long[] sortedFileSizes = new long[] {1500, 1200, 700, 500, 400};
+ return sortedFileSizes;
+ }
+
+ /**
+ * Runs setupGenerateDistCacheData() on a new DistributedCacheEmulator and
+ * returns the jobConf. Fills the array <code>sortedFileSizes</code> that
+ * can be used for validation, and validates the exit code returned by
+ * setupGenerateDistCacheData().
+ * @param generate true if -generate option is specified
+ * @param sortedFileSizes sorted HDFS-based distributed cache file sizes
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ private JobConf runSetupGenerateDistCacheData(boolean generate,
+ long[] sortedFileSizes) throws IOException, InterruptedException {
+ Configuration conf = new Configuration();
+ long[] fileSizes = configureDummyDistCacheFiles(conf);
+ System.arraycopy(fileSizes, 0, sortedFileSizes, 0, fileSizes.length);
+
+ // Job stories of all 3 jobs will have same dist cache files in their
+ // configurations
+ final int numJobs = 3;
+ DebugJobProducer jobProducer = new DebugJobProducer(numJobs, conf);
+
+ JobConf jobConf =
+ GridmixTestUtils.mrCluster.createJobConf(new JobConf(conf));
+ Path ioPath = new Path("testSetupGenerateDistCacheData")
+ .makeQualified(GridmixTestUtils.dfs);
+ FileSystem fs = FileSystem.get(jobConf);
+ if (fs.exists(ioPath)) {
+ fs.delete(ioPath, true);
+ }
+ FileSystem.mkdirs(fs, ioPath, new FsPermission((short)0777));
+
+ dce = createDistributedCacheEmulator(jobConf, ioPath, generate);
+ int exitCode = dce.setupGenerateDistCacheData(jobProducer);
+ int expectedExitCode = generate ? 0 : dce.MISSING_DIST_CACHE_FILES_ERROR;
+ assertEquals("setupGenerateDistCacheData failed.",
+ expectedExitCode, exitCode);
+
+ // reset back
+ resetDistCacheConfigProperties(jobConf);
+ return jobConf;
+ }
+
+ /**
+ * Reset the config properties related to Distributed Cache in the given
+ * job configuration <code>jobConf</code>.
+ * @param jobConf job configuration
+ */
+ private void resetDistCacheConfigProperties(JobConf jobConf) {
+ // reset current/latest property names
+ jobConf.setStrings(DistributedCache.CACHE_FILES, "");
+ jobConf.setStrings(DistributedCache.CACHE_FILES_SIZES, "");
+ jobConf.setStrings(DistributedCache.CACHE_FILES_TIMESTAMPS, "");
+ jobConf.setStrings(JobContext.CACHE_FILE_VISIBILITIES, "");
+ // reset old property names
+ jobConf.setStrings("mapred.cache.files", "");
+ jobConf.setStrings("mapred.cache.files.filesizes", "");
+ jobConf.setStrings("mapred.cache.files.visibilities", "");
+ jobConf.setStrings("mapred.cache.files.timestamps", "");
+ }
+
+ /**
+ * Validate that the GenerateDistCacheData job creates dist cache files properly.
+ * @throws Exception
+ */
+ @Test
+ public void testGenerateDistCacheData() throws Exception {
+ long[] sortedFileSizes = new long[5];
+ JobConf jobConf =
+ runSetupGenerateDistCacheData(true, sortedFileSizes);
+ GridmixJob gridmixJob = new GenerateDistCacheData(jobConf);
+ Job job = gridmixJob.call();
+ assertEquals("Number of reduce tasks in GenerateDistCacheData is not 0.",
+ 0, job.getNumReduceTasks());
+ assertTrue("GenerateDistCacheData job failed.",
+ job.waitForCompletion(false));
+ validateDistCacheData(jobConf, sortedFileSizes);
+ }
+
+ /**
+ * Validate setupGenerateDistCacheData by validating
+ * <li> permissions of the distributed cache directories and
+ * <li> content of the generated sequence file. This includes validation of
+ * dist cache file paths and their file sizes.
+ */
+ private void validateSetupGenDC(JobConf jobConf, long[] sortedFileSizes)
+ throws IOException, InterruptedException {
+ // build things needed for validation
+ long sumOfFileSizes = 0;
+ for (int i = 0; i < sortedFileSizes.length; i++) {
+ sumOfFileSizes += sortedFileSizes[i];
+ }
+
+ FileSystem fs = FileSystem.get(jobConf);
+ assertEquals("Number of distributed cache files to be generated is wrong.",
+ sortedFileSizes.length,
+ jobConf.getInt(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_COUNT, -1));
+ assertEquals("Total size of dist cache files to be generated is wrong.",
+ sumOfFileSizes, jobConf.getLong(
+ GenerateDistCacheData.GRIDMIX_DISTCACHE_BYTE_COUNT, -1));
+ Path filesListFile = new Path(jobConf.get(
+ GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_LIST));
+ FileStatus stat = fs.getFileStatus(filesListFile);
+ assertEquals("Wrong permissions of dist Cache files list file "
+ + filesListFile, new FsPermission((short)0644), stat.getPermission());
+
+ InputSplit split =
+ new FileSplit(filesListFile, 0, stat.getLen(), (String[])null);
+ TaskAttemptContext taskContext =
+ MapReduceTestUtil.createDummyMapTaskAttemptContext(jobConf);
+ RecordReader<LongWritable, BytesWritable> reader =
+ new GenerateDistCacheData.GenDCDataFormat().createRecordReader(
+ split, taskContext);
+ MapContext<LongWritable, BytesWritable, NullWritable, BytesWritable>
+ mapContext = new MapContext<LongWritable, BytesWritable,
+ NullWritable, BytesWritable>(jobConf, taskContext.getTaskAttemptID(),
+ reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
+ reader.initialize(split, mapContext);
+
+ // start validating setupGenerateDistCacheData
+ doValidateSetupGenDC(reader, fs, sortedFileSizes);
+ }
+
+ /**
+ * Validate setupGenerateDistCacheData by validating
+ * <li> permissions of the distributed cache directory and
+ * <li> content of the generated sequence file. This includes validation of
+ * dist cache file paths and their file sizes.
+ */
+ private void doValidateSetupGenDC(RecordReader<LongWritable, BytesWritable>
+ reader, FileSystem fs, long[] sortedFileSizes)
+ throws IOException, InterruptedException {
+
+ // Validate permissions of dist cache directory
+ Path distCacheDir = dce.getDistributedCacheDir();
+ assertEquals("Wrong permissions for distributed cache dir " + distCacheDir,
+ fs.getFileStatus(distCacheDir).getPermission()
+ .getOtherAction().and(FsAction.EXECUTE), FsAction.EXECUTE);
+
+ // Validate the content of the sequence file generated by
+ // dce.setupGenerateDistCacheData().
+ LongWritable key = new LongWritable();
+ BytesWritable val = new BytesWritable();
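+ // each sequence file record maps a dist cache file's size (key) to its
+ // path (value)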
+ for (int i = 0; i < sortedFileSizes.length; i++) {
+ assertTrue("Number of files written to the sequence file by "
+ + "setupGenerateDistCacheData is less than the expected.",
+ reader.nextKeyValue());
+ key = reader.getCurrentKey();
+ val = reader.getCurrentValue();
+ long fileSize = key.get();
+ String file = new String(val.getBytes(), 0, val.getLength());
+
+ // Dist Cache files should be sorted based on file size.
+ assertEquals("Dist cache file size is wrong.",
+ sortedFileSizes[i], fileSize);
+
+ // Validate dist cache file path.
+
+ // parent dir of dist cache file
+ Path parent = new Path(file).getParent().makeQualified(fs);
+ // should exist in dist cache dir
+ assertTrue("Public dist cache file path is wrong.",
+ distCacheDir.equals(parent));
+ }
+ }
+
+ /**
+ * Test if DistributedCacheEmulator's setup of GenerateDistCacheData is
+ * working as expected.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testSetupGenerateDistCacheData()
+ throws IOException, InterruptedException {
+ long[] sortedFileSizes = new long[5];
+ JobConf jobConf = runSetupGenerateDistCacheData(true, sortedFileSizes);
+ validateSetupGenDC(jobConf, sortedFileSizes);
+
+ // Verify if correct exit code is seen when -generate option is missing and
+ // distributed cache files are missing in the expected path.
+ runSetupGenerateDistCacheData(false, sortedFileSizes);
+ }
+
+ /**
+ * Create DistributedCacheEmulator object and do the initialization by
+ * calling init() on it with dummy trace. Also configure the pseudo local FS.
+ */
+ private DistributedCacheEmulator createDistributedCacheEmulator(
+ Configuration conf, Path ioPath, boolean generate) throws IOException {
+ DistributedCacheEmulator dce =
+ new DistributedCacheEmulator(conf, ioPath);
+ JobCreator jobCreator = JobCreator.getPolicy(conf, JobCreator.LOADJOB);
+ jobCreator.setDistCacheEmulator(dce);
+ dce.init("dummytrace", jobCreator, generate);
+ return dce;
+ }
+
+ /**
+ * Test the configuration property for disabling/enabling emulation of
+ * distributed cache load.
+ */
+ @Test
+ public void testDistCacheEmulationConfigurability() throws IOException {
+ Configuration conf = new Configuration();
+ JobConf jobConf = GridmixTestUtils.mrCluster.createJobConf(
+ new JobConf(conf));
+ Path ioPath = new Path("testDistCacheEmulationConfigurability")
+ .makeQualified(GridmixTestUtils.dfs);
+ FileSystem fs = FileSystem.get(jobConf);
+ FileSystem.mkdirs(fs, ioPath, new FsPermission((short)0777));
+
+ // default config
+ dce = createDistributedCacheEmulator(jobConf, ioPath, false);
+ assertTrue("Default configuration of "
+ + DistributedCacheEmulator.GRIDMIX_EMULATE_DISTRIBUTEDCACHE
+ + " is wrong.", dce.shouldEmulateDistCacheLoad());
+
+ // config property set to false
+ jobConf.setBoolean(
+ DistributedCacheEmulator.GRIDMIX_EMULATE_DISTRIBUTEDCACHE, false);
+ dce = createDistributedCacheEmulator(jobConf, ioPath, false);
+ assertFalse("Disabling of emulation of distributed cache load by setting "
+ + DistributedCacheEmulator.GRIDMIX_EMULATE_DISTRIBUTEDCACHE
+ + " to false is not working.", dce.shouldEmulateDistCacheLoad());
+ }
+
+ /**
+ * Verify if DistributedCacheEmulator can configure distributed cache files
+ * for simulated job if job conf from trace had no dist cache files.
+ * @param conf configuration for the simulated job to be run
+ * @param jobConf job configuration of original cluster's job, obtained from
+ * trace
+ * @throws IOException
+ */
+ private void validateJobConfWithOutDCFiles(Configuration conf,
+ JobConf jobConf) throws IOException {
+ // Validate if Gridmix can configure dist cache files properly if there are
+ // no HDFS-based dist cache files and localFS-based dist cache files in
+ // trace for a job.
+ dce.configureDistCacheFiles(conf, jobConf);
+ assertNull("Distributed cache files configured by GridMix is wrong.",
+ conf.get(DistributedCache.CACHE_FILES));
+ assertNull("Distributed cache files configured by Gridmix through -files "
+ + "option is wrong.", conf.get("tmpfiles"));
+ }
+
+ /**
+ * Verify if DistributedCacheEmulator can configure distributed cache files
+ * for a simulated job when the job conf from the trace has HDFS-based
+ * and local-FS-based dist cache files.
+ * @param conf configuration for the simulated job to be run
+ * @param jobConf job configuration of original cluster's job, obtained from
+ * trace
+ * @throws IOException
+ */
+ private void validateJobConfWithDCFiles(Configuration conf,
+ JobConf jobConf) throws IOException {
+ long[] sortedFileSizes = configureDummyDistCacheFiles(jobConf);
+
+ // 1 local FS based dist cache file and 5 HDFS based dist cache files. So
+ // total expected dist cache files count is 6.
+ assertEquals("Gridmix is not able to extract dist cache file sizes.",
+ 6, jobConf.getStrings(DistributedCache.CACHE_FILES_SIZES).length);
+ assertEquals("Gridmix is not able to extract dist cache file visibilities.",
+ 6, jobConf.getStrings(
+ JobContext.CACHE_FILE_VISIBILITIES).length);
+
+ dce.configureDistCacheFiles(conf, jobConf);
+
+ assertEquals("Configuring of HDFS-based dist cache files by gridmix is "
+ + "wrong.", sortedFileSizes.length,
+ conf.getStrings(DistributedCache.CACHE_FILES).length);
+ assertEquals("Configuring of local-FS-based dist cache files by gridmix is "
+ + "wrong.", 1, conf.getStrings("tmpfiles").length);
+ }
+
+ /**
+ * Verify if configureDistCacheFiles() works fine when there are distributed
+ * cache files set but visibilities are not set. This handles history
+ * traces from older Hadoop versions that had no notion of private/public
+ * distributed caches.
+ * @throws IOException
+ */
+ private void validateWithOutVisibilities() throws IOException {
+ Configuration conf = new Configuration();// configuration for simulated job
+ JobConf jobConf = new JobConf();
+ String user = "user1";
+ jobConf.setUser(user);
+ String[] files = {"/tmp/hdfs1.txt", "/tmp/"+ user + "/.staging/file1"};
+ jobConf.setStrings(DistributedCache.CACHE_FILES, files);
+ jobConf.setStrings(DistributedCache.CACHE_FILES_SIZES, "12,200");
+ jobConf.setStrings(DistributedCache.CACHE_FILES_TIMESTAMPS, "56789,98345");
+ dce.configureDistCacheFiles(conf, jobConf);
+ assertEquals("Configuring of HDFS-based dist cache files by gridmix is "
+ + "wrong.", files.length,
+ conf.getStrings(DistributedCache.CACHE_FILES).length);
+ assertNull("Configuring of local-FS-based dist cache files by gridmix is "
+ + "wrong.", conf.get("tmpfiles"));
+ }
+
+ /**
+ * Test if Gridmix can configure config properties related to Distributed
+ * Cache properly.
+ * @throws IOException
+ */
+ @Test
+ public void testDistCacheFilesConfiguration() throws IOException {
+ Configuration conf = new Configuration();
+ JobConf jobConf = GridmixTestUtils.mrCluster.createJobConf(
+ new JobConf(conf));
+ Path ioPath = new Path("testDistCacheEmulationConfigurability")
+ .makeQualified(GridmixTestUtils.dfs);
+ FileSystem fs = FileSystem.get(jobConf);
+ FileSystem.mkdirs(fs, ioPath, new FsPermission((short)0777));
+
+ // default config
+ dce = createDistributedCacheEmulator(jobConf, ioPath, false);
+ assertTrue("Default configuration of "
+ + DistributedCacheEmulator.GRIDMIX_EMULATE_DISTRIBUTEDCACHE
+ + " is wrong.", dce.shouldEmulateDistCacheLoad());
+
+ // Validate if DistributedCacheEmulator can properly handle a JobStory
+ // without Distributed Cache files.
+ validateJobConfWithOutDCFiles(conf, jobConf);
+
+ // Validate if Gridmix can configure dist cache files properly if there are
+ // HDFS-based dist cache files and localFS-based dist cache files in trace
+ // for a job.
+ validateJobConfWithDCFiles(conf, jobConf);
+
+ // Use new JobConf as JobStory conf and check if configureDistCacheFiles()
+ // doesn't throw NPE when there are dist cache files set but visibilities
+ // are not set.
+ validateWithOutVisibilities();
+ }
+}
Added: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixMemoryEmulation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixMemoryEmulation.java?rev=1185694&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixMemoryEmulation.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixMemoryEmulation.java Tue Oct 18 14:45:48 2011
@@ -0,0 +1,453 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.DummyResourceCalculatorPlugin;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.gridmix.DebugJobProducer.MockJob;
+import org.apache.hadoop.mapred.gridmix.TestHighRamJob.DummyGridmixJob;
+import org.apache.hadoop.mapred.gridmix.TestResourceUsageEmulators.FakeProgressive;
+import org.apache.hadoop.mapred.gridmix.emulators.resourceusage.TotalHeapUsageEmulatorPlugin;
+import org.apache.hadoop.mapred.gridmix.emulators.resourceusage.TotalHeapUsageEmulatorPlugin.DefaultHeapUsageEmulator;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.ResourceCalculatorPlugin;
+import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
+
+/**
+ * Test Gridmix memory emulation.
+ */
+public class TestGridmixMemoryEmulation {
+ /**
+ * This is a dummy class that fakes heap usage.
+ */
+ private static class FakeHeapUsageEmulatorCore
+ extends DefaultHeapUsageEmulator {
+ private int numCalls = 0;
+
+ @Override
+ public void load(long sizeInMB) {
+ ++numCalls;
+ super.load(sizeInMB);
+ }
+
+ // Get the total number of times load() was invoked
+ int getNumCalls() {
+ return numCalls;
+ }
+
+ // Get the total number of 1mb objects stored within
+ long getHeapUsageInMB() {
+ return heapSpace.size();
+ }
+
+ @Override
+ public void reset() {
+ // no op to stop emulate() from resetting
+ }
+
+ /**
+ * For re-testing purposes.
+ */
+ void resetFake() {
+ numCalls = 0;
+ super.reset();
+ }
+ }
+
+ /**
+ * This is a dummy class that fakes the heap usage emulator plugin.
+ */
+ private static class FakeHeapUsageEmulatorPlugin
+ extends TotalHeapUsageEmulatorPlugin {
+ private FakeHeapUsageEmulatorCore core;
+
+ public FakeHeapUsageEmulatorPlugin(FakeHeapUsageEmulatorCore core) {
+ super(core);
+ this.core = core;
+ }
+
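+ // report a practically unlimited max heap so the plugin is never
+ // constrained by it in these tests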
+ @Override
+ protected long getMaxHeapUsageInMB() {
+ return Long.MAX_VALUE / ONE_MB;
+ }
+
+ @Override
+ protected long getTotalHeapUsageInMB() {
+ return core.getHeapUsageInMB();
+ }
+ }
+
+ /**
+ * Test {@link TotalHeapUsageEmulatorPlugin}'s core heap usage emulation
+ * engine.
+ */
+ @Test
+ public void testHeapUsageEmulator() throws IOException {
+ FakeHeapUsageEmulatorCore heapEmulator = new FakeHeapUsageEmulatorCore();
+
+ long testSizeInMB = 10; // 10 mb
+ long previousHeap = heapEmulator.getHeapUsageInMB();
+ heapEmulator.load(testSizeInMB);
+ long currentHeap = heapEmulator.getHeapUsageInMB();
+
+ // check if the heap has increased by expected value
+ assertEquals("Default heap emulator failed to load 10mb",
+ previousHeap + testSizeInMB, currentHeap);
+
+ // test reset
+ heapEmulator.resetFake();
+ assertEquals("Default heap emulator failed to reset",
+ 0, heapEmulator.getHeapUsageInMB());
+ }
+
+ /**
+ * Test {@link TotalHeapUsageEmulatorPlugin}.
+ */
+ @Test
+ public void testTotalHeapUsageEmulatorPlugin() throws Exception {
+ Configuration conf = new Configuration();
+ // set the dummy resource calculator for testing
+ ResourceCalculatorPlugin monitor = new DummyResourceCalculatorPlugin();
+ long maxHeapUsage = 1024 * TotalHeapUsageEmulatorPlugin.ONE_MB; // 1GB
+ conf.setLong(DummyResourceCalculatorPlugin.MAXPMEM_TESTING_PROPERTY,
+ maxHeapUsage);
+ monitor.setConf(conf);
+
+ // no buffer to be reserved
+ conf.setFloat(TotalHeapUsageEmulatorPlugin.MIN_HEAP_FREE_RATIO, 0F);
+ // only 1 call to be made per cycle
+ conf.setFloat(TotalHeapUsageEmulatorPlugin.HEAP_LOAD_RATIO, 1F);
+ long targetHeapUsageInMB = 200; // 200mb
+
+ // fake progress indicator
+ FakeProgressive fakeProgress = new FakeProgressive();
+
+ // fake heap usage generator
+ FakeHeapUsageEmulatorCore fakeCore = new FakeHeapUsageEmulatorCore();
+
+ // a heap usage emulator with fake core
+ FakeHeapUsageEmulatorPlugin heapPlugin =
+ new FakeHeapUsageEmulatorPlugin(fakeCore);
+
+ // test with invalid or missing resource usage value
+ ResourceUsageMetrics invalidUsage =
+ TestResourceUsageEmulators.createMetrics(0);
+ heapPlugin.initialize(conf, invalidUsage, null, null);
+
+ // test that a disabled heap emulation plugin's emulate() call is a
+ // no-operation, i.e. that the plugin is really disabled
+ int numCallsPre = fakeCore.getNumCalls();
+ long heapUsagePre = fakeCore.getHeapUsageInMB();
+ heapPlugin.emulate();
+ int numCallsPost = fakeCore.getNumCalls();
+ long heapUsagePost = fakeCore.getHeapUsageInMB();
+
+ // test that no calls were made to the heap usage emulator core
+ assertEquals("Disabled heap usage emulation plugin works!",
+ numCallsPre, numCallsPost);
+ // test that the heap usage of the emulator core remained unchanged
+ assertEquals("Disabled heap usage emulation plugin works!",
+ heapUsagePre, heapUsagePost);
+
+ // test with wrong/invalid configuration
+ Boolean failed = null;
+ invalidUsage =
+ TestResourceUsageEmulators.createMetrics(maxHeapUsage
+ + TotalHeapUsageEmulatorPlugin.ONE_MB);
+ try {
+ heapPlugin.initialize(conf, invalidUsage, monitor, null);
+ failed = false;
+ } catch (Exception e) {
+ failed = true;
+ }
+ assertNotNull("Fail case failure!", failed);
+ assertTrue("Expected failure!", failed);
+
+ // test with valid resource usage value
+ ResourceUsageMetrics metrics =
+ TestResourceUsageEmulators.createMetrics(targetHeapUsageInMB
+ * TotalHeapUsageEmulatorPlugin.ONE_MB);
+
+ // test with default emulation interval
+ // in every interval, the emulator will add 100% of the expected usage
+ // (since gridmix.emulators.resource-usage.heap.load-ratio=1)
+ // so at 10%, emulator will add 10% (difference), at 20% it will add 10% ...
+ // So to emulate 200MB, it will add
+ // 20mb + 20mb + 20mb + 20mb + .. = 200mb
+ testEmulationAccuracy(conf, fakeCore, monitor, metrics, heapPlugin, 200,
+ 10);
+
+ // test with custom value for emulation interval of 20%
+ conf.setFloat(TotalHeapUsageEmulatorPlugin.HEAP_EMULATION_PROGRESS_INTERVAL,
+ 0.2F);
+ // 40mb + 40mb + 40mb + 40mb + 40mb = 200mb
+ testEmulationAccuracy(conf, fakeCore, monitor, metrics, heapPlugin, 200, 5);
+
+ // test with custom value of free heap ratio and load ratio = 1
+ conf.setFloat(TotalHeapUsageEmulatorPlugin.HEAP_LOAD_RATIO, 1F);
+ conf.setFloat(TotalHeapUsageEmulatorPlugin.MIN_HEAP_FREE_RATIO, 0.5F);
+ // 40mb + 0mb + 80mb + 0mb + 0mb = 120mb
+ testEmulationAccuracy(conf, fakeCore, monitor, metrics, heapPlugin, 120, 2);
+
+ // test with custom value of heap load ratio and min free heap ratio = 0
+ conf.setFloat(TotalHeapUsageEmulatorPlugin.HEAP_LOAD_RATIO, 0.5F);
+ conf.setFloat(TotalHeapUsageEmulatorPlugin.MIN_HEAP_FREE_RATIO, 0F);
+ // 20mb (call#1) + 20mb (call#1) + 20mb (call#2) + 20mb (call#2) +.. = 200mb
+ testEmulationAccuracy(conf, fakeCore, monitor, metrics, heapPlugin, 200,
+ 10);
+
+ // test with custom value of free heap ratio = 0.25 and load ratio = 0.5
+ conf.setFloat(TotalHeapUsageEmulatorPlugin.MIN_HEAP_FREE_RATIO, 0.25F);
+ conf.setFloat(TotalHeapUsageEmulatorPlugin.HEAP_LOAD_RATIO, 0.5F);
+ // 20mb (call#1) + 20mb (call#1) + 30mb (call#2) + 0mb (call#2)
+ // + 30mb (call#3) + 0mb (call#3) + 35mb (call#4) + 0mb (call#4)
+ // + 37mb (call#5) + 0mb (call#5) = 162mb
+ testEmulationAccuracy(conf, fakeCore, monitor, metrics, heapPlugin, 162, 6);
+
+ // test if emulation interval boundary is respected
+ fakeProgress = new FakeProgressive(); // initialize
+ conf.setFloat(TotalHeapUsageEmulatorPlugin.MIN_HEAP_FREE_RATIO, 0F);
+ conf.setFloat(TotalHeapUsageEmulatorPlugin.HEAP_LOAD_RATIO, 1F);
+ conf.setFloat(TotalHeapUsageEmulatorPlugin.HEAP_EMULATION_PROGRESS_INTERVAL,
+ 0.25F);
+ heapPlugin.initialize(conf, metrics, monitor, fakeProgress);
+ fakeCore.resetFake();
+ // take a snapshot after the initialization
+ long initHeapUsage = fakeCore.getHeapUsageInMB();
+ long initNumCallsUsage = fakeCore.getNumCalls();
+ // test with 0 progress
+ testEmulationBoundary(0F, fakeCore, fakeProgress, heapPlugin, initHeapUsage,
+ initNumCallsUsage, "[no-op, 0 progress]");
+ // test with 24% progress
+ testEmulationBoundary(0.24F, fakeCore, fakeProgress, heapPlugin,
+ initHeapUsage, initNumCallsUsage,
+ "[no-op, 24% progress]");
+ // test with 25% progress
+ testEmulationBoundary(0.25F, fakeCore, fakeProgress, heapPlugin,
+ targetHeapUsageInMB / 4, 1, "[op, 25% progress]");
+ // test with 80% progress
+ testEmulationBoundary(0.80F, fakeCore, fakeProgress, heapPlugin,
+ (targetHeapUsageInMB * 4) / 5, 2, "[op, 80% progress]");
+
+ // now test if the final call with 100% progress ramps up the heap usage
+ testEmulationBoundary(1F, fakeCore, fakeProgress, heapPlugin,
+ targetHeapUsageInMB, 3, "[op, 100% progress]");
+ }
+
+ // test whether the heap usage emulator achieves the desired target using
+ // the expected number of calls to the underlying core engine.
+ private static void testEmulationAccuracy(Configuration conf,
+ FakeHeapUsageEmulatorCore fakeCore,
+ ResourceCalculatorPlugin monitor,
+ ResourceUsageMetrics metrics,
+ TotalHeapUsageEmulatorPlugin heapPlugin,
+ long expectedTotalHeapUsageInMB,
+ long expectedTotalNumCalls)
+ throws Exception {
+ FakeProgressive fakeProgress = new FakeProgressive();
+ fakeCore.resetFake();
+ heapPlugin.initialize(conf, metrics, monitor, fakeProgress);
+ int numLoops = 0;
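+ // drive the fake progress from 1% to 100% in 1% increments, invoking
+ // the plugin at each step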
+ while (fakeProgress.getProgress() < 1) {
+ ++numLoops;
+ float progress = numLoops / 100.0F;
+ fakeProgress.setProgress(progress);
+ heapPlugin.emulate();
+ }
+
+ // test if the resource plugin shows the expected usage
+ assertEquals("Cumulative heap usage emulator plugin failed (total usage)!",
+ expectedTotalHeapUsageInMB, fakeCore.getHeapUsageInMB(), 1L);
+ // test if the resource plugin shows the expected num calls
+ assertEquals("Cumulative heap usage emulator plugin failed (num calls)!",
+ expectedTotalNumCalls, fakeCore.getNumCalls(), 0L);
+ }
+
+ // tests if the heap usage emulation plugin emulates only at the expected
+ // progress gaps
+ private static void testEmulationBoundary(float progress,
+ FakeHeapUsageEmulatorCore fakeCore, FakeProgressive fakeProgress,
+ TotalHeapUsageEmulatorPlugin heapPlugin, long expectedTotalHeapUsageInMB,
+ long expectedTotalNumCalls, String info) throws Exception {
+ fakeProgress.setProgress(progress);
+ heapPlugin.emulate();
+ // test heap usage
+ assertEquals("Emulation interval test for heap usage failed " + info + "!",
+ expectedTotalHeapUsageInMB, fakeCore.getHeapUsageInMB(), 0L);
+ // test num calls
+ assertEquals("Emulation interval test for heap usage failed " + info + "!",
+ expectedTotalNumCalls, fakeCore.getNumCalls(), 0L);
+ }
+
+ /**
+ * Test the specified task java heap options: the original job's
+ * map/reduce/task opts and the simulated job's default opts are set,
+ * GridmixJob.configureTaskJVMOptions() is run, and the resulting
+ * simulated options are checked against the expected values.
+ */
+ @SuppressWarnings("deprecation")
+ private void testJavaHeapOptions(String mapOptions,
+ String reduceOptions, String taskOptions, String defaultMapOptions,
+ String defaultReduceOptions, String defaultTaskOptions,
+ String expectedMapOptions, String expectedReduceOptions,
+ String expectedTaskOptions) throws Exception {
+ Configuration simulatedConf = new Configuration(false);
+// // reset the configuration parameters
+// simulatedConf.set(JobConf.MAPRED_MAP_TASK_JAVA_OPTS, "");
+// simulatedConf.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, "");
+// simulatedConf.set(JobConf.MAPRED_TASK_JAVA_OPTS, "");
+
+ // set the default map task options
+ if (defaultMapOptions != null) {
+ simulatedConf.set(JobConf.MAPRED_MAP_TASK_JAVA_OPTS, defaultMapOptions);
+ }
+ // set the default reduce task options
+ if (defaultReduceOptions != null) {
+ simulatedConf.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS,
+ defaultReduceOptions);
+ }
+ // set the default task options
+ if (defaultTaskOptions != null) {
+ simulatedConf.set(JobConf.MAPRED_TASK_JAVA_OPTS, defaultTaskOptions);
+ }
+
+ Configuration originalConf = new Configuration(false);
+// // reset the configuration parameters
+// originalConf.set(JobConf.MAPRED_MAP_TASK_JAVA_OPTS, "");
+// originalConf.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, "");
+// originalConf.set(JobConf.MAPRED_TASK_JAVA_OPTS, "");
+
+ // set the map task options
+ if (mapOptions != null) {
+ originalConf.set(JobConf.MAPRED_MAP_TASK_JAVA_OPTS, mapOptions);
+ }
+ // set the reduce task options
+ if (reduceOptions != null) {
+ originalConf.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, reduceOptions);
+ }
+ // set the task options
+ if (taskOptions != null) {
+ originalConf.set(JobConf.MAPRED_TASK_JAVA_OPTS, taskOptions);
+ }
+
+ // configure the task jvm's heap options
+ GridmixJob.configureTaskJVMOptions(originalConf, simulatedConf);
+
+ assertEquals("Map heap options mismatch!", expectedMapOptions,
+ simulatedConf.get(JobConf.MAPRED_MAP_TASK_JAVA_OPTS));
+ assertEquals("Reduce heap options mismatch!", expectedReduceOptions,
+ simulatedConf.get(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS));
+ assertEquals("Task heap options mismatch!", expectedTaskOptions,
+ simulatedConf.get(JobConf.MAPRED_TASK_JAVA_OPTS));
+ }
+
+ /**
+ * Test task-level java heap options configuration in {@link GridmixJob}.
+ */
+ @Test
+ public void testJavaHeapOptions() throws Exception {
+ // test missing opts
+ testJavaHeapOptions(null, null, null, null, null, null, null, null,
+ null);
+
+ // test original heap opts and missing default opts
+ testJavaHeapOptions("-Xms10m", "-Xms20m", "-Xms30m", null, null, null,
+ null, null, null);
+
+ // test missing opts with default opts
+ testJavaHeapOptions(null, null, null, "-Xms10m", "-Xms20m", "-Xms30m",
+ "-Xms10m", "-Xms20m", "-Xms30m");
+
+ // test empty option
+ testJavaHeapOptions("", "", "", null, null, null, null, null, null);
+
+ // test empty default option and no original heap options
+ testJavaHeapOptions(null, null, null, "", "", "", "", "", "");
+
+ // test empty opts and default opts
+ testJavaHeapOptions("", "", "", "-Xmx10m -Xms1m", "-Xmx50m -Xms2m",
+ "-Xms2m -Xmx100m", "-Xmx10m -Xms1m", "-Xmx50m -Xms2m",
+ "-Xms2m -Xmx100m");
+
+ // test custom heap opts with no default opts
+ testJavaHeapOptions("-Xmx10m", "-Xmx20m", "-Xmx30m", null, null, null,
+ "-Xmx10m", "-Xmx20m", "-Xmx30m");
+
+ // test heap opts with default opts (multiple values)
+ testJavaHeapOptions("-Xms5m -Xmx200m", "-Xms15m -Xmx300m",
+ "-Xms25m -Xmx50m", "-XXabc", "-XXxyz", "-XXdef",
+ "-XXabc -Xmx200m", "-XXxyz -Xmx300m", "-XXdef -Xmx50m");
+
+ // test heap opts with default opts (duplication of -Xmx)
+ testJavaHeapOptions("-Xms5m -Xmx200m", "-Xms15m -Xmx300m",
+ "-Xms25m -Xmx50m", "-XXabc -Xmx500m", "-XXxyz -Xmx600m",
+ "-XXdef -Xmx700m", "-XXabc -Xmx200m", "-XXxyz -Xmx300m",
+ "-XXdef -Xmx50m");
+
+ // test heap opts with default opts (single value)
+ testJavaHeapOptions("-Xmx10m", "-Xmx20m", "-Xmx50m", "-Xms2m",
+ "-Xms3m", "-Xms5m", "-Xms2m -Xmx10m", "-Xms3m -Xmx20m",
+ "-Xms5m -Xmx50m");
+
+ // test heap opts with default opts (single value, duplication of -Xmx)
+ testJavaHeapOptions("-Xmx10m", "-Xmx20m", "-Xmx50m", "-Xmx2m",
+ "-Xmx3m", "-Xmx5m", "-Xmx10m", "-Xmx20m", "-Xmx50m");
+ }
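+
+ // A minimal sketch of the merge rule the cases above encode (the helper
+ // name is hypothetical and this is not GridmixJob.configureTaskJVMOptions
+ // itself): keep the default JVM options verbatim, except that an -Xmx in
+ // the original job's options replaces any -Xmx among the defaults.
+ // Assumes non-null inputs; the null/empty cases tested above are outside
+ // this sketch.
+ private static String mergeHeapOptionsSketch(String defaults,
+     String original) {
+   String xmx = null;
+   for (String opt : original.trim().split("\\s+")) {
+     if (opt.startsWith("-Xmx")) {
+       xmx = opt; // the last -Xmx wins
+     }
+   }
+   StringBuilder merged = new StringBuilder();
+   for (String opt : defaults.trim().split("\\s+")) {
+     // drop the defaults' -Xmx only when the original supplies one
+     if (xmx != null && opt.startsWith("-Xmx")) {
+       continue;
+     }
+     merged.append(opt).append(' ');
+   }
+   if (xmx != null) {
+     merged.append(xmx);
+   }
+   // e.g. mergeHeapOptionsSketch("-XXabc -Xmx500m", "-Xms5m -Xmx200m")
+   // yields "-XXabc -Xmx200m", matching the expectation above
+   return merged.toString().trim();
+ }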
+
+ /**
+ * Test disabled task heap options configuration in {@link GridmixJob}.
+ */
+ @Test
+ @SuppressWarnings("deprecation")
+ public void testJavaHeapOptionsDisabled() throws Exception {
+ Configuration gridmixConf = new Configuration();
+ gridmixConf.setBoolean(GridmixJob.GRIDMIX_TASK_JVM_OPTIONS_ENABLE, false);
+
+ // set the default values of simulated job
+ gridmixConf.set(JobConf.MAPRED_MAP_TASK_JAVA_OPTS, "-Xmx1m");
+ gridmixConf.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, "-Xmx2m");
+ gridmixConf.set(JobConf.MAPRED_TASK_JAVA_OPTS, "-Xmx3m");
+
+ // set the default map and reduce task options for original job
+ final JobConf originalConf = new JobConf();
+ originalConf.set(JobConf.MAPRED_MAP_TASK_JAVA_OPTS, "-Xmx10m");
+ originalConf.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, "-Xmx20m");
+ originalConf.set(JobConf.MAPRED_TASK_JAVA_OPTS, "-Xmx30m");
+
+ // define a mock job
+ MockJob story = new MockJob(originalConf) {
+ public JobConf getJobConf() {
+ return originalConf;
+ }
+ };
+
+ GridmixJob job = new DummyGridmixJob(gridmixConf, story);
+ Job simulatedJob = job.getJob();
+ Configuration simulatedConf = simulatedJob.getConfiguration();
+
+ assertEquals("Map heap options works when disabled!", "-Xmx1m",
+ simulatedConf.get(JobConf.MAPRED_MAP_TASK_JAVA_OPTS));
+ assertEquals("Reduce heap options works when disabled!", "-Xmx2m",
+ simulatedConf.get(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS));
+ assertEquals("Task heap options works when disabled!", "-Xmx3m",
+ simulatedConf.get(JobConf.MAPRED_TASK_JAVA_OPTS));
+ }
+}
Modified: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java Tue Oct 18 14:45:48 2011
@@ -176,7 +176,8 @@ public class TestGridmixRecord {
a.setReduceOutputBytes(out_bytes);
final int min = WritableUtils.getVIntSize(in_rec)
+ WritableUtils.getVIntSize(out_rec)
- + WritableUtils.getVIntSize(out_bytes);
+ + WritableUtils.getVIntSize(out_bytes)
+ + WritableUtils.getVIntSize(0);
assertEquals(min + 2, a.fixedBytes()); // meta + vint min
final int size = r.nextInt(1024) + a.fixedBytes() + 1;
setSerialize(a, r.nextLong(), size, out);
@@ -207,7 +208,7 @@ public class TestGridmixRecord {
@Test
public void testKeySpec() throws Exception {
- final int min = 5;
+ final int min = 6;
final int max = 300;
final GridmixKey a = new GridmixKey(GridmixKey.REDUCE_SPEC, 1, 0L);
final GridmixKey b = new GridmixKey(GridmixKey.REDUCE_SPEC, 1, 0L);
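The two hunks above are consistent with each other: the REDUCE_SPEC payload now
carries one extra vint (its minimum contribution computed here as
WritableUtils.getVIntSize(0)), so the minimum serialized key size grows from 5
to 6. The arithmetic rests on vints in the range [-112, 127] occupying a single
byte; a quick standalone check, assuming only hadoop-core's WritableUtils on
the classpath:

    import org.apache.hadoop.io.WritableUtils;

    public class VIntSizeCheck {
      public static void main(String[] args) {
        // values in [-112, 127] encode as a single vint byte
        System.out.println(WritableUtils.getVIntSize(0));    // 1
        System.out.println(WritableUtils.getVIntSize(127));  // 1
        // larger magnitudes need a length byte plus value bytes
        System.out.println(WritableUtils.getVIntSize(128));  // 2
      }
    }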
Modified: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java Tue Oct 18 14:45:48 2011
@@ -23,6 +23,8 @@ import org.apache.commons.logging.impl.L
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapred.Counters;
@@ -34,6 +36,7 @@ import org.apache.hadoop.mapred.TaskRepo
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.JobStoryProducer;
import org.apache.hadoop.tools.rumen.TaskInfo;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Level;
@@ -41,13 +44,16 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.io.InputStream;
import java.io.IOException;
+import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.zip.GZIPInputStream;
import static org.apache.hadoop.mapred.Task.Counter.MAP_INPUT_RECORDS;
import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_BYTES;
@@ -103,17 +109,10 @@ public class TestGridmixSubmission {
GridmixTestUtils.mrCluster.createJobConf());
for (Job job : succeeded) {
final String jobname = job.getJobName();
- if ("GRIDMIX_GENDATA".equals(jobname)) {
- if (!job.getConfiguration().getBoolean(
- GridmixJob.GRIDMIX_USE_QUEUE_IN_TRACE, true)) {
- assertEquals(
- " Improper queue for " + job.getJobName(),
- job.getConfiguration().get("mapred.job.queue.name"), "q1");
- } else {
- assertEquals(
- " Improper queue for " + job.getJobName(),
- job.getConfiguration().get("mapred.job.queue.name"), "default");
- }
+ final String jobName = job.getJobName();
+ Configuration conf = job.getConfiguration();
+ if (GenerateData.JOB_NAME.equals(jobName)) {
+ verifyQueue(conf, jobName);
final Path in = new Path("foo").makeQualified(GridmixTestUtils.dfs);
final Path out = new Path("/gridmix").makeQualified(GridmixTestUtils.dfs);
final ContentSummary generated = GridmixTestUtils.dfs.getContentSummary(in);
@@ -123,37 +122,55 @@ public class TestGridmixSubmission {
FileStatus[] outstat = GridmixTestUtils.dfs.listStatus(out);
assertEquals("Mismatched job count", NJOBS, outstat.length);
continue;
+ } else if (GenerateDistCacheData.JOB_NAME.equals(jobName)) {
+ verifyQueue(conf, jobName);
+ continue;
}
-
- if (!job.getConfiguration().getBoolean(
- GridmixJob.GRIDMIX_USE_QUEUE_IN_TRACE, true)) {
- assertEquals(" Improper queue for " + job.getJobName() + " " ,
- job.getConfiguration().get("mapred.job.queue.name"),"q1" );
+
+ if (!conf.getBoolean(
+ GridmixJob.GRIDMIX_USE_QUEUE_IN_TRACE, true)) {
+ assertEquals(" Improper queue for " + jobName + " " ,
+ conf.get("mapred.queue.name"), "q1" );
} else {
- assertEquals(
- " Improper queue for " + job.getJobName() + " ",
- job.getConfiguration().get("mapred.job.queue.name"), sub.get(
- job.getConfiguration().get(GridmixJob.ORIGNAME)).getQueueName());
+ assertEquals(" Improper queue for " + jobName + " ",
+ conf.get("mapred.queue.name"),
+ sub.get(conf.get(Gridmix.ORIGINAL_JOB_ID)).getQueueName());
}
- final JobStory spec =
- sub.get(job.getConfiguration().get(GridmixJob.ORIGNAME));
- assertNotNull("No spec for " + job.getJobName(), spec);
- assertNotNull("No counters for " + job.getJobName(), job.getCounters());
- final String specname = spec.getName();
- final FileStatus stat = GridmixTestUtils.dfs.getFileStatus(new Path(
- GridmixTestUtils.DEST, "" +
- Integer.valueOf(specname.substring(specname.length() - 5))));
- assertEquals("Wrong owner for " + job.getJobName(), spec.getUser(),
- stat.getOwner());
-
+ final String originalJobId = conf.get(Gridmix.ORIGINAL_JOB_ID);
+ final JobStory spec = sub.get(originalJobId);
+ assertNotNull("No spec for " + jobName, spec);
+ assertNotNull("No counters for " + jobName, job.getCounters());
+ final String originalJobName = spec.getName();
+ System.out.println("originalJobName=" + originalJobName
+ + ";GridmixJobName=" + jobName + ";originalJobID=" + originalJobId);
+ assertTrue("Original job name is wrong.", originalJobName.equals(
+ conf.get(Gridmix.ORIGINAL_JOB_NAME)));
+
+ // Gridmix job seqNum contains 6 digits
+ int seqNumLength = 6;
+ String jobSeqNum = new DecimalFormat("000000").format(
+ conf.getInt(GridmixJob.GRIDMIX_JOB_SEQ, -1));
+ // Original job names are of the form MOCKJOB<6-digit sequence number>,
+ // as generated by MockJob.
+ assertTrue(originalJobName.substring(
+ originalJobName.length() - seqNumLength).equals(jobSeqNum));
+
+ assertTrue("Gridmix job name is not in the expected format.",
+ jobName.equals(
+ GridmixJob.JOB_NAME_PREFIX + jobSeqNum));
+ final FileStatus stat =
+ GridmixTestUtils.dfs.getFileStatus(
+ new Path(GridmixTestUtils.DEST, "" + Integer.valueOf(jobSeqNum)));
+ assertEquals("Wrong owner for " + jobName, spec.getUser(),
+ stat.getOwner());
final int nMaps = spec.getNumberMaps();
final int nReds = spec.getNumberReduces();
// TODO Blocked by MAPREDUCE-118
if (true) return;
// TODO
- System.out.println(jobname + ": " + nMaps + "/" + nReds);
+ System.out.println(jobName + ": " + nMaps + "/" + nReds);
final TaskReport[] mReports =
client.getMapTaskReports(JobID.downgrade(job.getJobID()));
assertEquals("Mismatched map count", nMaps, mReports.length);
@@ -168,6 +185,18 @@ public class TestGridmixSubmission {
}
}
+ // Verify that the correct job queue is used
+ private void verifyQueue(Configuration conf, String jobName) {
+ if (!conf.getBoolean(
+ GridmixJob.GRIDMIX_USE_QUEUE_IN_TRACE, true)) {
+ assertEquals(" Improper queue for " + jobName,
+ conf.get("mapred.job.queue.name"), "q1");
+ } else {
+ assertEquals(" Improper queue for " + jobName,
+ conf.get("mapred.job.queue.name"), "default");
+ }
+ }
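+
+ // Queue selection exercised above and in doSubmission() below: with
+ // GRIDMIX_USE_QUEUE_IN_TRACE set to false, every job goes to the
+ // statically configured queue (q1 in this test); with it set to true,
+ // simulated jobs inherit the queue recorded in the trace while the
+ // internal data-generation jobs fall back to "default".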
+
public void check(final TaskType type, Job job, JobStory spec,
final TaskReport[] runTasks,
long extraInputBytes, int extraInputRecords,
@@ -325,19 +354,118 @@ public class TestGridmixSubmission {
}
}
+ /**
+ * Verifies that the given {@code JobStory} corresponds to the checked-in
+ * WordCount {@code JobStory}. The verification is effected via JUnit
+ * assertions.
+ *
+ * @param js the candidate JobStory.
+ */
+ private void verifyWordCountJobStory(JobStory js) {
+ assertNotNull("Null JobStory", js);
+ String expectedJobStory = "WordCount:johndoe:default:1285322645148:3:1";
+ String actualJobStory = js.getName() + ":" + js.getUser() + ":"
+ + js.getQueueName() + ":" + js.getSubmissionTime() + ":"
+ + js.getNumberMaps() + ":" + js.getNumberReduces();
+ assertEquals("Unexpected JobStory", expectedJobStory, actualJobStory);
+ }
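+
+ // For reference, the colon-joined fields in verifyWordCountJobStory() are,
+ // in order: job name, user, queue, submission time (ms), number of maps,
+ // and number of reduces, matching the expected constant above.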
+
+ /**
+ * Expands a file compressed using {@code gzip}.
+ *
+ * @param fs the {@code FileSystem} corresponding to the given
+ * file.
+ *
+ * @param in the path to the compressed file.
+ *
+ * @param out the path to the uncompressed output.
+ *
+ * @throws Exception if there was an error during the operation.
+ */
+ private void expandGzippedTrace(FileSystem fs, Path in, Path out)
+ throws Exception {
+ byte[] buff = new byte[4096];
+ GZIPInputStream gis = new GZIPInputStream(fs.open(in));
+ FSDataOutputStream fsdos = fs.create(out);
+ int numRead;
+ while ((numRead = gis.read(buff, 0, buff.length)) != -1) {
+ fsdos.write(buff, 0, numRead);
+ }
+ gis.close();
+ fsdos.close();
+ }
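+
+ // A more defensive variant of the copy above (a sketch, not part of this
+ // patch): expandGzippedTrace() leaks both streams if read() or write()
+ // throws, so a hardened version closes them in finally blocks
+ // (pre-Java-7 code, so try-with-resources is not available).
+ private void expandGzippedTraceSafely(FileSystem fs, Path in, Path out)
+     throws Exception {
+   byte[] buff = new byte[4096];
+   GZIPInputStream gis = new GZIPInputStream(fs.open(in));
+   try {
+     FSDataOutputStream fsdos = fs.create(out);
+     try {
+       int numRead;
+       while ((numRead = gis.read(buff, 0, buff.length)) != -1) {
+         fsdos.write(buff, 0, numRead);
+       }
+     } finally {
+       fsdos.close();
+     }
+   } finally {
+     gis.close();
+   }
+ }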
+
+ /**
+ * Tests the reading of traces in GridMix3. These traces are generated
+ * by Rumen and are in JSON format. A trace may optionally be
+ * gzip-compressed, and an uncompressed trace can also be passed to
+ * GridMix3 via its standard input stream. The testing is done via
+ * JUnit assertions.
+ *
+ * @throws Exception if there was an error.
+ */
+ @Test
+ public void testTraceReader() throws Exception {
+ Configuration conf = new Configuration();
+ FileSystem lfs = FileSystem.getLocal(conf);
+ Path rootInputDir = new Path(System.getProperty("src.test.data"));
+ rootInputDir
+ = rootInputDir.makeQualified(lfs.getUri(), lfs.getWorkingDirectory());
+ Path rootTempDir
+ = new Path(System.getProperty("test.build.data",
+ System.getProperty("java.io.tmpdir")), "testTraceReader");
+ rootTempDir
+ = rootTempDir.makeQualified(lfs.getUri(), lfs.getWorkingDirectory());
+ Path inputFile = new Path(rootInputDir, "wordcount.json.gz");
+ Path tempFile = new Path(rootTempDir, "gridmix3-wc.json");
+
+ InputStream origStdIn = System.in;
+ InputStream tmpIs = null;
+ try {
+ DebugGridmix dgm = new DebugGridmix();
+ JobStoryProducer jsp
+ = dgm.createJobStoryProducer(inputFile.toString(), conf);
+
+ System.out.println("Verifying JobStory from compressed trace...");
+ verifyWordCountJobStory(jsp.getNextJob());
+
+ expandGzippedTrace(lfs, inputFile, tempFile);
+ jsp = dgm.createJobStoryProducer(tempFile.toString(), conf);
+ System.out.println("Verifying JobStory from uncompressed trace...");
+ verifyWordCountJobStory(jsp.getNextJob());
+
+ tmpIs = lfs.open(tempFile);
+ System.setIn(tmpIs);
+ System.out.println("Verifying JobStory from trace in standard input...");
+ jsp = dgm.createJobStoryProducer("-", conf);
+ verifyWordCountJobStory(jsp.getNextJob());
+ } finally {
+ System.setIn(origStdIn);
+ if (tmpIs != null) {
+ tmpIs.close();
+ }
+ lfs.delete(rootTempDir, true);
+ }
+ }
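+
+ // Note on the "-" trace path used above: createJobStoryProducer() reads
+ // the trace from standard input when the path is "-" (the usual CLI
+ // convention), which is why testTraceReader() swaps in tmpIs via
+ // System.setIn() and restores the original stream in the finally block.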
+
@Test
public void testReplaySubmit() throws Exception {
policy = GridmixJobSubmissionPolicy.REPLAY;
System.out.println(" Replay started at " + System.currentTimeMillis());
- doSubmission(false);
+ doSubmission(false, false);
System.out.println(" Replay ended at " + System.currentTimeMillis());
+
+ System.out.println(" Replay started with default output path at time "
+ + System.currentTimeMillis());
+ doSubmission(false, true);
+ System.out.println(" Replay ended with default output path at time "
+ + System.currentTimeMillis());
}
@Test
public void testStressSubmit() throws Exception {
policy = GridmixJobSubmissionPolicy.STRESS;
System.out.println(" Stress started at " + System.currentTimeMillis());
- doSubmission(false);
+ doSubmission(false, false);
System.out.println(" Stress ended at " + System.currentTimeMillis());
}
@@ -346,7 +474,7 @@ public class TestGridmixSubmission {
policy = GridmixJobSubmissionPolicy.STRESS;
System.out.println(
" Stress with default q started at " + System.currentTimeMillis());
- doSubmission(true);
+ doSubmission(true, false);
System.out.println(
" Stress with default q ended at " + System.currentTimeMillis());
}
@@ -355,26 +483,39 @@ public class TestGridmixSubmission {
public void testSerialSubmit() throws Exception {
policy = GridmixJobSubmissionPolicy.SERIAL;
System.out.println("Serial started at " + System.currentTimeMillis());
- doSubmission(false);
+ doSubmission(false, false);
System.out.println("Serial ended at " + System.currentTimeMillis());
}
- private void doSubmission(boolean useDefaultQueue) throws Exception {
+ private void doSubmission(boolean useDefaultQueue,
+ boolean defaultOutputPath) throws Exception {
final Path in = new Path("foo").makeQualified(GridmixTestUtils.dfs);
final Path out = GridmixTestUtils.DEST.makeQualified(GridmixTestUtils.dfs);
final Path root = new Path("/user");
Configuration conf = null;
+
try{
- final String[] argv = {
- "-D" + FilePool.GRIDMIX_MIN_FILE + "=0",
- "-D" + Gridmix.GRIDMIX_OUT_DIR + "=" + out,
- "-D" + Gridmix.GRIDMIX_USR_RSV + "=" + EchoUserResolver.class.getName(),
- "-generate", String.valueOf(GENDATA) + "m",
- in.toString(),
- "-" // ignored by DebugGridmix
- };
- DebugGridmix client = new DebugGridmix();
- conf = new Configuration();
+ ArrayList<String> argsList = new ArrayList<String>();
+
+ argsList.add("-D" + FilePool.GRIDMIX_MIN_FILE + "=0");
+ argsList.add("-D" + Gridmix.GRIDMIX_USR_RSV + "="
+ + EchoUserResolver.class.getName());
+
+ // Set the config property gridmix.output.directory only if
+ // defaultOutputPath is false; otherwise let gridmix fall back to its
+ // default output directory, foo/gridmix/.
+ if (!defaultOutputPath) {
+ argsList.add("-D" + Gridmix.GRIDMIX_OUT_DIR + "=" + out);
+ }
+ argsList.add("-generate");
+ argsList.add(String.valueOf(GENDATA) + "m");
+ argsList.add(in.toString());
+ argsList.add("-"); // ignored by DebugGridmix
+
+ String[] argv = argsList.toArray(new String[argsList.size()]);
+
+ DebugGridmix client = new DebugGridmix();
+ conf = new Configuration();
conf.setEnum(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY,policy);
if (useDefaultQueue) {
conf.setBoolean(GridmixJob.GRIDMIX_USE_QUEUE_IN_TRACE, false);
@@ -382,13 +523,13 @@ public class TestGridmixSubmission {
} else {
conf.setBoolean(GridmixJob.GRIDMIX_USE_QUEUE_IN_TRACE, true);
}
- conf = GridmixTestUtils.mrCluster.createJobConf(new JobConf(conf));
- // allow synthetic users to create home directories
- GridmixTestUtils.dfs.mkdirs(root, new FsPermission((short)0777));
- GridmixTestUtils.dfs.setPermission(root, new FsPermission((short)0777));
- int res = ToolRunner.run(conf, client, argv);
- assertEquals("Client exited with nonzero status", 0, res);
- client.checkMonitor();
+ conf = GridmixTestUtils.mrCluster.createJobConf(new JobConf(conf));
+ // allow synthetic users to create home directories
+ GridmixTestUtils.dfs.mkdirs(root, new FsPermission((short)0777));
+ GridmixTestUtils.dfs.setPermission(root, new FsPermission((short)0777));
+ int res = ToolRunner.run(conf, client, argv);
+ assertEquals("Client exited with nonzero status", 0, res);
+ client.checkMonitor();
} catch (Exception e) {
e.printStackTrace();
} finally {