Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2009/10/18 12:00:28 UTC

svn commit: r826392 - in /hadoop/mapreduce/trunk: ./ src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/

Author: cdouglas
Date: Sun Oct 18 10:00:27 2009
New Revision: 826392

URL: http://svn.apache.org/viewvc?rev=826392&view=rev
Log:
MAPREDUCE-1061. Add unit test validating byte specifications for gridmix jobs.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=826392&r1=826391&r2=826392&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Sun Oct 18 10:00:27 2009
@@ -776,3 +776,6 @@
 
     MAPREDUCE-1104. Initialize RecoveryManager in JobTracker cstr called by
     Mumak. (Hong Tang via cdouglas)
+
+    MAPREDUCE-1061. Add unit test validating byte specifications for gridmix
+    jobs. (cdouglas)

Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java?rev=826392&r1=826391&r2=826392&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java Sun Oct 18 10:00:27 2009
@@ -200,10 +200,7 @@
         }
         // scan input dir contents
         submitter.refreshFilePool();
-      } catch (IOException e) {
-        LOG.error("Startup failed", e);
-        if (factory != null) factory.abort(); // abort pipeline
-      } catch (InterruptedException e) {
+      } catch (Throwable e) {
         LOG.error("Startup failed", e);
         if (factory != null) factory.abort(); // abort pipeline
       } finally {
@@ -214,8 +211,10 @@
       if (factory != null) {
         // wait for input exhaustion
         factory.join();
-        if (null != factory.error()) {
-          throw factory.error();
+        final Throwable badTraceException = factory.error();
+        if (null != badTraceException) {
+          LOG.error("Error in trace", badTraceException);
+          throw new IOException("Error in trace", badTraceException);
         }
         // wait for pending tasks to be submitted
         submitter.shutdown();
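
The Gridmix.java change above collapses the separate IOException and InterruptedException handlers into a single catch of Throwable, and surfaces trace errors from the factory by wrapping them in a checked IOException rather than rethrowing the raw Throwable. A minimal sketch of that wrap-and-rethrow pattern, using a hypothetical Pipeline interface in place of the gridmix job factory:

    import java.io.IOException;

    public class WrapAndRethrow {
      // Hypothetical stand-in for the gridmix job factory thread.
      interface Pipeline {
        void join() throws InterruptedException;
        Throwable error(); // null if the pipeline finished cleanly
      }

      static void awaitPipeline(Pipeline factory)
          throws IOException, InterruptedException {
        factory.join(); // wait for input exhaustion
        final Throwable badTraceException = factory.error();
        if (null != badTraceException) {
          // Wrap so callers see a checked IOException, keeping the
          // original failure attached as the cause.
          throw new IOException("Error in trace", badTraceException);
        }
      }
    }

Wrapping keeps the method's throws clause narrow while still preserving the original stack trace for diagnosis.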

Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java?rev=826392&r1=826391&r2=826392&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java Sun Oct 18 10:00:27 2009
@@ -530,6 +530,16 @@
         }
       }
     }
+    @Override
+    protected void cleanup(Context context)
+        throws IOException, InterruptedException {
+      while (written < outBytes) {
+        final int len = (int) Math.min(outBytes - written, val.getCapacity());
+        fillBytes(val, len);
+        context.write(NullWritable.get(), val);
+        written += len;
+      }
+    }
   }
 
   static class GridmixRecordReader
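
The new cleanup() override pads a task's output up to its target byte count once its input is exhausted, so a generated job hits the byte totals its spec calls for even when the per-record writes fall short. A self-contained sketch of the idea as a mapper; BytesPadder and its fillBytes helper are illustrative, since the enclosing class, the val buffer, and the real fillBytes live outside this hunk:

    import java.io.IOException;
    import java.util.Random;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Mapper;

    public class BytesPadder
        extends Mapper<NullWritable,BytesWritable,NullWritable,BytesWritable> {
      private final BytesWritable val = new BytesWritable(new byte[64 * 1024]);
      private final Random r = new Random();
      private long outBytes = 1L << 20; // byte quota this task owes downstream
      private long written = 0L;        // bytes emitted so far during map()

      // Illustrative fill; the real fillBytes is not shown in the hunk.
      private void fillBytes(BytesWritable v, int len) {
        v.setSize(len);
        r.nextBytes(v.getBytes()); // random payload of the requested length
      }

      @Override
      protected void cleanup(Context context)
          throws IOException, InterruptedException {
        // Pad the output until the quota is met, one full buffer at a time.
        while (written < outBytes) {
          final int len = (int) Math.min(outBytes - written, val.getCapacity());
          fillBytes(val, len);
          context.write(NullWritable.get(), val);
          written += len;
        }
      }
    }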

Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java?rev=826392&r1=826391&r2=826392&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java Sun Oct 18 10:00:27 2009
@@ -112,13 +112,13 @@
 
     public MockJob(Configuration conf) {
       this(conf.getInt(MIN_BYTES_IN, 1 << 20),
-           conf.getInt(VAR_BYTES_IN, 10 << 20),
+           conf.getInt(VAR_BYTES_IN, 5 << 20),
            conf.getInt(MIN_BYTES_OUT, 1 << 20),
-           conf.getInt(VAR_BYTES_OUT, 10 << 20),
+           conf.getInt(VAR_BYTES_OUT, 5 << 20),
            conf.getInt(MIN_REC_SIZE , 100),
            conf.getInt(VAR_REC_SIZE , 1 << 15),
-           conf.getInt(MAX_MAPS, 6),
-           conf.getInt(MAX_REDS, 4));
+           conf.getInt(MAX_MAPS, 5),
+           conf.getInt(MAX_REDS, 3));
     }
 
     public MockJob(int min_bytes_in, int var_bytes_in,
@@ -126,7 +126,7 @@
                    int min_rec_size, int var_rec_size,
                    int max_maps, int max_reds) {
       final Random r = new Random();
-      name = String.format("MOCKJOB%04d", seq.getAndIncrement());
+      name = String.format("MOCKJOB%05d", seq.getAndIncrement());
       submitTime = timestamp.addAndGet(TimeUnit.MILLISECONDS.convert(
             r.nextInt(10), TimeUnit.SECONDS));
       int iMapBTotal = 0, oMapBTotal = 0, iRedBTotal = 0, oRedBTotal = 0;
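
The MockJob defaults pair a floor with a variance for each dimension (bytes in/out, record size) and cap the task counts; the smaller defaults here keep the mini-cluster test's data volume bounded. The widened format (%05d) gives mock job names an extra digit of zero padding so they keep sorting lexicographically on longer runs. A sketch of how a min/var pair typically yields a per-job size, assuming a uniform draw (the actual distribution used by DebugJobFactory is not shown in this hunk):

    import java.util.Random;

    public class MockSizes {
      public static void main(String[] args) {
        final Random r = new Random();
        final int minBytesIn = 1 << 20; // 1 MiB floor (MIN_BYTES_IN default)
        final int varBytesIn = 5 << 20; // 5 MiB variance (new VAR_BYTES_IN default)
        // Floor plus a uniform slice of the variance gives this job's input size.
        final int bytesIn = minBytesIn + r.nextInt(varBytesIn);
        System.out.println("mock job input bytes: " + bytesIn);
      }
    }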

Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java?rev=826392&r1=826391&r2=826392&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java Sun Oct 18 10:00:27 2009
@@ -19,6 +19,9 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
@@ -30,12 +33,20 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskReport;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.TaskInfo;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import static org.apache.hadoop.mapreduce.TaskCounter.*;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -64,9 +75,13 @@
   public static void initCluster() throws IOException {
     Configuration conf = new Configuration();
     conf.setBoolean(JTConfig.JT_RETIREJOBS, false);
+    conf.setInt(JTConfig.JT_RETIREJOB_CACHE_SIZE, 1000);
+    conf.setBoolean(JTConfig.JT_PERSIST_JOBSTATUS, true);
+    conf.setInt(JTConfig.JT_PERSIST_JOBSTATUS_HOURS, 1);
     dfsCluster = new MiniDFSCluster(conf, 3, true, null);
     dfs = dfsCluster.getFileSystem();
-    mrCluster = new MiniMRCluster(3, dfs.getUri().toString(), 1);
+    mrCluster = new MiniMRCluster(3, dfs.getUri().toString(), 1, null, null,
+        new JobConf(conf));
   }
 
   @AfterClass
@@ -81,6 +96,7 @@
 
   static class TestMonitor extends JobMonitor {
 
+    static final long SLOPBYTES = 5 * 1024;
     private final int expected;
     private final BlockingQueue<Job> retiredJobs;
 
@@ -90,9 +106,142 @@
       retiredJobs = new LinkedBlockingQueue<Job>();
     }
 
-    public void verify(ArrayList<JobStory> submitted) {
+    public void verify(ArrayList<JobStory> submitted) throws Exception {
       final ArrayList<Job> succeeded = new ArrayList<Job>();
       assertEquals("Bad job count", expected, retiredJobs.drainTo(succeeded));
+      final HashMap<String,JobStory> sub = new HashMap<String,JobStory>();
+      for (JobStory spec : submitted) {
+        sub.put(spec.getName(), spec);
+      }
+      for (Job job : succeeded) {
+        final String jobname = job.getJobName();
+        if ("GRIDMIX_GENDATA".equals(jobname)) {
+          final Path in = new Path("foo").makeQualified(dfs);
+          final Path out = new Path("/gridmix").makeQualified(dfs);
+          final ContentSummary generated = dfs.getContentSummary(in);
+          assertTrue("Mismatched data gen", // +/- 100k for logs
+              (GENDATA << 20) < generated.getLength() + GENSLOP ||
+              (GENDATA << 20) > generated.getLength() - GENSLOP);
+          FileStatus[] outstat = dfs.listStatus(out);
+          assertEquals("Mismatched job count", NJOBS, outstat.length);
+          continue;
+        }
+        final JobStory spec =
+          sub.get(job.getJobName().replace("GRIDMIX", "MOCKJOB"));
+        assertNotNull("No spec for " + job.getJobName(), spec);
+        assertNotNull("No counters for " + job.getJobName(), job.getCounters());
+
+        final int nMaps = spec.getNumberMaps();
+        final int nReds = spec.getNumberReduces();
+
+        System.out.println(jobname + ": " + nMaps + "/" + nReds);
+        final TaskReport[] mReports = job.getTaskReports(TaskType.MAP);
+        assertEquals("Mismatched map count", nMaps, mReports.length);
+        check(TaskType.MAP, job, spec, mReports,
+            0, 1, nReds * SLOPBYTES, nReds + 1);
+
+        final TaskReport[] rReports = job.getTaskReports(TaskType.REDUCE);
+        assertEquals("Mismatched reduce count", nReds, rReports.length);
+        check(TaskType.REDUCE, job, spec, rReports,
+            nMaps * SLOPBYTES, nMaps + 1, 0, 1);
+      }
+    }
+
+    public void check(final TaskType type, Job job, JobStory spec,
+          final TaskReport[] runTasks,
+          long extraInputBytes, int extraInputRecords,
+          long extraOutputBytes, int extraOutputRecords) throws Exception {
+
+      long[] runInputRecords = new long[runTasks.length];
+      long[] runInputBytes = new long[runTasks.length];
+      long[] runOutputRecords = new long[runTasks.length];
+      long[] runOutputBytes = new long[runTasks.length];
+      long[] specInputRecords = new long[runTasks.length];
+      long[] specInputBytes = new long[runTasks.length];
+      long[] specOutputRecords = new long[runTasks.length];
+      long[] specOutputBytes = new long[runTasks.length];
+
+      for (int i = 0; i < runTasks.length; ++i) {
+        final TaskInfo specInfo;
+        final Counters counters = runTasks[i].getTaskCounters();
+        switch (type) {
+          case MAP:
+             runInputBytes[i] = counters.findCounter("FileSystemCounters",
+                 "HDFS_BYTES_READ").getValue();
+             runInputRecords[i] =
+               (int)counters.findCounter(MAP_INPUT_RECORDS).getValue();
+             runOutputBytes[i] =
+               counters.findCounter(MAP_OUTPUT_BYTES).getValue();
+             runOutputRecords[i] =
+               (int)counters.findCounter(MAP_OUTPUT_RECORDS).getValue();
+
+            specInfo = spec.getTaskInfo(TaskType.MAP, i);
+            break;
+          case REDUCE:
+             runInputBytes[i] =
+               counters.findCounter(REDUCE_SHUFFLE_BYTES).getValue();
+             runInputRecords[i] =
+               (int)counters.findCounter(REDUCE_INPUT_RECORDS).getValue();
+             runOutputBytes[i] =
+               counters.findCounter("FileSystemCounters",
+                   "HDFS_BYTES_WRITTEN").getValue();
+             runOutputRecords[i] =
+               (int)counters.findCounter(REDUCE_OUTPUT_RECORDS).getValue();
+
+            specInfo = spec.getTaskInfo(TaskType.REDUCE, i);
+            break;
+          default:
+            specInfo = null;
+            fail("Unexpected type: " + type);
+        }
+        specInputBytes[i] = specInfo.getInputBytes();
+        specInputRecords[i] = specInfo.getInputRecords();
+        specOutputRecords[i] = specInfo.getOutputRecords();
+        specOutputBytes[i] = specInfo.getOutputBytes();
+        System.out.printf(type + " SPEC: %9d -> %9d :: %5d -> %5d\n",
+             specInputBytes[i], specOutputBytes[i],
+             specInputRecords[i], specOutputRecords[i]);
+        System.out.printf(type + " RUN:  %9d -> %9d :: %5d -> %5d\n",
+             runInputBytes[i], runOutputBytes[i],
+             runInputRecords[i], runOutputRecords[i]);
+      }
+
+      // Check input bytes
+      Arrays.sort(specInputBytes);
+      Arrays.sort(runInputBytes);
+      for (int i = 0; i < runTasks.length; ++i) {
+        assertTrue("Mismatched input bytes " +
+            specInputBytes[i] + "/" + runInputBytes[i],
+            runInputBytes[i] - specInputBytes[i] <= extraInputBytes);
+      }
+
+      // Check input records
+      Arrays.sort(specInputRecords);
+      Arrays.sort(runInputRecords);
+      for (int i = 0; i < runTasks.length; ++i) {
+        assertTrue("Mismatched input records " +
+            specInputRecords[i] + "/" + runInputRecords[i],
+            runInputRecords[i] - specInputRecords[i] <= extraInputRecords);
+      }
+
+      // Check output bytes
+      Arrays.sort(specOutputBytes);
+      Arrays.sort(runOutputBytes);
+      for (int i = 0; i < runTasks.length; ++i) {
+        assertTrue("Mismatched output bytes " +
+            specOutputBytes[i] + "/" + runOutputBytes[i],
+            runOutputBytes[i] - specOutputBytes[i] <= extraOutputBytes);
+      }
+
+      // Check output records
+      Arrays.sort(specOutputRecords);
+      Arrays.sort(runOutputRecords);
+      for (int i = 0; i < runTasks.length; ++i) {
+        assertTrue("Mismatched output records " +
+            specOutputRecords[i] + "/" + runOutputRecords[i],
+            runOutputRecords[i] - specOutputRecords[i] <= extraOutputRecords);
+      }
+
     }
 
     @Override
@@ -110,7 +259,7 @@
     private DebugJobFactory factory;
     private TestMonitor monitor;
 
-    public void checkMonitor() {
+    public void checkMonitor() throws Exception {
       monitor.verify(factory.getSubmitted());
     }
 
@@ -146,12 +295,6 @@
     int res = ToolRunner.run(conf, client, argv);
     assertEquals("Client exited with nonzero status", 0, res);
     client.checkMonitor();
-    final ContentSummary generated = dfs.getContentSummary(in);
-    assertTrue("Mismatched data gen", // +/- 100k for logs
-        (GENDATA << 20) < generated.getLength() + GENSLOP ||
-        (GENDATA << 20) > generated.getLength() - GENSLOP);
-    FileStatus[] outstat = dfs.listStatus(out);
-    assertEquals("Mismatched job count", NJOBS, outstat.length);
   }
 
 }
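
The verification added to TestGridmixSubmission compares each job's per-task counters against its Rumen spec: both the spec and run arrays are sorted so values are paired by rank rather than by task index, and each run value may exceed its spec counterpart by at most a slop term (for example, map output gets nReds * SLOPBYTES of headroom, presumably to absorb per-partition framing overhead). A minimal, self-contained sketch of that sorted, slop-bounded comparison:

    import java.util.Arrays;

    public class SlopCheck {
      // Returns true if, after sorting both arrays, every observed value
      // exceeds its rank-paired expected value by at most `slop`
      // (observed values may also be smaller than expected).
      static boolean withinSlop(long[] expected, long[] observed, long slop) {
        final long[] e = expected.clone();
        final long[] o = observed.clone();
        Arrays.sort(e);
        Arrays.sort(o);
        for (int i = 0; i < e.length; ++i) {
          if (o[i] - e[i] > slop) {
            return false;
          }
        }
        return true;
      }

      public static void main(String[] args) {
        final long[] spec = { 1048576, 2097152 };
        final long[] run  = { 1049000, 2101000 };
        System.out.println(withinSlop(spec, run, 5 * 1024)); // true: within 5K
      }
    }

Sorting first makes the check robust to the scheduler completing tasks, and hence reporting counters, in a different order than the trace lists them.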