You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by om...@apache.org on 2011/03/08 07:02:35 UTC

svn commit: r1079270 - in /hadoop/mapreduce/branches/yahoo-merge/src: docs/src/documentation/content/xdocs/rumen.xml test/mapred/org/apache/hadoop/tools/rumen/TestRumenFolder.java tools/org/apache/hadoop/tools/rumen/Folder.java

Author: omalley
Date: Tue Mar  8 06:02:35 2011
New Revision: 1079270

URL: http://svn.apache.org/viewvc?rev=1079270&view=rev
Log:
commit 856e0b31712d11b0ad455cf8cec64ac767d64ec6
Author: Rajesh Balamohan <rb...@yahoo-inc.com>
Date:   Thu Feb 24 14:06:49 2011 +0530

    : Feature to instruct rumen-folder utility to skip jobs worth of specific duration

Modified:
    hadoop/mapreduce/branches/yahoo-merge/src/docs/src/documentation/content/xdocs/rumen.xml
    hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenFolder.java
    hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/Folder.java

Modified: hadoop/mapreduce/branches/yahoo-merge/src/docs/src/documentation/content/xdocs/rumen.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/docs/src/documentation/content/xdocs/rumen.xml?rev=1079270&r1=1079269&r2=1079270&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/docs/src/documentation/content/xdocs/rumen.xml (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/docs/src/documentation/content/xdocs/rumen.xml Tue Mar  8 06:02:35 2011
@@ -323,6 +323,17 @@
           </td>
         </tr>
         <tr>
+          <td><code>-starts-after</code></td>
+          <td>Specify the time in milliseconds relative to the begining of   
+	      trace, after which this utility should start considering the jobs 
+              from input trace.
+          </td>
+          <td>If this value is specified as 10000, Folder would ignore 
+              first 10000ms worth of jobs in the trace and 
+              start considering the rest of the jobs in the trace for folding.
+          </td>
+        </tr>
+        <tr>
           <td><code>-temp-directory</code></td>
           <td>Temporary directory for the Folder. By default the <strong>output
               folder's parent directory</strong> is used as the scratch space.

Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenFolder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenFolder.java?rev=1079270&r1=1079269&r2=1079270&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenFolder.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenFolder.java Tue Mar  8 06:02:35 2011
@@ -71,6 +71,73 @@ public class TestRumenFolder {
     TestRumenFolder.<LoggedJob> jsonFileMatchesGold(conf, lfs, foldedTracePath,
         foldedGoldFile, LoggedJob.class, "trace");
   }
+  
+  @Test
+  public void testStartsAfterOption() throws Exception {
+    final Configuration conf = new Configuration();
+    final FileSystem lfs = FileSystem.getLocal(conf);
+
+    @SuppressWarnings("deprecation")
+    final Path rootInputDir =
+      new Path(System.getProperty("test.tools.input.dir", ""))
+            .makeQualified(lfs);
+    @SuppressWarnings("deprecation")
+    final Path rootTempDir =
+      new Path(System.getProperty("test.build.data", "/tmp"))
+            .makeQualified(lfs);
+
+    final Path rootInputFile = new Path(rootInputDir, "rumen/small-trace-test");
+    final Path tempDir = new Path(rootTempDir, "TestRumenJobTraces");
+    lfs.delete(tempDir, true);
+
+    final Path foldedTracePath = new Path(tempDir, "folded-trace.json");
+
+    final Path inputFile =
+      new Path(rootInputFile, "folder-input-trace.json.gz");
+
+    String[] args =
+       { "-input-cycle", "100S", "-output-duration", "300S",
+           "-skew-buffer-length", "1", "-seed", "100", "-starts-after", "30S", "-concentration", "2",
+           inputFile.toString(), foldedTracePath.toString() };
+
+    final Path foldedGoldFile =
+        new Path(rootInputFile, "goldFoldedTrace.json.gz");
+
+    Folder folder = new Folder();
+    int result = ToolRunner.run(folder, args);
+    assertEquals("Non-zero exit", 0, result);
+
+    TestRumenFolder.<LoggedJob> checkValidityAfterSkippingJobs(conf, lfs, foldedTracePath,
+        foldedGoldFile, LoggedJob.class, "trace", 30000);
+  }
+  
+  static private <T extends DeepCompare> void 
+  	checkValidityAfterSkippingJobs(Configuration conf, 
+  			FileSystem lfs, Path result, Path gold,
+      Class<? extends T> clazz, String fileDescription, 
+      long startsAfter) throws IOException {
+  	JsonObjectMapperParser<T> goldParser =
+      new JsonObjectMapperParser<T>(gold, clazz, conf);
+    InputStream resultStream = lfs.open(result);
+    JsonObjectMapperParser<T> resultParser =
+        new JsonObjectMapperParser<T>(resultStream, clazz);
+    try {
+      long submitTime = ((LoggedJob)goldParser.getNext()).getSubmitTime();
+      long target = submitTime + startsAfter;
+      while(true) {
+        DeepCompare resultJob = resultParser.getNext();
+        if (resultJob == null) {
+          break;
+        }
+        LoggedJob job = (LoggedJob) resultJob;
+        assertTrue("job's submit time in the output trace is less " +
+          "than the specified value of starts-after", 
+          (job.getSubmitTime() >= target));
+      }
+    } finally {
+      IOUtils.cleanup(null, goldParser, resultParser);
+    }
+  }
 
   static private <T extends DeepCompare> void jsonFileMatchesGold(
       Configuration conf, FileSystem lfs, Path result, Path gold,

Modified: hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/Folder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/Folder.java?rev=1079270&r1=1079269&r2=1079270&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/Folder.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/Folder.java Tue Mar  8 06:02:35 2011
@@ -61,6 +61,7 @@ public class Folder extends Configured i
   private boolean debug = false;
   private boolean allowMissorting = false;
   private int skewBufferLength = 0;
+  private long startsAfter = -1;
 
   static final private Log LOG = LogFactory.getLog(Folder.class);
 
@@ -130,8 +131,9 @@ public class Folder extends Configured i
 
     for (int i = 0; i < args.length; ++i) {
       String thisArg = args[i];
-
-      if (thisArg.equalsIgnoreCase("-output-duration")) {
+      if (thisArg.equalsIgnoreCase("-starts-after")) {
+        startsAfter = parseDuration(args[++i]);
+      } else if (thisArg.equalsIgnoreCase("-output-duration")) {
         outputDuration = parseDuration(args[++i]);
       } else if (thisArg.equalsIgnoreCase("-input-cycle")) {
         inputCycle = parseDuration(args[++i]);
@@ -274,6 +276,29 @@ public class Folder extends Configured i
 
         return EMPTY_JOB_TRACE;
       }
+      
+      // If starts-after time is specified, skip the number of jobs till we reach
+      // the starting time limit.
+      if (startsAfter > 0) {
+        LOG.info("starts-after time is specified. Initial job submit time : " + job.getSubmitTime());
+
+        long approximateTime = job.getSubmitTime() + startsAfter;
+        job = reader.nextJob();
+        long skippedCount = 0;
+        while (job != null && job.getSubmitTime() < approximateTime) {
+          job = reader.nextJob();
+          skippedCount++;
+        }
+
+        LOG.debug("Skipped the initial " + startsAfter+ " ms jobs. Total number of skipped Jobs : " 
+            + skippedCount);
+
+        if (job == null) {
+          LOG.error("No more jobs to process in the trace..May be you skipped too many jobs..");
+          return EMPTY_JOB_TRACE;
+        }
+        LOG.info("The first job has a submit time of " + job.getSubmitTime());
+      }
 
       firstJobSubmitTime = job.getSubmitTime();
       long lastJobSubmitTime = firstJobSubmitTime;