Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:23:01 UTC

svn commit: r1077515 [1/4] - in /hadoop/common/branches/branch-0.20-security-patches/src: mapred/org/apache/hadoop/mapreduce/ test/org/apache/hadoop/tools/rumen/ test/tools/data/rumen/small-trace-test/ test/tools/data/rumen/small-trace-test/counters-fo...

Author: omalley
Date: Fri Mar  4 04:22:59 2011
New Revision: 1077515

URL: http://svn.apache.org/viewvc?rev=1077515&view=rev
Log:
commit bcb73ee3319252d3650eecc249551de89348dbc9
Author: Hong Tang <ht...@yahoo-inc.com>
Date:   Fri Jun 25 14:21:29 2010 -0700

    MAPREDUCE-1309. Rumen refactory. From https://issues.apache.org/jira/secure/attachment/12447992/rumen-yhadoop-20.patch. (htang)
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-1309. Rumen refactory. (htang)
    +

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/ConcatenatedInputFilesDemuxer.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestRandomSeedGenerator.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-format-test-logs/
    hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-format-test-logs/megacluster.megacorp.com_1265616107882_job_201002080801_40864_conf.xml
    hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-format-test-logs/megacluster.megacorp.com_1265616107882_job_201002080801_40864_job_name-DAILY%2F20100210%5D.gz   (with props)
    hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-format-test-logs/megacluster.megacorp.com_1265616107882_job_201002080801_50510_conf.xml
    hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-format-test-logs/megacluster.megacorp.com_1265616107882_job_201002080801_50510_job_name-DAILY%2F20100208%5D.gz   (with props)
    hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-test-trace.json.gz   (with props)
    hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/dispatch-sample-v20-jt-log.gz   (with props)
    hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/dispatch-topology-output.json.gz   (with props)
    hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/dispatch-trace-output.json.gz   (with props)
    hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/folder-input-trace.json.gz   (with props)
    hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/goldFoldedTrace.json.gz   (with props)
    hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/sample-conf.file.xml
    hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/v20-single-input-log-event-classes.text.gz   (with props)
    hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/v20-single-input-log.gz   (with props)
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/DefaultInputDemuxer.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/DefaultOutputter.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/DeskewedJobTraceReader.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/EventType.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Hadoop20JHParser.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/HistoryEvent.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/HistoryEventEmitter.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/InputDemuxer.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JhCounter.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JhCounterGroup.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JhCounters.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobConfPropertyNames.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobConfigurationParser.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobFinishedEvent.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParser.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParserFactory.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobInfoChangeEvent.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobInitedEvent.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobPriorityChangeEvent.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobStatusChangedEvent.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobSubmittedEvent.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobUnsuccessfulCompletionEvent.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperWriter.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/MapAttempt20LineHistoryEventEmitter.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/MapAttemptFinishedEvent.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Outputter.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/PossiblyDecompressedInputStream.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/RandomSeedGenerator.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ReduceAttempt20LineHistoryEventEmitter.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ReduceAttemptFinishedEvent.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/RewindableInputStream.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/SingleEventEmitter.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Task20LineHistoryEventEmitter.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptFinishedEvent.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptStartedEvent.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptUnsuccessfulCompletionEvent.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskFailedEvent.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskFinishedEvent.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskStartedEvent.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskUpdatedEvent.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TopologyBuilder.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Version20LogInterfaceUtils.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/Counters.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/AbstractClusterStory.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ClusterStory.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieCluster.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieJobProducer.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/Counters.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/Counters.java?rev=1077515&r1=1077514&r2=1077515&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/Counters.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/Counters.java Fri Mar  4 04:22:59 2011
@@ -25,7 +25,7 @@ public class Counters implements Writabl
   public Counters() {
   }
   
-  Counters(org.apache.hadoop.mapred.Counters counters) {
+  public Counters(org.apache.hadoop.mapred.Counters counters) {
     for(org.apache.hadoop.mapred.Counters.Group group: counters) {
       String name = group.getName();
       CounterGroup newGroup = new CounterGroup(name, group.getDisplayName());

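The widened constructor above lets code outside the MapReduce framework wrap an old-API org.apache.hadoop.mapred.Counters object in the new API, which is what the reworked Rumen parsers appear to need. A minimal sketch of the conversion, illustrative only and not part of the patch; the group and counter names are invented:

    import org.apache.hadoop.mapred.Counters;
    import org.apache.hadoop.mapreduce.CounterGroup;

    public class CountersConversionSketch {
      public static void main(String[] args) {
        // Populate an old-API counters object the way framework code would.
        Counters oldCounters = new Counters();
        oldCounters.incrCounter("hypothetical-group", "hypothetical-counter", 42L);

        // The now-public constructor copies every group and counter into a
        // new-API object.
        org.apache.hadoop.mapreduce.Counters newCounters =
            new org.apache.hadoop.mapreduce.Counters(oldCounters);

        for (CounterGroup group : newCounters) {
          System.out.println(group.getName());
        }
      }
    }
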
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/ConcatenatedInputFilesDemuxer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/ConcatenatedInputFilesDemuxer.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/ConcatenatedInputFilesDemuxer.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/ConcatenatedInputFilesDemuxer.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+public class ConcatenatedInputFilesDemuxer implements InputDemuxer {
+  private String name;
+  private DelimitedInputStream input;
+
+  private String knownNextFileName = null;
+
+  static private int MAXIMUM_HEADER_LINE_LENGTH = 500;
+
+  @Override
+  public void bindTo(Path path, Configuration conf) throws IOException {
+    InputStream underlyingInput = null;
+
+    if (name != null) { // re-binding before the previous one was consumed.
+      close();
+    }
+    name = path.getName();
+
+    underlyingInput = new PossiblyDecompressedInputStream(path, conf);
+
+    input =
+        new DelimitedInputStream(new BufferedInputStream(underlyingInput),
+            "\f!!FILE=", "!!\n");
+
+    knownNextFileName = input.nextFileName();
+
+    if (knownNextFileName == null) {
+      close();
+
+      return;
+    }
+
+    /*
+     * We handle files in specialized formats by trying their demuxers first,
+     * not by failing here.
+     */
+    return;
+  }
+
+  @Override
+  public Pair<String, InputStream> getNext() throws IOException {
+    if (knownNextFileName != null) {
+      Pair<String, InputStream> result =
+          new Pair<String, InputStream>(knownNextFileName, input);
+
+      knownNextFileName = null;
+
+      return result;
+    }
+
+    String nextFileName = input.nextFileName();
+
+    if (nextFileName == null) {
+      return null;
+    }
+
+    return new Pair<String, InputStream>(nextFileName, input);
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (input != null) {
+      input.close();
+    }
+  }
+
+  /**
+   * A simple wrapper class to make any input stream delimited. It has an extra
+   * method, getName.
+   * 
+   * The input stream should have lines that look like
+   * <marker><filename><endmarker> . The text <marker> should not occur
+   * elsewhere in the file. The text <endmarker> should not occur in a file
+   * name.
+   */
+  static class DelimitedInputStream extends InputStream {
+    private InputStream input;
+
+    private boolean endSeen = false;
+
+    private final String fileMarker;
+
+    private final byte[] markerBytes;
+
+    private final byte[] fileMarkerBuffer;
+
+    private final String fileEndMarker;
+
+    private final byte[] endMarkerBytes;
+
+    private final byte[] fileEndMarkerBuffer;
+
+    /**
+     * Constructor.
+     * 
+     * @param input
+     */
+    public DelimitedInputStream(InputStream input, String fileMarker,
+        String fileEndMarker) {
+      this.input = new BufferedInputStream(input, 10000);
+      this.input.mark(10000);
+      this.fileMarker = fileMarker;
+      this.markerBytes = this.fileMarker.getBytes();
+      this.fileMarkerBuffer = new byte[this.markerBytes.length];
+      this.fileEndMarker = fileEndMarker;
+      this.endMarkerBytes = this.fileEndMarker.getBytes();
+      this.fileEndMarkerBuffer = new byte[this.endMarkerBytes.length];
+    }
+
+    @Override
+    public int read() throws IOException {
+      if (endSeen) {
+        return -1;
+      }
+
+      input.mark(10000);
+
+      int result = input.read();
+
+      if (result < 0) {
+        endSeen = true;
+        return result;
+      }
+
+      if (result == markerBytes[0]) {
+        input.reset();
+
+        // this might be a marker line
+        int markerReadResult =
+            input.read(fileMarkerBuffer, 0, fileMarkerBuffer.length);
+
+        input.reset();
+
+        if (markerReadResult < fileMarkerBuffer.length
+            || !fileMarker.equals(new String(fileMarkerBuffer))) {
+          return input.read();
+        }
+
+        return -1;
+      }
+
+      return result;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.io.InputStream#read(byte[], int, int)
+     * 
+     * This does SLIGHTLY THE WRONG THING.
+     * 
+     * If we run off the end of the segment then the input buffer will be
+     * dirtied beyond the point where we claim to have read. If this turns out
+     * to be a problem, save that data somewhere and restore it if needed.
+     */
+    @Override
+    public int read(byte[] buffer, int offset, int length) throws IOException {
+      if (endSeen) {
+        return -1;
+      }
+
+      input.mark(length + markerBytes.length + 10);
+
+      int dataSeen = input.read(buffer, offset, length);
+
+      byte[] extraReadBuffer = null;
+      int extraActualRead = -1;
+
+      // search for an instance of a file marker
+      for (int i = offset; i < offset + dataSeen; ++i) {
+        if (buffer[i] == markerBytes[0]) {
+          boolean mismatch = false;
+
+          for (int j = 1; j < Math.min(markerBytes.length, offset + dataSeen
+              - i); ++j) {
+            if (buffer[i + j] != markerBytes[j]) {
+              mismatch = true;
+              break;
+            }
+          }
+
+          if (!mismatch) {
+            // see if we have only a prefix of the markerBytes
+            int uncheckedMarkerCharCount =
+                markerBytes.length - (offset + dataSeen - i);
+
+            if (uncheckedMarkerCharCount > 0) {
+              if (extraReadBuffer == null) {
+                extraReadBuffer = new byte[markerBytes.length - 1];
+
+                extraActualRead = input.read(extraReadBuffer);
+              }
+
+              if (extraActualRead < uncheckedMarkerCharCount) {
+                input.reset();
+                return input.read(buffer, offset, length);
+              }
+
+              for (int j = 0; j < uncheckedMarkerCharCount; ++j) {
+                if (extraReadBuffer[j] != markerBytes[markerBytes.length
+                    - uncheckedMarkerCharCount + j]) {
+                  input.reset();
+                  return input.read(buffer, offset, length);
+                }
+              }
+            }
+
+            input.reset();
+
+            if (i == offset) {
+              return -1;
+            }
+
+            int result = input.read(buffer, offset, i - offset);
+            return result;
+          }
+        }
+      }
+
+      return dataSeen;
+    }
+
+    @Override
+    public int read(byte[] buffer) throws IOException {
+      return read(buffer, 0, buffer.length);
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (endSeen) {
+        input.close();
+      }
+    }
+
+    String nextFileName() throws IOException {
+      return nextFileName(MAXIMUM_HEADER_LINE_LENGTH);
+    }
+
+    private String nextFileName(int bufferSize) throws IOException {
+      // the line can't contain a newline and must contain a form feed
+      byte[] buffer = new byte[bufferSize];
+
+      input.mark(bufferSize + 1);
+
+      int actualRead = input.read(buffer);
+      int mostRecentRead = actualRead;
+
+      while (actualRead < bufferSize && mostRecentRead > 0) {
+        mostRecentRead =
+            input.read(buffer, actualRead, bufferSize - actualRead);
+
+        if (mostRecentRead > 0) {
+          actualRead += mostRecentRead;
+        }
+      }
+
+      if (actualRead < markerBytes.length) {
+        input.reset();
+        return null;
+      }
+
+      for (int i = 0; i < markerBytes.length; ++i) {
+        if (markerBytes[i] != buffer[i]) {
+          input.reset();
+          return null;
+        }
+      }
+
+      for (int i = markerBytes.length; i < actualRead; ++i) {
+        if (buffer[i] == endMarkerBytes[0]) {
+          // gather the file name
+          input.reset();
+          // burn the marker
+          if (input.read(buffer, 0, markerBytes.length) < markerBytes.length) {
+            throw new IOException("Can't reread bytes I've read before.");
+          }
+          // get the file name
+          if (input.read(buffer, 0, i - markerBytes.length) < i
+              - markerBytes.length) {
+            throw new IOException("Can't reread bytes I've read before.");
+          }
+          // burn the two exclamation points and the newline
+          if (input.read(fileEndMarkerBuffer) < fileEndMarkerBuffer.length) {
+            input.reset();
+            return null;
+          }
+          for (int j = 0; j < endMarkerBytes.length; ++j) {
+            if (endMarkerBytes[j] != fileEndMarkerBuffer[j]) {
+              input.reset();
+              return null;
+            }
+          }
+
+          return new String(buffer, 0, i - markerBytes.length);
+        }
+
+        if (buffer[i] == '\n') {
+          return null;
+        }
+      }
+
+      // we ran off the end. Was the buffer too short, or is this all there was?
+      input.reset();
+
+      if (actualRead < bufferSize) {
+        return null;
+      }
+
+      return nextFileName(bufferSize * 2);
+    }
+  }
+
+}

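A usage sketch for the demuxer above, illustrative only and not part of the patch. It writes a tiny concatenated input in the \f!!FILE=<name>!!\n layout that the DelimitedInputStream javadoc describes, binds the demuxer to it, and walks the logical files. The file names and contents are invented, and the first()/second() accessors on the rumen Pair class are assumed:

    package org.apache.hadoop.tools.rumen;

    import java.io.FileOutputStream;
    import java.io.InputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class ConcatenatedDemuxerSketch {
      public static void main(String[] args) throws Exception {
        // Build a small concatenated file: each logical file is introduced by
        // a header line of the form "\f!!FILE=<name>!!\n".
        java.io.File concatenated = java.io.File.createTempFile("concatenated", ".log");
        FileOutputStream out = new FileOutputStream(concatenated);
        out.write("\f!!FILE=job_0001_conf.xml!!\n<configuration/>\n".getBytes());
        out.write("\f!!FILE=job_0001_history!!\nMeta VERSION=\"1\" .\n".getBytes());
        out.close();

        ConcatenatedInputFilesDemuxer demuxer = new ConcatenatedInputFilesDemuxer();
        demuxer.bindTo(new Path(concatenated.getAbsolutePath()), new Configuration());

        Pair<String, InputStream> item;
        while ((item = demuxer.getNext()) != null) {
          System.out.println("logical file: " + item.first());
          InputStream data = item.second();
          while (data.read() != -1) {
            // Each logical stream must be drained before asking for the next
            // one; it reports end-of-stream at the next "\f!!FILE=" marker.
          }
        }
        demuxer.close();
      }
    }
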
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestRandomSeedGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestRandomSeedGenerator.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestRandomSeedGenerator.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestRandomSeedGenerator.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+import static org.apache.hadoop.tools.rumen.RandomSeedGenerator.getSeed;
+
+public class TestRandomSeedGenerator {
+  @Test
+  public void testSeedGeneration() {
+    long masterSeed1 = 42;
+    long masterSeed2 = 43;
+    
+    assertTrue("Deterministic seeding",
+        getSeed("stream1", masterSeed1) == getSeed("stream1", masterSeed1));
+    assertTrue("Deterministic seeding",
+        getSeed("stream2", masterSeed2) == getSeed("stream2", masterSeed2));
+    assertTrue("Different streams", 
+        getSeed("stream1", masterSeed1) != getSeed("stream2", masterSeed1));
+    assertTrue("Different master seeds",
+        getSeed("stream1", masterSeed1) != getSeed("stream1", masterSeed2));
+  }
+}

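The test above pins down the contract only: getSeed must be deterministic in (streamId, masterSeed), different streams must get different seeds, and different master seeds must decorrelate the same stream. The sketch below shows one way such a seed can be derived, by hashing the stream id together with the master seed and folding the digest into a long; it is illustrative and not claimed to match the committed RandomSeedGenerator byte for byte:

    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;

    public class SeedSketch {
      public static long getSeed(String streamId, long masterSeed) {
        try {
          MessageDigest md5 = MessageDigest.getInstance("MD5");
          // '/' separates the two inputs so that, e.g., ("ab", 1) and ("a", 11)
          // cannot produce the same input string.
          byte[] digest = md5.digest((streamId + '/' + masterSeed).getBytes());
          long seed = 0;
          for (int i = 0; i < 8; ++i) {
            // Fold the 16-byte digest into 8 bytes by XOR-ing its two halves.
            seed = (seed << 8) | ((digest[i] ^ digest[i + 8]) & 0xffL);
          }
          return seed;
        } catch (NoSuchAlgorithmException e) {
          throw new RuntimeException("MD5 not available", e);
        }
      }

      public static void main(String[] args) {
        System.out.println(getSeed("stream1", 42) == getSeed("stream1", 42)); // true
        System.out.println(getSeed("stream1", 42) != getSeed("stream2", 42)); // true (w.h.p.)
      }
    }
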
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java?rev=1077515&r1=1077514&r2=1077515&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java Fri Mar  4 04:22:59 2011
@@ -18,13 +18,29 @@
 
 package org.apache.hadoop.tools.rumen;
 
+import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.util.LineReader;
+import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
 import org.junit.Test;
@@ -49,11 +65,11 @@ public class TestRumenJobTraces {
     final FileSystem lfs = FileSystem.getLocal(conf);
 
     final Path rootInputDir =
-        new Path(System.getProperty("test.tools.input.dir", ""))
-            .makeQualified(lfs);
+        new Path(System.getProperty("test.tools.input.dir", "")).makeQualified(
+            lfs);
     final Path rootTempDir =
-        new Path(System.getProperty("test.build.data", "/tmp"))
-            .makeQualified(lfs);
+        new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+            lfs);
 
     final Path rootInputFile = new Path(rootInputDir, "rumen/small-trace-test");
     final Path tempDir = new Path(rootTempDir, "TestRumenJobTraces");
@@ -82,25 +98,355 @@ public class TestRumenJobTraces {
     final Path topologyGoldFile = new Path(rootInputFile, goldTopology);
     final Path traceGoldFile = new Path(rootInputFile, goldTrace);
 
+    @SuppressWarnings("deprecation")
     HadoopLogsAnalyzer analyzer = new HadoopLogsAnalyzer();
     int result = ToolRunner.run(analyzer, args);
     assertEquals("Non-zero exit", 0, result);
 
     TestRumenJobTraces
-        .<LoggedNetworkTopology> jsonFileMatchesGold(lfs, topologyFile,
+        .<LoggedNetworkTopology> jsonFileMatchesGold(conf, topologyFile,
+            topologyGoldFile, LoggedNetworkTopology.class, "topology");
+    TestRumenJobTraces.<LoggedJob> jsonFileMatchesGold(conf, traceFile,
+        traceGoldFile, LoggedJob.class, "trace");
+  }
+
+  @Test
+  public void testRumenViaDispatch() throws Exception {
+    final Configuration conf = new Configuration();
+    final FileSystem lfs = FileSystem.getLocal(conf);
+
+    final Path rootInputDir =
+        new Path(System.getProperty("test.tools.input.dir", "")).makeQualified(
+            lfs);
+    final Path rootTempDir =
+        new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+            lfs);
+
+    final Path rootInputPath = new Path(rootInputDir, "rumen/small-trace-test");
+    final Path tempDir = new Path(rootTempDir, "TestRumenViaDispatch");
+    lfs.delete(tempDir, true);
+
+    final Path topologyPath = new Path(tempDir, "dispatch-topology.json");
+    final Path tracePath = new Path(tempDir, "dispatch-trace.json");
+
+    final Path inputPath =
+        new Path(rootInputPath, "dispatch-sample-v20-jt-log.gz");
+
+    System.out.println("topology result file = " + topologyPath);
+    System.out.println("testRumenViaDispatch() trace result file = " + tracePath);
+
+    String demuxerClassName = ConcatenatedInputFilesDemuxer.class.getName();
+
+    String[] args =
+        { "-demuxer", demuxerClassName, tracePath.toString(),
+            topologyPath.toString(), inputPath.toString() };
+
+    final Path topologyGoldFile =
+        new Path(rootInputPath, "dispatch-topology-output.json.gz");
+    final Path traceGoldFile =
+        new Path(rootInputPath, "dispatch-trace-output.json.gz");
+
+    Tool analyzer = new TraceBuilder();
+    int result = ToolRunner.run(analyzer, args);
+    assertEquals("Non-zero exit", 0, result);
+
+    TestRumenJobTraces
+        .<LoggedNetworkTopology> jsonFileMatchesGold(conf, topologyPath,
             topologyGoldFile, LoggedNetworkTopology.class, "topology");
-    TestRumenJobTraces.<LoggedJob> jsonFileMatchesGold(lfs, traceFile,
+    TestRumenJobTraces.<LoggedJob> jsonFileMatchesGold(conf, tracePath,
         traceGoldFile, LoggedJob.class, "trace");
   }
 
+  @Test
+  public void testBracketedCounters() throws Exception {
+    final Configuration conf = new Configuration();
+    final FileSystem lfs = FileSystem.getLocal(conf);
+
+    final Path rootInputDir =
+        new Path(System.getProperty("test.tools.input.dir", "")).makeQualified(
+            lfs);
+    final Path rootTempDir =
+        new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+            lfs);
+
+    final Path rootInputPath = new Path(rootInputDir, "rumen/small-trace-test");
+    final Path tempDir = new Path(rootTempDir, "TestBracketedCounters");
+    lfs.delete(tempDir, true);
+
+    final Path topologyPath = new Path(tempDir, "dispatch-topology.json");
+    final Path tracePath = new Path(tempDir, "dispatch-trace.json");
+
+    final Path inputPath = new Path(rootInputPath, "counters-format-test-logs");
+
+    System.out.println("topology result file = " + topologyPath);
+    System.out.println("testBracketedCounters() trace result file = " + tracePath);
+
+    final Path goldPath =
+        new Path(rootInputPath, "counters-test-trace.json.gz");
+
+    String[] args =
+        { tracePath.toString(), topologyPath.toString(), inputPath.toString() };
+
+    Tool analyzer = new TraceBuilder();
+    int result = ToolRunner.run(analyzer, args);
+    assertEquals("Non-zero exit", 0, result);
+
+    TestRumenJobTraces.<LoggedJob> jsonFileMatchesGold(conf, tracePath,
+        goldPath, LoggedJob.class, "trace");
+  }
+
+  @Test
+  public void testHadoop20JHParser() throws Exception {
+    final Configuration conf = new Configuration();
+    final FileSystem lfs = FileSystem.getLocal(conf);
+
+    boolean success = false;
+
+    final Path rootInputDir =
+        new Path(System.getProperty("test.tools.input.dir", "")).makeQualified(
+            lfs);
+    final Path rootTempDir =
+        new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+            lfs);
+
+    final Path rootInputPath = new Path(rootInputDir, "rumen/small-trace-test");
+    final Path tempDir = new Path(rootTempDir, "TestRumenViaDispatch");
+    lfs.delete(tempDir, true);
+
+    final Path inputPath = new Path(rootInputPath, "v20-single-input-log.gz");
+    final Path goldPath =
+        new Path(rootInputPath, "v20-single-input-log-event-classes.text.gz");
+
+    InputStream inputLogStream =
+        new PossiblyDecompressedInputStream(inputPath, conf);
+
+    InputStream inputGoldStream =
+        new PossiblyDecompressedInputStream(goldPath, conf);
+
+    BufferedInputStream bis = new BufferedInputStream(inputLogStream);
+    bis.mark(10000);
+    Hadoop20JHParser parser = new Hadoop20JHParser(bis);
+
+    final Path resultPath = new Path(tempDir, "result.text");
+
+    System.out.println("testHadoop20JHParser sent its output to " + resultPath);
+
+    Compressor compressor;
+
+    FileSystem fs = resultPath.getFileSystem(conf);
+    CompressionCodec codec =
+        new CompressionCodecFactory(conf).getCodec(resultPath);
+    OutputStream output;
+    if (codec != null) {
+      compressor = CodecPool.getCompressor(codec);
+      output = codec.createOutputStream(fs.create(resultPath), compressor);
+    } else {
+      output = fs.create(resultPath);
+    }
+
+    PrintStream printStream = new PrintStream(output);
+
+    try {
+      assertEquals("Hadoop20JHParser can't parse the test file", true,
+          Hadoop20JHParser.canParse(inputLogStream));
+
+      bis.reset();
+
+      HistoryEvent event = parser.nextEvent();
+
+      while (event != null) {
+        printStream.println(event.getClass().getCanonicalName());
+        event = parser.nextEvent();
+      }
+
+      printStream.close();
+
+      LineReader goldLines = new LineReader(inputGoldStream);
+      LineReader resultLines =
+          new LineReader(new PossiblyDecompressedInputStream(resultPath, conf));
+
+      int lineNumber = 1;
+
+      try {
+        Text goldLine = new Text();
+        Text resultLine = new Text();
+
+        int goldRead = goldLines.readLine(goldLine);
+        int resultRead = resultLines.readLine(resultLine);
+
+        while (goldRead * resultRead != 0) {
+          if (!goldLine.equals(resultLine)) {
+            assertEquals("Type mismatch detected", goldLine, resultLine);
+            break;
+          }
+
+          goldRead = goldLines.readLine(goldLine);
+          resultRead = resultLines.readLine(resultLine);
+
+          ++lineNumber;
+        }
+
+        if (goldRead != resultRead) {
+          assertEquals("the " + (goldRead > resultRead ? "gold" : resultRead)
+              + " file contains more text at line " + lineNumber, goldRead,
+              resultRead);
+        }
+
+        success = true;
+      } finally {
+        goldLines.close();
+        resultLines.close();
+
+        if (success) {
+          lfs.delete(resultPath, false);
+        }
+      }
+
+    } finally {
+      if (parser == null) {
+        inputLogStream.close();
+      } else {
+        if (parser != null) {
+          parser.close();
+        }
+      }
+
+      if (inputGoldStream != null) {
+        inputGoldStream.close();
+      }
+
+      // it's okay to do this twice [if we get an error on input]
+      printStream.close();
+    }
+  }
+
+  @Test
+  public void testJobConfigurationParser() throws Exception {
+    String[] list1 =
+        { "mapred.job.queue.name", "mapreduce.job.name",
+            "mapred.child.java.opts" };
+
+    String[] list2 = { "mapred.job.queue.name", "mapred.child.java.opts" };
+
+    List<String> interested1 = new ArrayList<String>();
+    for (String interested : list1) {
+      interested1.add(interested);
+    }
+
+    List<String> interested2 = new ArrayList<String>();
+    for (String interested : list2) {
+      interested2.add(interested);
+    }
+
+    JobConfigurationParser jcp1 = new JobConfigurationParser(interested1);
+    JobConfigurationParser jcp2 = new JobConfigurationParser(interested2);
+
+    final Configuration conf = new Configuration();
+    final FileSystem lfs = FileSystem.getLocal(conf);
+
+    @SuppressWarnings("deprecation")
+    final Path rootInputDir =
+        new Path(System.getProperty("test.tools.input.dir", ""))
+            .makeQualified(lfs);
+
+    final Path rootInputPath = new Path(rootInputDir, "rumen/small-trace-test");
+
+    final Path inputPath = new Path(rootInputPath, "sample-conf.file.xml");
+
+    InputStream inputConfStream =
+        new PossiblyDecompressedInputStream(inputPath, conf);
+
+    try {
+      Properties props1 = jcp1.parse(inputConfStream);
+      inputConfStream.close();
+
+      inputConfStream = new PossiblyDecompressedInputStream(inputPath, conf);
+      Properties props2 = jcp2.parse(inputConfStream);
+
+      assertEquals("testJobConfigurationParser: wrong number of properties", 3,
+          props1.size());
+      assertEquals("testJobConfigurationParser: wrong number of properties", 2,
+          props2.size());
+
+      assertEquals("prop test 1", "TheQueue", props1
+          .get("mapred.job.queue.name"));
+      assertEquals("prop test 2", "job_0001", props1.get("mapreduce.job.name"));
+      assertEquals("prop test 3",
+          "-server -Xmx640m -Djava.net.preferIPv4Stack=true", props1
+              .get("mapred.child.java.opts"));
+      assertEquals("prop test 4", "TheQueue", props2
+          .get("mapred.job.queue.name"));
+      assertEquals("prop test 5",
+          "-server -Xmx640m -Djava.net.preferIPv4Stack=true", props2
+              .get("mapred.child.java.opts"));
+
+    } finally {
+      inputConfStream.close();
+    }
+  }
+
+  @Test
+  public void testTopologyBuilder() throws Exception {
+    final TopologyBuilder subject = new TopologyBuilder();
+
+    // currently we extract no host names from the Properties
+    subject.process(new Properties());
+
+    subject.process(new TaskAttemptFinishedEvent(TaskAttemptID
+        .forName("attempt_200904211745_0003_m_000004_0"), TaskType
+        .valueOf("MAP"), "STATUS", 1234567890L,
+        "/194\\.6\\.134\\.64/cluster50261\\.secondleveldomain\\.com",
+        "SUCCESS", null));
+    subject.process(new TaskAttemptUnsuccessfulCompletionEvent(TaskAttemptID
+        .forName("attempt_200904211745_0003_m_000004_1"), TaskType
+        .valueOf("MAP"), "STATUS", 1234567890L,
+        "/194\\.6\\.134\\.80/cluster50262\\.secondleveldomain\\.com",
+        "MACHINE_EXPLODED"));
+    subject.process(new TaskAttemptUnsuccessfulCompletionEvent(TaskAttemptID
+        .forName("attempt_200904211745_0003_m_000004_2"), TaskType
+        .valueOf("MAP"), "STATUS", 1234567890L,
+        "/194\\.6\\.134\\.80/cluster50263\\.secondleveldomain\\.com",
+        "MACHINE_EXPLODED"));
+    subject.process(new TaskStartedEvent(TaskID
+        .forName("task_200904211745_0003_m_000004"), 1234567890L, TaskType
+        .valueOf("MAP"),
+        "/194\\.6\\.134\\.80/cluster50263\\.secondleveldomain\\.com"));
+
+    final LoggedNetworkTopology topology = subject.build();
+
+    List<LoggedNetworkTopology> racks = topology.getChildren();
+
+    assertEquals("Wrong number of racks", 2, racks.size());
+
+    boolean sawSingleton = false;
+    boolean sawDoubleton = false;
+
+    for (LoggedNetworkTopology rack : racks) {
+      List<LoggedNetworkTopology> nodes = rack.getChildren();
+      if (rack.getName().endsWith(".64")) {
+        assertEquals("The singleton rack has the wrong number of elements", 1,
+            nodes.size());
+        sawSingleton = true;
+      } else if (rack.getName().endsWith(".80")) {
+        assertEquals("The doubleton rack has the wrong number of elements", 2,
+            nodes.size());
+        sawDoubleton = true;
+      } else {
+        assertTrue("Unrecognized rack name", false);
+      }
+    }
+
+    assertTrue("Did not see singleton rack", sawSingleton);
+    assertTrue("Did not see doubleton rack", sawDoubleton);
+  }
+
   static private <T extends DeepCompare> void jsonFileMatchesGold(
-      FileSystem lfs, Path result, Path gold, Class<? extends T> clazz,
+      Configuration conf, Path result, Path gold, Class<? extends T> clazz,
       String fileDescription) throws IOException {
     JsonObjectMapperParser<T> goldParser =
-        new JsonObjectMapperParser<T>(gold, clazz, new Configuration());
-    InputStream resultStream = lfs.open(result);
+        new JsonObjectMapperParser<T>(gold, clazz, conf);
     JsonObjectMapperParser<T> resultParser =
-        new JsonObjectMapperParser<T>(resultStream, clazz);
+        new JsonObjectMapperParser<T>(result, clazz, conf);
     try {
       while (true) {
         DeepCompare goldJob = goldParser.getNext();

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-format-test-logs/megacluster.megacorp.com_1265616107882_job_201002080801_40864_conf.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-format-test-logs/megacluster.megacorp.com_1265616107882_job_201002080801_40864_conf.xml?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-format-test-logs/megacluster.megacorp.com_1265616107882_job_201002080801_40864_conf.xml (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-format-test-logs/megacluster.megacorp.com_1265616107882_job_201002080801_40864_conf.xml Fri Mar  4 04:22:59 2011
@@ -0,0 +1,3 @@
+<configuration>
+<property><name>mapred.child.java.opts</name><value>-Xmx1024M -Djava.io.tmpdir=./tmp</value></property>
+</configuration>

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-format-test-logs/megacluster.megacorp.com_1265616107882_job_201002080801_40864_job_name-DAILY%2F20100210%5D.gz
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-format-test-logs/megacluster.megacorp.com_1265616107882_job_201002080801_40864_job_name-DAILY%252F20100210%255D.gz?rev=1077515&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-format-test-logs/megacluster.megacorp.com_1265616107882_job_201002080801_40864_job_name-DAILY%2F20100210%5D.gz
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-format-test-logs/megacluster.megacorp.com_1265616107882_job_201002080801_50510_conf.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-format-test-logs/megacluster.megacorp.com_1265616107882_job_201002080801_50510_conf.xml?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-format-test-logs/megacluster.megacorp.com_1265616107882_job_201002080801_50510_conf.xml (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-format-test-logs/megacluster.megacorp.com_1265616107882_job_201002080801_50510_conf.xml Fri Mar  4 04:22:59 2011
@@ -0,0 +1,3 @@
+<configuration>
+<property><name>mapred.child.java.opts</name><value>-Xmx1024m -Djava.net.preferIPv4Stack=true</value></property>
+</configuration>

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-format-test-logs/megacluster.megacorp.com_1265616107882_job_201002080801_50510_job_name-DAILY%2F20100208%5D.gz
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-format-test-logs/megacluster.megacorp.com_1265616107882_job_201002080801_50510_job_name-DAILY%252F20100208%255D.gz?rev=1077515&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-format-test-logs/megacluster.megacorp.com_1265616107882_job_201002080801_50510_job_name-DAILY%2F20100208%5D.gz
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-test-trace.json.gz
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-test-trace.json.gz?rev=1077515&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-test-trace.json.gz
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/dispatch-sample-v20-jt-log.gz
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/dispatch-sample-v20-jt-log.gz?rev=1077515&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/dispatch-sample-v20-jt-log.gz
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/dispatch-topology-output.json.gz
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/dispatch-topology-output.json.gz?rev=1077515&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/dispatch-topology-output.json.gz
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/dispatch-trace-output.json.gz
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/dispatch-trace-output.json.gz?rev=1077515&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/dispatch-trace-output.json.gz
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/folder-input-trace.json.gz
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/folder-input-trace.json.gz?rev=1077515&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/folder-input-trace.json.gz
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/goldFoldedTrace.json.gz
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/goldFoldedTrace.json.gz?rev=1077515&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/goldFoldedTrace.json.gz
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/sample-conf.file.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/sample-conf.file.xml?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/sample-conf.file.xml (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/sample-conf.file.xml Fri Mar  4 04:22:59 2011
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration>
+   <property>
+      <name>mapred.job.queue.name</name><value>TheQueue</value>
+   </property>
+   <property>
+      <name>mapreduce.job.name</name><value>job_0001</value>
+   </property>
+   <property>
+      <name>maproduce.uninteresting.property</name><value>abcdef</value>
+   </property>
+   <property><name>mapred.child.java.opts</name><value>-server -Xmx640m -Djava.net.preferIPv4Stack=true</value></property>
+</configuration>

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/v20-single-input-log-event-classes.text.gz
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/v20-single-input-log-event-classes.text.gz?rev=1077515&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/v20-single-input-log-event-classes.text.gz
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/v20-single-input-log.gz
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/v20-single-input-log.gz?rev=1077515&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/v20-single-input-log.gz
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Modified: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/AbstractClusterStory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/AbstractClusterStory.java?rev=1077515&r1=1077514&r2=1077515&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/AbstractClusterStory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/AbstractClusterStory.java Fri Mar  4 04:22:59 2011
@@ -38,7 +38,6 @@ public abstract class AbstractClusterSto
   protected Map<String, MachineNode> mNodeMap;
   protected Map<String, RackNode> rNodeMap;
   protected int maximumDistance = 0;
-  protected Random random;
   
   @Override
   public Set<MachineNode> getMachines() {
@@ -53,7 +52,8 @@ public abstract class AbstractClusterSto
   }
   
   @Override
-  public synchronized MachineNode[] getRandomMachines(int expected) {
+  public synchronized MachineNode[] getRandomMachines(int expected, 
+                                                      Random random) {
     if (expected == 0) {
       return new MachineNode[0];
     }
@@ -64,7 +64,6 @@ public abstract class AbstractClusterSto
 
     if (mNodesFlattened == null) {
       mNodesFlattened = machineNodes.toArray(new MachineNode[total]);
-      random = new Random();
     }
 
     MachineNode[] retval = new MachineNode[select];

Modified: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ClusterStory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ClusterStory.java?rev=1077515&r1=1077514&r2=1077515&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ClusterStory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ClusterStory.java Fri Mar  4 04:22:59 2011
@@ -18,6 +18,7 @@
 package org.apache.hadoop.tools.rumen;
 
 import java.util.Set;
+import java.util.Random;
 
 /**
  * {@link ClusterStory} represents all configurations of a MapReduce cluster,
@@ -45,9 +46,10 @@ public interface ClusterStory {
   /**
    * Select a random set of machines.
    * @param expected The expected sample size.
+   * @param random Random number generator to use.
    * @return An array of up to expected number of {@link MachineNode}s.
    */
-  public MachineNode[] getRandomMachines(int expected);
+  public MachineNode[] getRandomMachines(int expected, Random random);
 
   /**
    * Get {@link MachineNode} by its host name.

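The new signature above, together with the matching change in AbstractClusterStory, moves ownership of the random number generator to the caller, so a simulation can seed it explicitly and reproduce its machine samples. A minimal sketch under that reading; the stream id and the sample size of 10 are invented:

    package org.apache.hadoop.tools.rumen;

    import java.util.Random;

    public class MachineSamplingSketch {
      // Returns up to 10 machines, chosen reproducibly for a given master seed.
      public static MachineNode[] sampleMachines(ClusterStory cluster, long masterSeed) {
        // Deriving the per-use seed from the master seed keeps different
        // samplers (different stream ids) decorrelated while each run repeats.
        Random random =
            new Random(RandomSeedGenerator.getSeed("machine-sampling", masterSeed));
        return cluster.getRandomMachines(10, random);
      }
    }
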
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/DefaultInputDemuxer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/DefaultInputDemuxer.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/DefaultInputDemuxer.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/DefaultInputDemuxer.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * {@link DefaultInputDemuxer} acts as a pass-through demuxer. It just opens
+ * each file and returns back the input stream. If the input is compressed, it
+ * would return a decompression stream.
+ */
+public class DefaultInputDemuxer implements InputDemuxer {
+  String name;
+  InputStream input;
+
+  @Override
+  public void bindTo(Path path, Configuration conf) throws IOException {
+    if (name != null) { // re-binding before the previous one was consumed.
+      close();
+    }
+    name = path.getName();
+
+    input = new PossiblyDecompressedInputStream(path, conf);
+
+    return;
+  }
+
+  @Override
+  public Pair<String, InputStream> getNext() throws IOException {
+    if (name != null) {
+      Pair<String, InputStream> ret =
+          new Pair<String, InputStream>(name, input);
+      name = null;
+      input = null;
+      return ret;
+    }
+    return null;
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      if (input != null) {
+        input.close();
+      }
+    } finally {
+      name = null;
+      input = null;
+    }
+  }
+}
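
Because this demuxer is a pass-through, getNext() yields exactly one (name, stream) pair per bindTo() and then returns null. A usage sketch under that reading; it assumes it sits in the org.apache.hadoop.tools.rumen package, and the log path is whatever the caller supplies:

    package org.apache.hadoop.tools.rumen;

    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    class DemuxOneFile {
      static void dump(Path logFile, Configuration conf) throws IOException {
        DefaultInputDemuxer demuxer = new DefaultInputDemuxer();
        demuxer.bindTo(logFile, conf);  // a compressed path yields a decompression stream
        try {
          Pair<String, InputStream> item;
          while ((item = demuxer.getNext()) != null) {  // exactly one pair per bindTo()
            System.out.println("got stream for " + item.first());
            item.second().close();
          }
        } finally {
          demuxer.close();
        }
      }
    }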

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/DefaultOutputter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/DefaultOutputter.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/DefaultOutputter.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/DefaultOutputter.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.Compressor;
+
+/**
+ * The default {@link Outputter} that outputs to a plain file. Compression
+ * will be applied if the path has the right suffix.
+ */
+public class DefaultOutputter<T> implements Outputter<T> {
+  JsonObjectMapperWriter<T> writer;
+  Compressor compressor;
+  
+  @Override
+  public void init(Path path, Configuration conf) throws IOException {
+    FileSystem fs = path.getFileSystem(conf);
+    CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(path);
+    OutputStream output;
+    if (codec != null) {
+      compressor = CodecPool.getCompressor(codec);
+      output = codec.createOutputStream(fs.create(path), compressor);
+    } else {
+      output = fs.create(path);
+    }
+    writer = new JsonObjectMapperWriter<T>(output, 
+        conf.getBoolean("rumen.output.pretty.print", true));
+  }
+
+  @Override
+  public void output(T object) throws IOException {
+    writer.write(object);
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      writer.close();
+    } finally {
+      if (compressor != null) {
+        CodecPool.returnCompressor(compressor);
+      }
+    }
+  }
+}
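
DefaultOutputter serializes each object as JSON and picks a compression codec from the output path's suffix; pretty-printing is controlled by the "rumen.output.pretty.print" property read in init(). A small writing sketch under those points, assuming a list of LoggedJob objects obtained elsewhere:

    package org.apache.hadoop.tools.rumen;

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    class WriteTrace {
      static void write(List<LoggedJob> jobs, Path out, Configuration conf)
          throws IOException {
        // A ".gz" suffix makes CompressionCodecFactory select the gzip codec.
        conf.setBoolean("rumen.output.pretty.print", false);  // init() defaults this to true
        DefaultOutputter<LoggedJob> outputter = new DefaultOutputter<LoggedJob>();
        outputter.init(out, conf);
        try {
          for (LoggedJob job : jobs) {
            outputter.output(job);  // each object becomes one JSON record
          }
        } finally {
          outputter.close();        // also returns the compressor to the pool
        }
      }
    }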

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/DeskewedJobTraceReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/DeskewedJobTraceReader.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/DeskewedJobTraceReader.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/DeskewedJobTraceReader.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class DeskewedJobTraceReader implements Closeable {
+  // underlying engine
+  private final JobTraceReader reader;
+
+  // configuration variables
+  private final int skewBufferLength;
+
+  private final boolean abortOnUnfixableSkew;
+
+  // state variables
+  private long skewMeasurementLatestSubmitTime = Long.MIN_VALUE;
+
+  private long returnedLatestSubmitTime = Long.MIN_VALUE;
+
+  private int maxSkewBufferNeeded = 0;
+
+  // a submit time will NOT be in countedRepeatedSubmitTimesSoFar if
+  // it only occurs once. This situation is represented by having the
+  // time in submitTimesSoFar only. A submit time that occurs twice or more
+  // appears in countedRepeatedSubmitTimesSoFar [with the appropriate range
+  // value] AND submitTimesSoFar
+  private TreeMap<Long, Integer> countedRepeatedSubmitTimesSoFar =
+      new TreeMap<Long, Integer>();
+  private TreeSet<Long> submitTimesSoFar = new TreeSet<Long>();
+
+  private final PriorityQueue<LoggedJob> skewBuffer;
+
+  static final private Log LOG =
+      LogFactory.getLog(DeskewedJobTraceReader.class);
+
+  static private class JobComparator implements Comparator<LoggedJob> {
+    @Override
+    public int compare(LoggedJob j1, LoggedJob j2) {
+      return (j1.getSubmitTime() < j2.getSubmitTime()) ? -1 : (j1
+          .getSubmitTime() == j2.getSubmitTime()) ? 0 : 1;
+    }
+  }
+
+  /**
+   * Constructor.
+   * 
+   * @param reader
+   *          the {@link JobTraceReader} that's being protected
+   * @param skewBufferLength
+   *          the number of late jobs that can precede an out-of-order
+   *          earlier job
+   * @throws IOException
+   */
+  public DeskewedJobTraceReader(JobTraceReader reader, int skewBufferLength,
+      boolean abortOnUnfixableSkew) throws IOException {
+    this.reader = reader;
+
+    this.skewBufferLength = skewBufferLength;
+
+    this.abortOnUnfixableSkew = abortOnUnfixableSkew;
+
+    skewBuffer =
+        new PriorityQueue<LoggedJob>(skewBufferLength + 1, new JobComparator());
+
+    fillSkewBuffer();
+  }
+
+  public DeskewedJobTraceReader(JobTraceReader reader) throws IOException {
+    this(reader, 0, true);
+  }
+
+  private LoggedJob rawNextJob() throws IOException {
+    LoggedJob result = reader.getNext();
+
+    if ((!abortOnUnfixableSkew || skewBufferLength > 0) && result != null) {
+      long thisTime = result.getSubmitTime();
+
+      if (submitTimesSoFar.contains(thisTime)) {
+        Integer myCount = countedRepeatedSubmitTimesSoFar.get(thisTime);
+
+        countedRepeatedSubmitTimesSoFar.put(thisTime, myCount == null ? 2
+            : myCount + 1);
+      } else {
+        submitTimesSoFar.add(thisTime);
+      }
+
+      if (thisTime < skewMeasurementLatestSubmitTime) {
+        Iterator<Long> endCursor = submitTimesSoFar.descendingIterator();
+
+        int thisJobNeedsSkew = 0;
+
+        Long keyNeedingSkew;
+
+        while (endCursor.hasNext()
+            && (keyNeedingSkew = endCursor.next()) > thisTime) {
+          Integer keyNeedsSkewAmount =
+              countedRepeatedSubmitTimesSoFar.get(keyNeedingSkew);
+
+          thisJobNeedsSkew +=
+              keyNeedsSkewAmount == null ? 1 : keyNeedsSkewAmount;
+        }
+
+        maxSkewBufferNeeded = Math.max(maxSkewBufferNeeded, thisJobNeedsSkew);
+      }
+
+      skewMeasurementLatestSubmitTime =
+          Math.max(thisTime, skewMeasurementLatestSubmitTime);
+    }
+
+    return result;
+  }
+
+  static class OutOfOrderException extends RuntimeException {
+    static final long serialVersionUID = 1L;
+
+    public OutOfOrderException(String text) {
+      super(text);
+    }
+  }
+
+  LoggedJob nextJob() throws IOException, OutOfOrderException {
+    LoggedJob newJob = rawNextJob();
+
+    if (newJob != null) {
+      skewBuffer.add(newJob);
+    }
+
+    LoggedJob result = skewBuffer.poll();
+
+    while (result != null && result.getSubmitTime() < returnedLatestSubmitTime) {
+      LOG.error("The current job was submitted earlier than the previous one");
+      LOG.error("Its jobID is " + result.getJobID());
+      LOG.error("Its submit time is " + result.getSubmitTime()
+          + ", but the previous one was " + returnedLatestSubmitTime);
+
+      if (abortOnUnfixableSkew) {
+        throw new OutOfOrderException("Job submit time is "
+            + result.getSubmitTime() + ", but the previous one was "
+            + returnedLatestSubmitTime);
+      }
+
+      result = rawNextJob();
+    }
+
+    if (result != null) {
+      returnedLatestSubmitTime = result.getSubmitTime();
+    }
+
+    return result;
+  }
+
+  private void fillSkewBuffer() throws IOException {
+    for (int i = 0; i < skewBufferLength; ++i) {
+      LoggedJob newJob = rawNextJob();
+
+      if (newJob == null) {
+        return;
+      }
+
+      skewBuffer.add(newJob);
+    }
+  }
+
+  int neededSkewBufferSize() {
+    return maxSkewBufferNeeded;
+  }
+
+  @Override
+  public void close() throws IOException {
+    reader.close();
+  }
+
+}
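
A small reading sketch for the deskewed reader, under two assumptions: JobTraceReader (added elsewhere in this patch) can be constructed from a trace Path and a Configuration, and the code sits in the org.apache.hadoop.tools.rumen package, since nextJob() has package visibility. The buffer size and abort flag are illustrative:

    package org.apache.hadoop.tools.rumen;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    class ReadDeskewedTrace {
      static void read(Path trace, Configuration conf) throws IOException {
        // Assumption: JobTraceReader exposes a (Path, Configuration) constructor.
        JobTraceReader raw = new JobTraceReader(trace, conf);
        // Tolerate up to 100 out-of-order jobs; log rather than abort beyond that.
        DeskewedJobTraceReader deskewed = new DeskewedJobTraceReader(raw, 100, false);
        try {
          LoggedJob job;
          while ((job = deskewed.nextJob()) != null) {
            System.out.println(job.getJobID() + " submitted at " + job.getSubmitTime());
          }
          System.out.println("skew buffer actually needed: "
              + deskewed.neededSkewBufferSize());
        } finally {
          deskewed.close();  // closes the underlying JobTraceReader as well
        }
      }
    }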

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/EventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/EventType.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/EventType.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/EventType.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+@SuppressWarnings("all")
+public enum EventType { 
+  JOB_SUBMITTED, JOB_INITED, JOB_FINISHED, JOB_PRIORITY_CHANGED, JOB_STATUS_CHANGED, JOB_FAILED, JOB_KILLED, JOB_INFO_CHANGED, TASK_STARTED, TASK_FINISHED, TASK_FAILED, TASK_UPDATED, MAP_ATTEMPT_STARTED, MAP_ATTEMPT_FINISHED, MAP_ATTEMPT_FAILED, MAP_ATTEMPT_KILLED, REDUCE_ATTEMPT_STARTED, REDUCE_ATTEMPT_FINISHED, REDUCE_ATTEMPT_FAILED, REDUCE_ATTEMPT_KILLED, SETUP_ATTEMPT_STARTED, SETUP_ATTEMPT_FINISHED, SETUP_ATTEMPT_FAILED, SETUP_ATTEMPT_KILLED, CLEANUP_ATTEMPT_STARTED, CLEANUP_ATTEMPT_FINISHED, CLEANUP_ATTEMPT_FAILED, CLEANUP_ATTEMPT_KILLED
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Hadoop20JHParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Hadoop20JHParser.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Hadoop20JHParser.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Hadoop20JHParser.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.LineReader;
+
+/**
+ * {@link JobHistoryParser} to parse job histories for hadoop 0.20 (META=1).
+ */
+public class Hadoop20JHParser implements JobHistoryParser {
+  final LineReader reader;
+
+  static final String endLineString = " .";
+  static final int internalVersion = 1;
+
+  /**
+   * Can this parser parse the input?
+   * 
+   * @param input
+   * @return Whether this parser can parse the input.
+   * @throws IOException
+   * 
+   *           We will deem a stream to be a good 0.20 job history stream if the
+   *           first line is exactly "Meta VERSION=\"1\" ."
+   */
+  public static boolean canParse(InputStream input) throws IOException {
+    try {
+      LineReader reader = new LineReader(input);
+
+      Text buffer = new Text();
+
+      return reader.readLine(buffer) != 0
+          && buffer.toString().equals("Meta VERSION=\"1\" .");
+    } catch (EOFException e) {
+      return false;
+    }
+  }
+
+  public Hadoop20JHParser(InputStream input) throws IOException {
+    super();
+
+    reader = new LineReader(input);
+  }
+
+  Map<String, HistoryEventEmitter> liveEmitters =
+      new HashMap<String, HistoryEventEmitter>();
+  Queue<HistoryEvent> remainingEvents = new LinkedList<HistoryEvent>();
+
+  enum LineType {
+    JOB("Job", "JOBID") {
+      HistoryEventEmitter createEmitter() {
+        return new Job20LineHistoryEventEmitter();
+      }
+    },
+
+    TASK("Task", "TASKID") {
+      HistoryEventEmitter createEmitter() {
+        return new Task20LineHistoryEventEmitter();
+      }
+    },
+
+    MAP_ATTEMPT("MapAttempt", "TASK_ATTEMPT_ID") {
+      HistoryEventEmitter createEmitter() {
+        return new MapAttempt20LineHistoryEventEmitter();
+      }
+    },
+
+    REDUCE_ATTEMPT("ReduceAttempt", "TASK_ATTEMPT_ID") {
+      HistoryEventEmitter createEmitter() {
+        return new ReduceAttempt20LineHistoryEventEmitter();
+      }
+    };
+
+    private LogRecordType type;
+    private String name;
+
+    LineType(String s, String name) {
+      type = LogRecordType.intern(s);
+      this.name = name;
+    }
+
+    LogRecordType recordType() {
+      return type;
+    }
+
+    String getName(ParsedLine line) {
+      return line.get(name);
+    }
+
+    abstract HistoryEventEmitter createEmitter();
+
+    static LineType findLineType(LogRecordType lrt) {
+      for (LineType lt : LineType.values()) {
+        if (lt.type == lrt) {
+          return lt;
+        }
+      }
+
+      return null;
+    }
+  }
+
+  @Override
+  public HistoryEvent nextEvent() {
+    try {
+      while (remainingEvents.isEmpty()) {
+        ParsedLine line = new ParsedLine(getFullLine(), internalVersion);
+        LineType type = LineType.findLineType(line.getType());
+        if (type == null) {
+          continue;
+        }
+        String name = type.getName(line);
+        HistoryEventEmitter emitter = findOrMakeEmitter(name, type);
+        Pair<Queue<HistoryEvent>, HistoryEventEmitter.PostEmitAction> pair =
+            emitter.emitterCore(line, name);
+        if (pair.second() == HistoryEventEmitter.PostEmitAction.REMOVE_HEE) {
+          liveEmitters.remove(name);
+        }
+        remainingEvents = pair.first();
+      }
+      return remainingEvents.poll();
+    } catch (EOFException e) {
+      return null;
+    } catch (IOException e) {
+      return null;
+    }
+  }
+
+  HistoryEventEmitter findOrMakeEmitter(String name, LineType type) {
+    HistoryEventEmitter result = liveEmitters.get(name);
+    if (result == null) {
+      result = type.createEmitter();
+      liveEmitters.put(name, result);
+    }
+    return result;
+  }
+
+  private String getOneLine() throws IOException {
+    Text resultText = new Text();
+
+    if (reader.readLine(resultText) == 0) {
+      throw new EOFException("apparent bad line");
+    }
+
+    return resultText.toString();
+  }
+
+  private String getFullLine() throws IOException {
+    String line = getOneLine();
+
+    while (line.length() < endLineString.length()) {
+      line = getOneLine();
+    }
+
+    if (line.endsWith(endLineString)) {
+      return line;
+    }
+
+    StringBuilder sb = new StringBuilder(line);
+
+    String addedLine;
+
+    do {
+      addedLine = getOneLine();
+
+      if (addedLine == null) {
+        return sb.toString();
+      }
+
+      sb.append("\n");
+      sb.append(addedLine);
+    } while (addedLine.length() < endLineString.length()
+        || !endLineString.equals(addedLine.substring(addedLine.length()
+            - endLineString.length())));
+
+    return sb.toString();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (reader != null) {
+      reader.close();
+    }
+  }
+
+}
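
Note that canParse() consumes the first line of the stream it probes, so a caller should probe and parse on separate streams. The HistoryEvent accessors are not shown in this patch, so the sketch below only prints each event's class name; it assumes the org.apache.hadoop.tools.rumen package and an uncompressed history file path supplied by the caller:

    package org.apache.hadoop.tools.rumen;

    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class ParseV20History {
      static void parse(Path historyFile, Configuration conf) throws IOException {
        FileSystem fs = historyFile.getFileSystem(conf);

        // Probe on a throwaway stream, since canParse() reads the first line.
        InputStream probe = fs.open(historyFile);
        boolean ok;
        try {
          ok = Hadoop20JHParser.canParse(probe);
        } finally {
          probe.close();
        }
        if (!ok) {
          return;
        }

        Hadoop20JHParser parser = new Hadoop20JHParser(fs.open(historyFile));
        try {
          HistoryEvent event;
          while ((event = parser.nextEvent()) != null) {  // null marks end of history
            System.out.println(event.getClass().getSimpleName());
          }
        } finally {
          parser.close();
        }
      }
    }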

Modified: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java?rev=1077515&r1=1077514&r2=1077515&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java Fri Mar  4 04:22:59 2011
@@ -70,6 +70,7 @@ import org.codehaus.jackson.map.Serializ
  * about it. See {@code usage()}, below.
  * 
  */
+@Deprecated
 public class HadoopLogsAnalyzer extends Configured implements Tool {
 
   // output streams
@@ -103,14 +104,15 @@ public class HadoopLogsAnalyzer extends 
   /**
    * The regular expression used to parse task attempt IDs in job tracker logs
    */
-  private final static Pattern taskAttemptIDPattern = Pattern
-      .compile(".*_([0-9]+)");
+  private final static Pattern taskAttemptIDPattern =
+      Pattern.compile(".*_([0-9]+)");
 
   private final static Pattern xmlFilePrefix = Pattern.compile("[ \t]*<");
 
   private final static Pattern confFileHeader = Pattern.compile("_conf.xml!!");
 
-  private final Map<String, Pattern> counterPatterns = new HashMap<String, Pattern>();
+  private final Map<String, Pattern> counterPatterns =
+      new HashMap<String, Pattern>();
 
   /**
    * The unpaired job config file. Currently only used to glean the {@code -Xmx}
@@ -188,14 +190,14 @@ public class HadoopLogsAnalyzer extends 
   private boolean collectTaskTimes = false;
 
   private LogRecordType canonicalJob = LogRecordType.intern("Job");
-  private LogRecordType canonicalMapAttempt = LogRecordType
-      .intern("MapAttempt");
-  private LogRecordType canonicalReduceAttempt = LogRecordType
-      .intern("ReduceAttempt");
+  private LogRecordType canonicalMapAttempt =
+      LogRecordType.intern("MapAttempt");
+  private LogRecordType canonicalReduceAttempt =
+      LogRecordType.intern("ReduceAttempt");
   private LogRecordType canonicalTask = LogRecordType.intern("Task");
 
-  private static Pattern streamingJobnamePattern = Pattern
-      .compile("streamjob\\d+.jar");
+  private static Pattern streamingJobnamePattern =
+      Pattern.compile("streamjob\\d+.jar");
 
   private HashSet<String> hostNames = new HashSet<String>();
 
@@ -250,8 +252,8 @@ public class HadoopLogsAnalyzer extends 
       result[i] = new Histogram[LoggedJob.JobType.values().length];
 
       for (int j = 0; j < LoggedJob.JobType.values().length; ++j) {
-        result[i][j] = blockname == null ? new Histogram() : new Histogram(
-            blockname);
+        result[i][j] =
+            blockname == null ? new Histogram() : new Histogram(blockname);
       }
     }
 
@@ -505,8 +507,9 @@ public class HadoopLogsAnalyzer extends 
           SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
       JsonFactory jfactory = jmapper.getJsonFactory();
       FileSystem jobFS = jobTraceFilename.getFileSystem(getConf());
-      jobTraceGen = jfactory.createJsonGenerator(
-          jobFS.create(jobTraceFilename), JsonEncoding.UTF8);
+      jobTraceGen =
+          jfactory.createJsonGenerator(jobFS.create(jobTraceFilename),
+              JsonEncoding.UTF8);
       if (prettyprintTrace) {
         jobTraceGen.useDefaultPrettyPrinter();
       }
@@ -517,8 +520,9 @@ public class HadoopLogsAnalyzer extends 
             SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
         JsonFactory tfactory = tmapper.getJsonFactory();
         FileSystem topoFS = topologyFilename.getFileSystem(getConf());
-        topologyGen = tfactory.createJsonGenerator(
-            topoFS.create(topologyFilename), JsonEncoding.UTF8);
+        topologyGen =
+            tfactory.createJsonGenerator(topoFS.create(topologyFilename),
+                JsonEncoding.UTF8);
         topologyGen.useDefaultPrettyPrinter();
       }
     }
@@ -546,7 +550,7 @@ public class HadoopLogsAnalyzer extends 
       IOException {
     if (input != null) {
       input.close();
-      LOG.info("File closed: "+currentFileName);
+      LOG.info("File closed: " + currentFileName);
       input = null;
     }
 
@@ -573,7 +577,8 @@ public class HadoopLogsAnalyzer extends 
             + inputDirectoryFiles.length + ", starts with line " + lineNumber
             + ".");
 
-    input = maybeUncompressedPath(new Path(inputDirectoryPath, currentFileName));
+    input =
+        maybeUncompressedPath(new Path(inputDirectoryPath, currentFileName));
 
     return input != null;
   }
@@ -734,8 +739,9 @@ public class HadoopLogsAnalyzer extends 
 
     long[] endpointKeys = taskTimes.getCDF(1000, endpoints);
 
-    int smallResultOffset = (taskTimes.getTotalCount() < SMALL_SPREAD_COMPENSATION_THRESHOLD ? 1
-        : 0);
+    int smallResultOffset =
+        (taskTimes.getTotalCount() < SMALL_SPREAD_COMPENSATION_THRESHOLD ? 1
+            : 0);
 
     Histogram myTotal = spreadTo[outcome.ordinal()][jtype.ordinal()];
 
@@ -803,14 +809,15 @@ public class HadoopLogsAnalyzer extends 
           attemptsInCurrentJob = new HashMap<String, LoggedTaskAttempt>();
 
           // initialize all the per-job statistics gathering places
-          successfulMapAttemptTimes = new Histogram[ParsedHost
-              .numberOfDistances() + 1];
+          successfulMapAttemptTimes =
+              new Histogram[ParsedHost.numberOfDistances() + 1];
           for (int i = 0; i < successfulMapAttemptTimes.length; ++i) {
             successfulMapAttemptTimes[i] = new Histogram();
           }
 
           successfulReduceAttemptTimes = new Histogram();
-          failedMapAttemptTimes = new Histogram[ParsedHost.numberOfDistances() + 1];
+          failedMapAttemptTimes =
+              new Histogram[ParsedHost.numberOfDistances() + 1];
           for (int i = 0; i < failedMapAttemptTimes.length; ++i) {
             failedMapAttemptTimes[i] = new Histogram();
           }
@@ -851,7 +858,8 @@ public class HadoopLogsAnalyzer extends 
           if (finishTime != null) {
             jobBeingTraced.setFinishTime(Long.parseLong(finishTime));
             if (status != null) {
-              jobBeingTraced.setOutcome(Pre21JobHistoryConstants.Values.valueOf(status));
+              jobBeingTraced.setOutcome(Pre21JobHistoryConstants.Values
+                  .valueOf(status));
             }
 
             maybeMateJobAndConf();
@@ -890,9 +898,9 @@ public class HadoopLogsAnalyzer extends 
           if (launchTimeCurrentJob != 0) {
             String jobResultText = line.get("JOB_STATUS");
 
-            JobOutcome thisOutcome = ((jobResultText != null && "SUCCESS"
-                .equals(jobResultText)) ? JobOutcome.SUCCESS
-                : JobOutcome.FAILURE);
+            JobOutcome thisOutcome =
+                ((jobResultText != null && "SUCCESS".equals(jobResultText))
+                    ? JobOutcome.SUCCESS : JobOutcome.FAILURE);
 
             if (submitTimeCurrentJob != 0L) {
               canonicalDistributionsEnter(delayTimeDists, thisOutcome,
@@ -911,8 +919,8 @@ public class HadoopLogsAnalyzer extends 
             Histogram currentJobSortTimes = new Histogram();
             Histogram currentJobReduceTimes = new Histogram();
 
-            Iterator<Map.Entry<String, Long>> taskIter = taskAttemptStartTimes
-                .entrySet().iterator();
+            Iterator<Map.Entry<String, Long>> taskIter =
+                taskAttemptStartTimes.entrySet().iterator();
 
             while (taskIter.hasNext()) {
               Map.Entry<String, Long> entry = taskIter.next();
@@ -930,8 +938,8 @@ public class HadoopLogsAnalyzer extends 
               }
 
               // Reduce processing
-              Long shuffleEnd = taskReduceAttemptShuffleEndTimes.get(entry
-                  .getKey());
+              Long shuffleEnd =
+                  taskReduceAttemptShuffleEndTimes.get(entry.getKey());
               Long sortEnd = taskReduceAttemptSortEndTimes.get(entry.getKey());
               Long reduceEnd = taskReduceAttemptFinishTimes.get(entry.getKey());
 
@@ -1027,7 +1035,9 @@ public class HadoopLogsAnalyzer extends 
       Pre21JobHistoryConstants.Values stat;
 
       try {
-        stat = status == null ? null : Pre21JobHistoryConstants.Values.valueOf(status);
+        stat =
+            status == null ? null : Pre21JobHistoryConstants.Values
+                .valueOf(status);
       } catch (IllegalArgumentException e) {
         LOG.error("A task status you don't know about is \"" + status + "\".",
             e);
@@ -1037,22 +1047,26 @@ public class HadoopLogsAnalyzer extends 
       task.setTaskStatus(stat);
 
       try {
-        typ = taskType == null ? null : Pre21JobHistoryConstants.Values.valueOf(taskType);
+        typ =
+            taskType == null ? null : Pre21JobHistoryConstants.Values
+                .valueOf(taskType);
       } catch (IllegalArgumentException e) {
         LOG.error("A task type you don't know about is \"" + taskType + "\".",
             e);
         typ = null;
       }
-      
+
       if (typ == null) {
         return;
       }
 
       task.setTaskType(typ);
 
-      List<LoggedTask> vec = typ == Pre21JobHistoryConstants.Values.MAP ? jobBeingTraced
-          .getMapTasks() : typ == Pre21JobHistoryConstants.Values.REDUCE ? jobBeingTraced
-          .getReduceTasks() : jobBeingTraced.getOtherTasks();
+      List<LoggedTask> vec =
+          typ == Pre21JobHistoryConstants.Values.MAP ? jobBeingTraced
+              .getMapTasks() : typ == Pre21JobHistoryConstants.Values.REDUCE
+              ? jobBeingTraced.getReduceTasks() : jobBeingTraced
+                  .getOtherTasks();
 
       if (!taskAlreadyLogged) {
         vec.add(task);
@@ -1066,8 +1080,8 @@ public class HadoopLogsAnalyzer extends 
     Pattern result = counterPatterns.get(counterName);
 
     if (result == null) {
-      String namePatternRegex = "\\[\\(" + counterName
-          + "\\)\\([^)]+\\)\\(([0-9]+)\\)\\]";
+      String namePatternRegex =
+          "\\[\\(" + counterName + "\\)\\([^)]+\\)\\(([0-9]+)\\)\\]";
       result = Pattern.compile(namePatternRegex);
       counterPatterns.put(counterName, result);
     }
@@ -1253,7 +1267,9 @@ public class HadoopLogsAnalyzer extends 
       Pre21JobHistoryConstants.Values stat = null;
 
       try {
-        stat = status == null ? null : Pre21JobHistoryConstants.Values.valueOf(status);
+        stat =
+            status == null ? null : Pre21JobHistoryConstants.Values
+                .valueOf(status);
       } catch (IllegalArgumentException e) {
         LOG.error("A map attempt status you don't know about is \"" + status
             + "\".", e);
@@ -1404,7 +1420,9 @@ public class HadoopLogsAnalyzer extends 
       Pre21JobHistoryConstants.Values stat = null;
 
       try {
-        stat = status == null ? null : Pre21JobHistoryConstants.Values.valueOf(status);
+        stat =
+            status == null ? null : Pre21JobHistoryConstants.Values
+                .valueOf(status);
       } catch (IllegalArgumentException e) {
         LOG.warn("A map attempt status you don't know about is \"" + status
             + "\".", e);
@@ -1632,8 +1650,8 @@ public class HadoopLogsAnalyzer extends 
         }
 
         for (Map.Entry<Long, Long> ent : successfulNthMapperAttempts) {
-          successAfterI[ent.getKey().intValue()] = ((double) ent.getValue())
-              / totalSuccessfulAttempts;
+          successAfterI[ent.getKey().intValue()] =
+              ((double) ent.getValue()) / totalSuccessfulAttempts;
         }
         jobBeingTraced.setMapperTriesToSucceed(successAfterI);
       } else {
@@ -1712,8 +1730,9 @@ public class HadoopLogsAnalyzer extends 
       }
 
       if (spreading) {
-        String ratioDescription = "(" + spreadMax + "/1000 %ile) to ("
-            + spreadMin + "/1000 %ile) scaled by 1000000";
+        String ratioDescription =
+            "(" + spreadMax + "/1000 %ile) to (" + spreadMin
+                + "/1000 %ile) scaled by 1000000";
 
         printDistributionSet(
             "Map task success times " + ratioDescription + ":",
@@ -1737,8 +1756,8 @@ public class HadoopLogsAnalyzer extends 
     }
 
     if (topologyGen != null) {
-      LoggedNetworkTopology topo = new LoggedNetworkTopology(allHosts,
-          "<root>", 0);
+      LoggedNetworkTopology topo =
+          new LoggedNetworkTopology(allHosts, "<root>", 0);
       topologyGen.writeObject(topo);
       topologyGen.close();
     }