Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:23:01 UTC
svn commit: r1077515 [1/4] - in
/hadoop/common/branches/branch-0.20-security-patches/src:
mapred/org/apache/hadoop/mapreduce/ test/org/apache/hadoop/tools/rumen/
test/tools/data/rumen/small-trace-test/
test/tools/data/rumen/small-trace-test/counters-fo...
Author: omalley
Date: Fri Mar 4 04:22:59 2011
New Revision: 1077515
URL: http://svn.apache.org/viewvc?rev=1077515&view=rev
Log:
commit bcb73ee3319252d3650eecc249551de89348dbc9
Author: Hong Tang <ht...@yahoo-inc.com>
Date: Fri Jun 25 14:21:29 2010 -0700
MAPREDUCE-1309. Rumen refactoring. From https://issues.apache.org/jira/secure/attachment/12447992/rumen-yhadoop-20.patch. (htang)
+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-1309. Rumen refactoring. (htang)
+
Added:
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/ConcatenatedInputFilesDemuxer.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestRandomSeedGenerator.java
hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-format-test-logs/
hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-format-test-logs/megacluster.megacorp.com_1265616107882_job_201002080801_40864_conf.xml
hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-format-test-logs/megacluster.megacorp.com_1265616107882_job_201002080801_40864_job_name-DAILY%2F20100210%5D.gz (with props)
hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-format-test-logs/megacluster.megacorp.com_1265616107882_job_201002080801_50510_conf.xml
hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-format-test-logs/megacluster.megacorp.com_1265616107882_job_201002080801_50510_job_name-DAILY%2F20100208%5D.gz (with props)
hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-test-trace.json.gz (with props)
hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/dispatch-sample-v20-jt-log.gz (with props)
hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/dispatch-topology-output.json.gz (with props)
hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/dispatch-trace-output.json.gz (with props)
hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/folder-input-trace.json.gz (with props)
hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/goldFoldedTrace.json.gz (with props)
hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/sample-conf.file.xml
hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/v20-single-input-log-event-classes.text.gz (with props)
hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/v20-single-input-log.gz (with props)
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/DefaultInputDemuxer.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/DefaultOutputter.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/DeskewedJobTraceReader.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/EventType.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Hadoop20JHParser.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/HistoryEvent.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/HistoryEventEmitter.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/InputDemuxer.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JhCounter.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JhCounterGroup.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JhCounters.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobConfPropertyNames.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobConfigurationParser.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobFinishedEvent.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParser.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParserFactory.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobInfoChangeEvent.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobInitedEvent.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobPriorityChangeEvent.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobStatusChangedEvent.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobSubmittedEvent.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobUnsuccessfulCompletionEvent.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperWriter.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/MapAttempt20LineHistoryEventEmitter.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/MapAttemptFinishedEvent.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Outputter.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/PossiblyDecompressedInputStream.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/RandomSeedGenerator.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ReduceAttempt20LineHistoryEventEmitter.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ReduceAttemptFinishedEvent.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/RewindableInputStream.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/SingleEventEmitter.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Task20LineHistoryEventEmitter.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptFinishedEvent.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptStartedEvent.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptUnsuccessfulCompletionEvent.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskFailedEvent.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskFinishedEvent.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskStartedEvent.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskUpdatedEvent.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TopologyBuilder.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Version20LogInterfaceUtils.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/Counters.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/AbstractClusterStory.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ClusterStory.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieCluster.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieJobProducer.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/Counters.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/Counters.java?rev=1077515&r1=1077514&r2=1077515&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/Counters.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/Counters.java Fri Mar 4 04:22:59 2011
@@ -25,7 +25,7 @@ public class Counters implements Writabl
public Counters() {
}
- Counters(org.apache.hadoop.mapred.Counters counters) {
+ public Counters(org.apache.hadoop.mapred.Counters counters) {
for(org.apache.hadoop.mapred.Counters.Group group: counters) {
String name = group.getName();
CounterGroup newGroup = new CounterGroup(name, group.getDisplayName());
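For illustration, the widened visibility makes the old-to-new counter conversion usable from outside the package, which is what the Rumen code below relies on. A minimal sketch of the conversion this enables (the counter names are hypothetical, not from this patch):

    // Old-API counters, e.g. as reconstructed from a pre-0.21 job history.
    org.apache.hadoop.mapred.Counters oldCounters =
        new org.apache.hadoop.mapred.Counters();
    oldCounters.incrCounter("FileSystemCounters", "HDFS_BYTES_READ", 1024L);

    // Previously package-private; now any caller can convert:
    org.apache.hadoop.mapreduce.Counters newCounters =
        new org.apache.hadoop.mapreduce.Counters(oldCounters);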
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/ConcatenatedInputFilesDemuxer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/ConcatenatedInputFilesDemuxer.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/ConcatenatedInputFilesDemuxer.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/ConcatenatedInputFilesDemuxer.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+public class ConcatenatedInputFilesDemuxer implements InputDemuxer {
+ private String name;
+ private DelimitedInputStream input;
+
+ private String knownNextFileName = null;
+
+ static private final int MAXIMUM_HEADER_LINE_LENGTH = 500;
+
+ @Override
+ public void bindTo(Path path, Configuration conf) throws IOException {
+ InputStream underlyingInput = null;
+
+ if (name != null) { // re-binding before the previous one was consumed.
+ close();
+ }
+ name = path.getName();
+
+ underlyingInput = new PossiblyDecompressedInputStream(path, conf);
+
+ input =
+ new DelimitedInputStream(new BufferedInputStream(underlyingInput),
+ "\f!!FILE=", "!!\n");
+
+ knownNextFileName = input.nextFileName();
+
+ if (knownNextFileName == null) {
+ close();
+
+ return;
+ }
+
+ /*
+ * We handle files in specialized formats by trying their demuxers first,
+ * not by failing here.
+ */
+ return;
+ }
+
+ @Override
+ public Pair<String, InputStream> getNext() throws IOException {
+ if (knownNextFileName != null) {
+ Pair<String, InputStream> result =
+ new Pair<String, InputStream>(knownNextFileName, input);
+
+ knownNextFileName = null;
+
+ return result;
+ }
+
+ String nextFileName = input.nextFileName();
+
+ if (nextFileName == null) {
+ return null;
+ }
+
+ return new Pair<String, InputStream>(nextFileName, input);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (input != null) {
+ input.close();
+ }
+ }
+
+ /**
+ * A simple wrapper class to make any input stream delimited. It has an extra
+ * method, nextFileName.
+ *
+ * The input stream should have lines that look like
+ * <marker><filename><endmarker>. The text <marker> should not occur
+ * elsewhere in the file. The text <endmarker> should not occur in a file
+ * name.
+ */
+ static class DelimitedInputStream extends InputStream {
+ private InputStream input;
+
+ private boolean endSeen = false;
+
+ private final String fileMarker;
+
+ private final byte[] markerBytes;
+
+ private final byte[] fileMarkerBuffer;
+
+ private final String fileEndMarker;
+
+ private final byte[] endMarkerBytes;
+
+ private final byte[] fileEndMarkerBuffer;
+
+ /**
+ * Constructor.
+ *
+ * @param input
+ */
+ public DelimitedInputStream(InputStream input, String fileMarker,
+ String fileEndMarker) {
+ this.input = new BufferedInputStream(input, 10000);
+ this.input.mark(10000);
+ this.fileMarker = fileMarker;
+ this.markerBytes = this.fileMarker.getBytes();
+ this.fileMarkerBuffer = new byte[this.markerBytes.length];
+ this.fileEndMarker = fileEndMarker;
+ this.endMarkerBytes = this.fileEndMarker.getBytes();
+ this.fileEndMarkerBuffer = new byte[this.endMarkerBytes.length];
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (endSeen) {
+ return -1;
+ }
+
+ input.mark(10000);
+
+ int result = input.read();
+
+ if (result < 0) {
+ endSeen = true;
+ return result;
+ }
+
+ if (result == markerBytes[0]) {
+ input.reset();
+
+ // this might be a marker line
+ int markerReadResult =
+ input.read(fileMarkerBuffer, 0, fileMarkerBuffer.length);
+
+ input.reset();
+
+ if (markerReadResult < fileMarkerBuffer.length
+ || !fileMarker.equals(new String(fileMarkerBuffer))) {
+ return input.read();
+ }
+
+ return -1;
+ }
+
+ return result;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.io.InputStream#read(byte[], int, int)
+ *
+ * This does SLIGHTLY THE WRONG THING.
+ *
+ * If we run off the end of the segment then the input buffer will be
+ * dirtied beyond the point where we claim to have read. If this turns out
+ * to be a problem, save that data somewhere and restore it if needed.
+ */
+ @Override
+ public int read(byte[] buffer, int offset, int length) throws IOException {
+ if (endSeen) {
+ return -1;
+ }
+
+ input.mark(length + markerBytes.length + 10);
+
+ int dataSeen = input.read(buffer, offset, length);
+
+ byte[] extraReadBuffer = null;
+ int extraActualRead = -1;
+
+ // search for an instance of a file marker
+ for (int i = offset; i < offset + dataSeen; ++i) {
+ if (buffer[i] == markerBytes[0]) {
+ boolean mismatch = false;
+
+ for (int j = 1; j < Math.min(markerBytes.length, offset + dataSeen
+ - i); ++j) {
+ if (buffer[i + j] != markerBytes[j]) {
+ mismatch = true;
+ break;
+ }
+ }
+
+ if (!mismatch) {
+ // see if we have only a prefix of the markerBytes
+ int uncheckedMarkerCharCount =
+ markerBytes.length - (offset + dataSeen - i);
+
+ if (uncheckedMarkerCharCount > 0) {
+ if (extraReadBuffer == null) {
+ extraReadBuffer = new byte[markerBytes.length - 1];
+
+ extraActualRead = input.read(extraReadBuffer);
+ }
+
+ if (extraActualRead < uncheckedMarkerCharCount) {
+ input.reset();
+ return input.read(buffer, offset, length);
+ }
+
+ for (int j = 0; j < uncheckedMarkerCharCount; ++j) {
+ if (extraReadBuffer[j] != markerBytes[markerBytes.length
+ - uncheckedMarkerCharCount + j]) {
+ input.reset();
+ return input.read(buffer, offset, length);
+ }
+ }
+ }
+
+ input.reset();
+
+ if (i == offset) {
+ return -1;
+ }
+
+ int result = input.read(buffer, offset, i - offset);
+ return result;
+ }
+ }
+ }
+
+ return dataSeen;
+ }
+
+ @Override
+ public int read(byte[] buffer) throws IOException {
+ return read(buffer, 0, buffer.length);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (endSeen) {
+ input.close();
+ }
+ }
+
+ String nextFileName() throws IOException {
+ return nextFileName(MAXIMUM_HEADER_LINE_LENGTH);
+ }
+
+ private String nextFileName(int bufferSize) throws IOException {
+ // the line can't contain a newline and must contain a form feed
+ byte[] buffer = new byte[bufferSize];
+
+ input.mark(bufferSize + 1);
+
+ int actualRead = input.read(buffer);
+ int mostRecentRead = actualRead;
+
+ while (actualRead < bufferSize && mostRecentRead > 0) {
+ mostRecentRead =
+ input.read(buffer, actualRead, bufferSize - actualRead);
+
+ if (mostRecentRead > 0) {
+ actualRead += mostRecentRead;
+ }
+ }
+
+ if (actualRead < markerBytes.length) {
+ input.reset();
+ return null;
+ }
+
+ for (int i = 0; i < markerBytes.length; ++i) {
+ if (markerBytes[i] != buffer[i]) {
+ input.reset();
+ return null;
+ }
+ }
+
+ for (int i = markerBytes.length; i < actualRead; ++i) {
+ if (buffer[i] == endMarkerBytes[0]) {
+ // gather the file name
+ input.reset();
+ // burn the marker
+ if (input.read(buffer, 0, markerBytes.length) < markerBytes.length) {
+ throw new IOException("Can't reread bytes I've read before.");
+ }
+ // get the file name
+ if (input.read(buffer, 0, i - markerBytes.length) < i
+ - markerBytes.length) {
+ throw new IOException("Can't reread bytes I've read before.");
+ }
+ // burn the two exclamation points and the newline
+ if (input.read(fileEndMarkerBuffer) < fileEndMarkerBuffer.length) {
+ input.reset();
+ return null;
+ }
+ for (int j = 0; j < endMarkerBytes.length; ++j) {
+ if (endMarkerBytes[j] != fileEndMarkerBuffer[j]) {
+ input.reset();
+ return null;
+ }
+ }
+
+ return new String(buffer, 0, i - markerBytes.length);
+ }
+
+ if (buffer[i] == '\n') {
+ return null;
+ }
+ }
+
+ // we ran off the end. Was the buffer too short, or is this all there was?
+ input.reset();
+
+ if (actualRead < bufferSize) {
+ return null;
+ }
+
+ return nextFileName(bufferSize * 2);
+ }
+ }
+
+}
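To make the header convention concrete: with the markers bound above, each embedded file begins with a line of the form \f!!FILE=<name>!!\n, and the demuxer returns one (name, stream) pair per header. A sketch of a caller, assuming a hypothetical local input path and the first()/second() accessors of Rumen's Pair class:

    Configuration conf = new Configuration();
    InputDemuxer demuxer = new ConcatenatedInputFilesDemuxer();
    demuxer.bindTo(new Path("concatenated-history-logs.gz"), conf);

    Pair<String, InputStream> segment;
    while ((segment = demuxer.getNext()) != null) {
      // segment.first() is the name from the !!FILE= header; the stream in
      // segment.second() yields bytes only up to the next header line.
      System.out.println("embedded file: " + segment.first());
    }
    demuxer.close();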
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestRandomSeedGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestRandomSeedGenerator.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestRandomSeedGenerator.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestRandomSeedGenerator.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+import static org.apache.hadoop.tools.rumen.RandomSeedGenerator.getSeed;
+
+public class TestRandomSeedGenerator {
+ @Test
+ public void testSeedGeneration() {
+ long masterSeed1 = 42;
+ long masterSeed2 = 43;
+
+ assertTrue("Deterministic seeding",
+ getSeed("stream1", masterSeed1) == getSeed("stream1", masterSeed1));
+ assertTrue("Deterministic seeding",
+ getSeed("stream2", masterSeed2) == getSeed("stream2", masterSeed2));
+ assertTrue("Different streams",
+ getSeed("stream1", masterSeed1) != getSeed("stream2", masterSeed1));
+ assertTrue("Different master seeds",
+ getSeed("stream1", masterSeed1) != getSeed("stream1", masterSeed2));
+ }
+}
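The assertions above pin down only the contract: the same (stream, master seed) pair always yields the same seed, while different streams or different master seeds yield different ones. One way to meet that contract (an illustrative sketch, not necessarily RandomSeedGenerator's actual algorithm) is to hash the stream name together with the master seed:

    import java.security.MessageDigest;

    public class SeedSketch {
      /** Deterministic 64-bit seed derived from a stream name and a master seed. */
      public static long getSeed(String streamId, long masterSeed) {
        try {
          MessageDigest md5 = MessageDigest.getInstance("MD5");
          md5.update((streamId + "/" + masterSeed).getBytes("UTF-8"));
          byte[] digest = md5.digest();
          long seed = 0;
          for (int i = 0; i < 8; ++i) {
            seed = (seed << 8) | (digest[i] & 0xFF); // fold first 8 digest bytes
          }
          return seed;
        } catch (Exception e) {
          throw new RuntimeException(e); // MD5 and UTF-8 are always available
        }
      }
    }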
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java?rev=1077515&r1=1077514&r2=1077515&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java Fri Mar 4 04:22:59 2011
@@ -18,13 +18,29 @@
package org.apache.hadoop.tools.rumen;
+import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.util.LineReader;
+import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
@@ -49,11 +65,11 @@ public class TestRumenJobTraces {
final FileSystem lfs = FileSystem.getLocal(conf);
final Path rootInputDir =
- new Path(System.getProperty("test.tools.input.dir", ""))
- .makeQualified(lfs);
+ new Path(System.getProperty("test.tools.input.dir", "")).makeQualified(
+ lfs);
final Path rootTempDir =
- new Path(System.getProperty("test.build.data", "/tmp"))
- .makeQualified(lfs);
+ new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+ lfs);
final Path rootInputFile = new Path(rootInputDir, "rumen/small-trace-test");
final Path tempDir = new Path(rootTempDir, "TestRumenJobTraces");
@@ -82,25 +98,355 @@ public class TestRumenJobTraces {
final Path topologyGoldFile = new Path(rootInputFile, goldTopology);
final Path traceGoldFile = new Path(rootInputFile, goldTrace);
+ @SuppressWarnings("deprecation")
HadoopLogsAnalyzer analyzer = new HadoopLogsAnalyzer();
int result = ToolRunner.run(analyzer, args);
assertEquals("Non-zero exit", 0, result);
TestRumenJobTraces
- .<LoggedNetworkTopology> jsonFileMatchesGold(lfs, topologyFile,
+ .<LoggedNetworkTopology> jsonFileMatchesGold(conf, topologyFile,
+ topologyGoldFile, LoggedNetworkTopology.class, "topology");
+ TestRumenJobTraces.<LoggedJob> jsonFileMatchesGold(conf, traceFile,
+ traceGoldFile, LoggedJob.class, "trace");
+ }
+
+ @Test
+ public void testRumenViaDispatch() throws Exception {
+ final Configuration conf = new Configuration();
+ final FileSystem lfs = FileSystem.getLocal(conf);
+
+ final Path rootInputDir =
+ new Path(System.getProperty("test.tools.input.dir", "")).makeQualified(
+ lfs);
+ final Path rootTempDir =
+ new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+ lfs);
+
+ final Path rootInputPath = new Path(rootInputDir, "rumen/small-trace-test");
+ final Path tempDir = new Path(rootTempDir, "TestRumenViaDispatch");
+ lfs.delete(tempDir, true);
+
+ final Path topologyPath = new Path(tempDir, "dispatch-topology.json");
+ final Path tracePath = new Path(tempDir, "dispatch-trace.json");
+
+ final Path inputPath =
+ new Path(rootInputPath, "dispatch-sample-v20-jt-log.gz");
+
+ System.out.println("topology result file = " + topologyPath);
+ System.out.println("testRumenViaDispatch() trace result file = " + tracePath);
+
+ String demuxerClassName = ConcatenatedInputFilesDemuxer.class.getName();
+
+ String[] args =
+ { "-demuxer", demuxerClassName, tracePath.toString(),
+ topologyPath.toString(), inputPath.toString() };
+
+ final Path topologyGoldFile =
+ new Path(rootInputPath, "dispatch-topology-output.json.gz");
+ final Path traceGoldFile =
+ new Path(rootInputPath, "dispatch-trace-output.json.gz");
+
+ Tool analyzer = new TraceBuilder();
+ int result = ToolRunner.run(analyzer, args);
+ assertEquals("Non-zero exit", 0, result);
+
+ TestRumenJobTraces
+ .<LoggedNetworkTopology> jsonFileMatchesGold(conf, topologyPath,
topologyGoldFile, LoggedNetworkTopology.class, "topology");
- TestRumenJobTraces.<LoggedJob> jsonFileMatchesGold(lfs, traceFile,
+ TestRumenJobTraces.<LoggedJob> jsonFileMatchesGold(conf, tracePath,
traceGoldFile, LoggedJob.class, "trace");
}
+ @Test
+ public void testBracketedCounters() throws Exception {
+ final Configuration conf = new Configuration();
+ final FileSystem lfs = FileSystem.getLocal(conf);
+
+ final Path rootInputDir =
+ new Path(System.getProperty("test.tools.input.dir", "")).makeQualified(
+ lfs);
+ final Path rootTempDir =
+ new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+ lfs);
+
+ final Path rootInputPath = new Path(rootInputDir, "rumen/small-trace-test");
+ final Path tempDir = new Path(rootTempDir, "TestBracketedCounters");
+ lfs.delete(tempDir, true);
+
+ final Path topologyPath = new Path(tempDir, "dispatch-topology.json");
+ final Path tracePath = new Path(tempDir, "dispatch-trace.json");
+
+ final Path inputPath = new Path(rootInputPath, "counters-format-test-logs");
+
+ System.out.println("topology result file = " + topologyPath);
+ System.out.println("testBracketedCounters() trace result file = " + tracePath);
+
+ final Path goldPath =
+ new Path(rootInputPath, "counters-test-trace.json.gz");
+
+ String[] args =
+ { tracePath.toString(), topologyPath.toString(), inputPath.toString() };
+
+ Tool analyzer = new TraceBuilder();
+ int result = ToolRunner.run(analyzer, args);
+ assertEquals("Non-zero exit", 0, result);
+
+ TestRumenJobTraces.<LoggedJob> jsonFileMatchesGold(conf, tracePath,
+ goldPath, LoggedJob.class, "trace");
+ }
+
+ @Test
+ public void testHadoop20JHParser() throws Exception {
+ final Configuration conf = new Configuration();
+ final FileSystem lfs = FileSystem.getLocal(conf);
+
+ boolean success = false;
+
+ final Path rootInputDir =
+ new Path(System.getProperty("test.tools.input.dir", "")).makeQualified(
+ lfs);
+ final Path rootTempDir =
+ new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+ lfs);
+
+ final Path rootInputPath = new Path(rootInputDir, "rumen/small-trace-test");
+ final Path tempDir = new Path(rootTempDir, "TestRumenViaDispatch");
+ lfs.delete(tempDir, true);
+
+ final Path inputPath = new Path(rootInputPath, "v20-single-input-log.gz");
+ final Path goldPath =
+ new Path(rootInputPath, "v20-single-input-log-event-classes.text.gz");
+
+ InputStream inputLogStream =
+ new PossiblyDecompressedInputStream(inputPath, conf);
+
+ InputStream inputGoldStream =
+ new PossiblyDecompressedInputStream(goldPath, conf);
+
+ BufferedInputStream bis = new BufferedInputStream(inputLogStream);
+ bis.mark(10000);
+ Hadoop20JHParser parser = new Hadoop20JHParser(bis);
+
+ final Path resultPath = new Path(tempDir, "result.text");
+
+ System.out.println("testHadoop20JHParser sent its output to " + resultPath);
+
+ Compressor compressor;
+
+ FileSystem fs = resultPath.getFileSystem(conf);
+ CompressionCodec codec =
+ new CompressionCodecFactory(conf).getCodec(resultPath);
+ OutputStream output;
+ if (codec != null) {
+ compressor = CodecPool.getCompressor(codec);
+ output = codec.createOutputStream(fs.create(resultPath), compressor);
+ } else {
+ output = fs.create(resultPath);
+ }
+
+ PrintStream printStream = new PrintStream(output);
+
+ try {
+ assertEquals("Hadoop20JHParser can't parse the test file", true,
+ Hadoop20JHParser.canParse(inputLogStream));
+
+ bis.reset();
+
+ HistoryEvent event = parser.nextEvent();
+
+ while (event != null) {
+ printStream.println(event.getClass().getCanonicalName());
+ event = parser.nextEvent();
+ }
+
+ printStream.close();
+
+ LineReader goldLines = new LineReader(inputGoldStream);
+ LineReader resultLines =
+ new LineReader(new PossiblyDecompressedInputStream(resultPath, conf));
+
+ int lineNumber = 1;
+
+ try {
+ Text goldLine = new Text();
+ Text resultLine = new Text();
+
+ int goldRead = goldLines.readLine(goldLine);
+ int resultRead = resultLines.readLine(resultLine);
+
+ while (goldRead * resultRead != 0) {
+ if (!goldLine.equals(resultLine)) {
+ assertEquals("Type mismatch detected", goldLine, resultLine);
+ break;
+ }
+
+ goldRead = goldLines.readLine(goldLine);
+ resultRead = resultLines.readLine(resultLine);
+
+ ++lineNumber;
+ }
+
+ if (goldRead != resultRead) {
+ assertEquals("the " + (goldRead > resultRead ? "gold" : resultRead)
+ + " file contains more text at line " + lineNumber, goldRead,
+ resultRead);
+ }
+
+ success = true;
+ } finally {
+ goldLines.close();
+ resultLines.close();
+
+ if (success) {
+ lfs.delete(resultPath, false);
+ }
+ }
+
+ } finally {
+ if (parser == null) {
+ inputLogStream.close();
+ } else {
+ parser.close();
+ }
+
+ if (inputGoldStream != null) {
+ inputGoldStream.close();
+ }
+
+ // it's okay to do this twice [if we get an error on input]
+ printStream.close();
+ }
+ }
+
+ @Test
+ public void testJobConfigurationParser() throws Exception {
+ String[] list1 =
+ { "mapred.job.queue.name", "mapreduce.job.name",
+ "mapred.child.java.opts" };
+
+ String[] list2 = { "mapred.job.queue.name", "mapred.child.java.opts" };
+
+ List<String> interested1 = new ArrayList<String>();
+ for (String interested : list1) {
+ interested1.add(interested);
+ }
+
+ List<String> interested2 = new ArrayList<String>();
+ for (String interested : list2) {
+ interested2.add(interested);
+ }
+
+ JobConfigurationParser jcp1 = new JobConfigurationParser(interested1);
+ JobConfigurationParser jcp2 = new JobConfigurationParser(interested2);
+
+ final Configuration conf = new Configuration();
+ final FileSystem lfs = FileSystem.getLocal(conf);
+
+ @SuppressWarnings("deprecation")
+ final Path rootInputDir =
+ new Path(System.getProperty("test.tools.input.dir", ""))
+ .makeQualified(lfs);
+
+ final Path rootInputPath = new Path(rootInputDir, "rumen/small-trace-test");
+
+ final Path inputPath = new Path(rootInputPath, "sample-conf.file.xml");
+
+ InputStream inputConfStream =
+ new PossiblyDecompressedInputStream(inputPath, conf);
+
+ try {
+ Properties props1 = jcp1.parse(inputConfStream);
+ inputConfStream.close();
+
+ inputConfStream = new PossiblyDecompressedInputStream(inputPath, conf);
+ Properties props2 = jcp2.parse(inputConfStream);
+
+ assertEquals("testJobConfigurationParser: wrong number of properties", 3,
+ props1.size());
+ assertEquals("testJobConfigurationParser: wrong number of properties", 2,
+ props2.size());
+
+ assertEquals("prop test 1", "TheQueue", props1
+ .get("mapred.job.queue.name"));
+ assertEquals("prop test 2", "job_0001", props1.get("mapreduce.job.name"));
+ assertEquals("prop test 3",
+ "-server -Xmx640m -Djava.net.preferIPv4Stack=true", props1
+ .get("mapred.child.java.opts"));
+ assertEquals("prop test 4", "TheQueue", props2
+ .get("mapred.job.queue.name"));
+ assertEquals("prop test 5",
+ "-server -Xmx640m -Djava.net.preferIPv4Stack=true", props2
+ .get("mapred.child.java.opts"));
+
+ } finally {
+ inputConfStream.close();
+ }
+ }
+
+ @Test
+ public void testTopologyBuilder() throws Exception {
+ final TopologyBuilder subject = new TopologyBuilder();
+
+ // currently we extract no host names from the Properties
+ subject.process(new Properties());
+
+ subject.process(new TaskAttemptFinishedEvent(TaskAttemptID
+ .forName("attempt_200904211745_0003_m_000004_0"), TaskType
+ .valueOf("MAP"), "STATUS", 1234567890L,
+ "/194\\.6\\.134\\.64/cluster50261\\.secondleveldomain\\.com",
+ "SUCCESS", null));
+ subject.process(new TaskAttemptUnsuccessfulCompletionEvent(TaskAttemptID
+ .forName("attempt_200904211745_0003_m_000004_1"), TaskType
+ .valueOf("MAP"), "STATUS", 1234567890L,
+ "/194\\.6\\.134\\.80/cluster50262\\.secondleveldomain\\.com",
+ "MACHINE_EXPLODED"));
+ subject.process(new TaskAttemptUnsuccessfulCompletionEvent(TaskAttemptID
+ .forName("attempt_200904211745_0003_m_000004_2"), TaskType
+ .valueOf("MAP"), "STATUS", 1234567890L,
+ "/194\\.6\\.134\\.80/cluster50263\\.secondleveldomain\\.com",
+ "MACHINE_EXPLODED"));
+ subject.process(new TaskStartedEvent(TaskID
+ .forName("task_200904211745_0003_m_000004"), 1234567890L, TaskType
+ .valueOf("MAP"),
+ "/194\\.6\\.134\\.80/cluster50263\\.secondleveldomain\\.com"));
+
+ final LoggedNetworkTopology topology = subject.build();
+
+ List<LoggedNetworkTopology> racks = topology.getChildren();
+
+ assertEquals("Wrong number of racks", 2, racks.size());
+
+ boolean sawSingleton = false;
+ boolean sawDoubleton = false;
+
+ for (LoggedNetworkTopology rack : racks) {
+ List<LoggedNetworkTopology> nodes = rack.getChildren();
+ if (rack.getName().endsWith(".64")) {
+ assertEquals("The singleton rack has the wrong number of elements", 1,
+ nodes.size());
+ sawSingleton = true;
+ } else if (rack.getName().endsWith(".80")) {
+ assertEquals("The doubleton rack has the wrong number of elements", 2,
+ nodes.size());
+ sawDoubleton = true;
+ } else {
+ assertTrue("Unrecognized rack name", false);
+ }
+ }
+
+ assertTrue("Did not see singleton rack", sawSingleton);
+ assertTrue("Did not see doubleton rack", sawDoubleton);
+ }
+
static private <T extends DeepCompare> void jsonFileMatchesGold(
- FileSystem lfs, Path result, Path gold, Class<? extends T> clazz,
+ Configuration conf, Path result, Path gold, Class<? extends T> clazz,
String fileDescription) throws IOException {
JsonObjectMapperParser<T> goldParser =
- new JsonObjectMapperParser<T>(gold, clazz, new Configuration());
- InputStream resultStream = lfs.open(result);
+ new JsonObjectMapperParser<T>(gold, clazz, conf);
JsonObjectMapperParser<T> resultParser =
- new JsonObjectMapperParser<T>(resultStream, clazz);
+ new JsonObjectMapperParser<T>(result, clazz, conf);
try {
while (true) {
DeepCompare goldJob = goldParser.getNext();
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-format-test-logs/megacluster.megacorp.com_1265616107882_job_201002080801_40864_conf.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-format-test-logs/megacluster.megacorp.com_1265616107882_job_201002080801_40864_conf.xml?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-format-test-logs/megacluster.megacorp.com_1265616107882_job_201002080801_40864_conf.xml (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-format-test-logs/megacluster.megacorp.com_1265616107882_job_201002080801_40864_conf.xml Fri Mar 4 04:22:59 2011
@@ -0,0 +1,3 @@
+<configuration>
+<property><name>mapred.child.java.opts</name><value>-Xmx1024M -Djava.io.tmpdir=./tmp</value></property>
+</configuration>
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-format-test-logs/megacluster.megacorp.com_1265616107882_job_201002080801_40864_job_name-DAILY%2F20100210%5D.gz
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-format-test-logs/megacluster.megacorp.com_1265616107882_job_201002080801_40864_job_name-DAILY%252F20100210%255D.gz?rev=1077515&view=auto
==============================================================================
Binary file - no diff available.
Propchange: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-format-test-logs/megacluster.megacorp.com_1265616107882_job_201002080801_40864_job_name-DAILY%2F20100210%5D.gz
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-format-test-logs/megacluster.megacorp.com_1265616107882_job_201002080801_50510_conf.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-format-test-logs/megacluster.megacorp.com_1265616107882_job_201002080801_50510_conf.xml?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-format-test-logs/megacluster.megacorp.com_1265616107882_job_201002080801_50510_conf.xml (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-format-test-logs/megacluster.megacorp.com_1265616107882_job_201002080801_50510_conf.xml Fri Mar 4 04:22:59 2011
@@ -0,0 +1,3 @@
+<configuration>
+<property><name>mapred.child.java.opts</name><value>-Xmx1024m -Djava.net.preferIPv4Stack=true</value></property>
+</configuration>
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-format-test-logs/megacluster.megacorp.com_1265616107882_job_201002080801_50510_job_name-DAILY%2F20100208%5D.gz
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-format-test-logs/megacluster.megacorp.com_1265616107882_job_201002080801_50510_job_name-DAILY%252F20100208%255D.gz?rev=1077515&view=auto
==============================================================================
Binary file - no diff available.
Propchange: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-format-test-logs/megacluster.megacorp.com_1265616107882_job_201002080801_50510_job_name-DAILY%2F20100208%5D.gz
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-test-trace.json.gz
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-test-trace.json.gz?rev=1077515&view=auto
==============================================================================
Binary file - no diff available.
Propchange: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/counters-test-trace.json.gz
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/dispatch-sample-v20-jt-log.gz
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/dispatch-sample-v20-jt-log.gz?rev=1077515&view=auto
==============================================================================
Binary file - no diff available.
Propchange: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/dispatch-sample-v20-jt-log.gz
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/dispatch-topology-output.json.gz
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/dispatch-topology-output.json.gz?rev=1077515&view=auto
==============================================================================
Binary file - no diff available.
Propchange: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/dispatch-topology-output.json.gz
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/dispatch-trace-output.json.gz
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/dispatch-trace-output.json.gz?rev=1077515&view=auto
==============================================================================
Binary file - no diff available.
Propchange: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/dispatch-trace-output.json.gz
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/folder-input-trace.json.gz
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/folder-input-trace.json.gz?rev=1077515&view=auto
==============================================================================
Binary file - no diff available.
Propchange: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/folder-input-trace.json.gz
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/goldFoldedTrace.json.gz
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/goldFoldedTrace.json.gz?rev=1077515&view=auto
==============================================================================
Binary file - no diff available.
Propchange: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/goldFoldedTrace.json.gz
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/sample-conf.file.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/sample-conf.file.xml?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/sample-conf.file.xml (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/sample-conf.file.xml Fri Mar 4 04:22:59 2011
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration>
+ <property>
+ <name>mapred.job.queue.name</name><value>TheQueue</value>
+ </property>
+ <property>
+ <name>mapreduce.job.name</name><value>job_0001</value>
+ </property>
+ <property>
+ <name>maproduce.uninteresting.property</name><value>abcdef</value>
+ </property>
+ <property><name>mapred.child.java.opts</name><value>-server -Xmx640m -Djava.net.preferIPv4Stack=true</value></property>
+</configuration>
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/v20-single-input-log-event-classes.text.gz
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/v20-single-input-log-event-classes.text.gz?rev=1077515&view=auto
==============================================================================
Binary file - no diff available.
Propchange: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/v20-single-input-log-event-classes.text.gz
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/v20-single-input-log.gz
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/v20-single-input-log.gz?rev=1077515&view=auto
==============================================================================
Binary file - no diff available.
Propchange: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/small-trace-test/v20-single-input-log.gz
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Modified: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/AbstractClusterStory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/AbstractClusterStory.java?rev=1077515&r1=1077514&r2=1077515&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/AbstractClusterStory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/AbstractClusterStory.java Fri Mar 4 04:22:59 2011
@@ -38,7 +38,6 @@ public abstract class AbstractClusterSto
protected Map<String, MachineNode> mNodeMap;
protected Map<String, RackNode> rNodeMap;
protected int maximumDistance = 0;
- protected Random random;
@Override
public Set<MachineNode> getMachines() {
@@ -53,7 +52,8 @@ public abstract class AbstractClusterSto
}
@Override
- public synchronized MachineNode[] getRandomMachines(int expected) {
+ public synchronized MachineNode[] getRandomMachines(int expected,
+ Random random) {
if (expected == 0) {
return new MachineNode[0];
}
@@ -64,7 +64,6 @@ public abstract class AbstractClusterSto
if (mNodesFlattened == null) {
mNodesFlattened = machineNodes.toArray(new MachineNode[total]);
- random = new Random();
}
MachineNode[] retval = new MachineNode[select];
Modified: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ClusterStory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ClusterStory.java?rev=1077515&r1=1077514&r2=1077515&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ClusterStory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ClusterStory.java Fri Mar 4 04:22:59 2011
@@ -18,6 +18,7 @@
package org.apache.hadoop.tools.rumen;
import java.util.Set;
+import java.util.Random;
/**
* {@link ClusterStory} represents all configurations of a MapReduce cluster,
@@ -45,9 +46,10 @@ public interface ClusterStory {
/**
* Select a random set of machines.
* @param expected The expected sample size.
+ * @param random Random number generator to use.
* @return An array of up to expected number of {@link MachineNode}s.
*/
- public MachineNode[] getRandomMachines(int expected);
+ public MachineNode[] getRandomMachines(int expected, Random random);
/**
* Get {@link MachineNode} by its host name.
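Moving the java.util.Random into the signature shifts ownership of the generator to the caller, so a simulation run can be reproduced exactly by seeding it deterministically; this pairs naturally with RandomSeedGenerator above. A sketch of the new calling pattern ('cluster' is a hypothetical ClusterStory instance, "machine-selection" a hypothetical stream name):

    long masterSeed = 42L;
    Random rng =
        new Random(RandomSeedGenerator.getSeed("machine-selection", masterSeed));
    // Same master seed => same sequence => the same sample on every run.
    MachineNode[] sample = cluster.getRandomMachines(10, rng);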
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/DefaultInputDemuxer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/DefaultInputDemuxer.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/DefaultInputDemuxer.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/DefaultInputDemuxer.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * {@link DefaultInputDemuxer} acts as a pass-through demuxer. It just opens
+ * each file and returns back the input stream. If the input is compressed, it
+ * would return a decompression stream.
+ */
+public class DefaultInputDemuxer implements InputDemuxer {
+ String name;
+ InputStream input;
+
+ @Override
+ public void bindTo(Path path, Configuration conf) throws IOException {
+ if (name != null) { // re-binding before the previous one was consumed.
+ close();
+ }
+ name = path.getName();
+
+ input = new PossiblyDecompressedInputStream(path, conf);
+
+ return;
+ }
+
+ @Override
+ public Pair<String, InputStream> getNext() throws IOException {
+ if (name != null) {
+ Pair<String, InputStream> ret =
+ new Pair<String, InputStream>(name, input);
+ name = null;
+ input = null;
+ return ret;
+ }
+ return null;
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ if (input != null) {
+ input.close();
+ }
+ } finally {
+ name = null;
+ input = null;
+ }
+ }
+}
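In contrast to ConcatenatedInputFilesDemuxer, the pass-through demuxer yields exactly one (name, stream) pair per bindTo call, then null. A minimal sketch (the path is hypothetical; a .gz suffix is decompressed transparently by PossiblyDecompressedInputStream):

    InputDemuxer demuxer = new DefaultInputDemuxer();
    demuxer.bindTo(new Path("job-history-log.gz"), conf);

    Pair<String, InputStream> only = demuxer.getNext(); // the whole file
    assert demuxer.getNext() == null;                   // nothing further
    demuxer.close();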
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/DefaultOutputter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/DefaultOutputter.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/DefaultOutputter.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/DefaultOutputter.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.Compressor;
+
+/**
+ * The default {@link Outputter} that outputs to a plain file. Compression
+ * will be applied if the path has the right suffix.
+ */
+public class DefaultOutputter<T> implements Outputter<T> {
+ JsonObjectMapperWriter<T> writer;
+ Compressor compressor;
+
+ @Override
+ public void init(Path path, Configuration conf) throws IOException {
+ FileSystem fs = path.getFileSystem(conf);
+ CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(path);
+ OutputStream output;
+ if (codec != null) {
+ compressor = CodecPool.getCompressor(codec);
+ output = codec.createOutputStream(fs.create(path), compressor);
+ } else {
+ output = fs.create(path);
+ }
+ writer = new JsonObjectMapperWriter<T>(output,
+ conf.getBoolean("rumen.output.pretty.print", true));
+ }
+
+ @Override
+ public void output(T object) throws IOException {
+ writer.write(object);
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ writer.close();
+ } finally {
+ if (compressor != null) {
+ CodecPool.returnCompressor(compressor);
+ }
+ }
+ }
+}
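The suffix-driven codec lookup mirrors the input side: naming the destination *.gz is enough to get a gzipped JSON trace, and pretty-printing is controlled by the rumen.output.pretty.print key shown above. A usage sketch (the path and the 'topology' value are hypothetical):

    Outputter<LoggedNetworkTopology> out =
        new DefaultOutputter<LoggedNetworkTopology>();
    out.init(new Path("topology-out.json.gz"), conf); // .gz => compressed
    out.output(topology);
    out.close(); // also returns the Compressor to the CodecPool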
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/DeskewedJobTraceReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/DeskewedJobTraceReader.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/DeskewedJobTraceReader.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/DeskewedJobTraceReader.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class DeskewedJobTraceReader implements Closeable {
+ // underlying engine
+ private final JobTraceReader reader;
+
+ // configuration variables
+ private final int skewBufferLength;
+
+ private final boolean abortOnUnfixableSkew;
+
+ // state variables
+ private long skewMeasurementLatestSubmitTime = Long.MIN_VALUE;
+
+ private long returnedLatestSubmitTime = Long.MIN_VALUE;
+
+ private int maxSkewBufferNeeded = 0;
+
+ // a submit time will NOT be in countedRepeatedSubmitTimesSoFar if
+ // it only occurs once. That case is represented by having the time in
+ // submitTimesSoFar only. A submit time that occurs twice or more
+ // appears in BOTH submitTimesSoFar AND, keyed to its occurrence count,
+ // countedRepeatedSubmitTimesSoFar.
+ private TreeMap<Long, Integer> countedRepeatedSubmitTimesSoFar =
+ new TreeMap<Long, Integer>();
+ private TreeSet<Long> submitTimesSoFar = new TreeSet<Long>();
+
+ private final PriorityQueue<LoggedJob> skewBuffer;
+
+ static final private Log LOG =
+ LogFactory.getLog(DeskewedJobTraceReader.class);
+
+ static private class JobComparator implements Comparator<LoggedJob> {
+ @Override
+ public int compare(LoggedJob j1, LoggedJob j2) {
+ return (j1.getSubmitTime() < j2.getSubmitTime()) ? -1
+ : (j1.getSubmitTime() == j2.getSubmitTime()) ? 0 : 1;
+ }
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param reader
+ * the {@link JobTraceReader} that's being protected
+ * @param skewBufferLength
+ * the number of jobs submitted later that can precede an
+ * out-of-order earlier job
+ * @param abortOnUnfixableSkew
+ * whether to fail with an {@link OutOfOrderException} when the
+ * buffer cannot absorb the skew
+ * @throws IOException
+ */
+ public DeskewedJobTraceReader(JobTraceReader reader, int skewBufferLength,
+ boolean abortOnUnfixableSkew) throws IOException {
+ this.reader = reader;
+
+ this.skewBufferLength = skewBufferLength;
+
+ this.abortOnUnfixableSkew = abortOnUnfixableSkew;
+
+ skewBuffer =
+ new PriorityQueue<LoggedJob>(skewBufferLength + 1, new JobComparator());
+
+ fillSkewBuffer();
+ }
+
+ public DeskewedJobTraceReader(JobTraceReader reader) throws IOException {
+ this(reader, 0, true);
+ }
+
+ private LoggedJob rawNextJob() throws IOException {
+ LoggedJob result = reader.getNext();
+
+ if ((!abortOnUnfixableSkew || skewBufferLength > 0) && result != null) {
+ long thisTime = result.getSubmitTime();
+
+ if (submitTimesSoFar.contains(thisTime)) {
+ Integer myCount = countedRepeatedSubmitTimesSoFar.get(thisTime);
+
+ countedRepeatedSubmitTimesSoFar.put(thisTime, myCount == null ? 2
+ : myCount + 1);
+ } else {
+ submitTimesSoFar.add(thisTime);
+ }
+
+ if (thisTime < skewMeasurementLatestSubmitTime) {
+ Iterator<Long> endCursor = submitTimesSoFar.descendingIterator();
+
+ int thisJobNeedsSkew = 0;
+
+ Long keyNeedingSkew;
+
+ while (endCursor.hasNext()
+ && (keyNeedingSkew = endCursor.next()) > thisTime) {
+ Integer keyNeedsSkewAmount =
+ countedRepeatedSubmitTimesSoFar.get(keyNeedingSkew);
+
+ thisJobNeedsSkew +=
+ keyNeedsSkewAmount == null ? 1 : keyNeedsSkewAmount;
+ }
+
+ maxSkewBufferNeeded = Math.max(maxSkewBufferNeeded, thisJobNeedsSkew);
+ }
+
+ skewMeasurementLatestSubmitTime =
+ Math.max(thisTime, skewMeasurementLatestSubmitTime);
+ }
+
+ return result;
+ }
+
+ static class OutOfOrderException extends RuntimeException {
+ static final long serialVersionUID = 1L;
+
+ public OutOfOrderException(String text) {
+ super(text);
+ }
+ }
+
+ LoggedJob nextJob() throws IOException, OutOfOrderException {
+ LoggedJob newJob = rawNextJob();
+
+ if (newJob != null) {
+ skewBuffer.add(newJob);
+ }
+
+ LoggedJob result = skewBuffer.poll();
+
+ while (result != null && result.getSubmitTime() < returnedLatestSubmitTime) {
+ LOG.error("The current job was submitted earlier than the previous one");
+ LOG.error("Its jobID is " + result.getJobID());
+ LOG.error("Its submit time is " + result.getSubmitTime()
+ + ",but the previous one was " + returnedLatestSubmitTime);
+
+ if (abortOnUnfixableSkew) {
+ throw new OutOfOrderException("Job submit time is "
+ + result.getSubmitTime() + ",but the previous one was "
+ + returnedLatestSubmitTime);
+ }
+
+ result = rawNextJob();
+ }
+
+ if (result != null) {
+ returnedLatestSubmitTime = result.getSubmitTime();
+ }
+
+ return result;
+ }
+
+ private void fillSkewBuffer() throws IOException {
+ for (int i = 0; i < skewBufferLength; ++i) {
+ LoggedJob newJob = rawNextJob();
+
+ if (newJob == null) {
+ return;
+ }
+
+ skewBuffer.add(newJob);
+ }
+ }
+
+ int neededSkewBufferSize() {
+ return maxSkewBufferNeeded;
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+
+}
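
A sketch of draining a trace through the deskewing wrapper above. The trace path is illustrative, and the (Path, Configuration) constructor is assumed from the JobTraceReader added elsewhere in this patch; note that nextJob() and neededSkewBufferSize() are package-private, so this would live in org.apache.hadoop.tools.rumen:

    Configuration conf = new Configuration();
    JobTraceReader raw =
        new JobTraceReader(new Path("trace.json"), conf); // assumed ctor
    DeskewedJobTraceReader reader =
        new DeskewedJobTraceReader(raw, 100, false); // buffer 100 jobs, don't abort
    try {
      for (LoggedJob job = reader.nextJob(); job != null;
           job = reader.nextJob()) {
        // jobs now emerge in non-decreasing submit-time order
      }
      // after a tolerant pass, this reports the buffer a strict pass would need
      int needed = reader.neededSkewBufferSize();
    } finally {
      reader.close();
    }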
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/EventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/EventType.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/EventType.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/EventType.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+@SuppressWarnings("all")
+public enum EventType {
+ JOB_SUBMITTED, JOB_INITED, JOB_FINISHED, JOB_PRIORITY_CHANGED,
+ JOB_STATUS_CHANGED, JOB_FAILED, JOB_KILLED, JOB_INFO_CHANGED,
+ TASK_STARTED, TASK_FINISHED, TASK_FAILED, TASK_UPDATED,
+ MAP_ATTEMPT_STARTED, MAP_ATTEMPT_FINISHED, MAP_ATTEMPT_FAILED,
+ MAP_ATTEMPT_KILLED, REDUCE_ATTEMPT_STARTED, REDUCE_ATTEMPT_FINISHED,
+ REDUCE_ATTEMPT_FAILED, REDUCE_ATTEMPT_KILLED, SETUP_ATTEMPT_STARTED,
+ SETUP_ATTEMPT_FINISHED, SETUP_ATTEMPT_FAILED, SETUP_ATTEMPT_KILLED,
+ CLEANUP_ATTEMPT_STARTED, CLEANUP_ATTEMPT_FINISHED,
+ CLEANUP_ATTEMPT_FAILED, CLEANUP_ATTEMPT_KILLED
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Hadoop20JHParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Hadoop20JHParser.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Hadoop20JHParser.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Hadoop20JHParser.java Fri Mar 4 04:22:59 2011
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.LineReader;
+
+/**
+ * {@link JobHistoryParser} to parse job histories for Hadoop 0.20 (META=1).
+ */
+public class Hadoop20JHParser implements JobHistoryParser {
+ final LineReader reader;
+
+ static final String endLineString = " .";
+ static final int internalVersion = 1;
+
+ /**
+ * Can this parser parse the input? A stream is deemed a good 0.20 job
+ * history stream if its first line is exactly "Meta VERSION=\"1\" ."
+ *
+ * @param input
+ * the stream to probe; note that the probe consumes the first line
+ * @return whether this parser can parse the input
+ * @throws IOException
+ */
+ public static boolean canParse(InputStream input) throws IOException {
+ try {
+ LineReader reader = new LineReader(input);
+
+ Text buffer = new Text();
+
+ return reader.readLine(buffer) != 0
+ && buffer.toString().equals("Meta VERSION=\"1\" .");
+ } catch (EOFException e) {
+ return false;
+ }
+ }
+
+ public Hadoop20JHParser(InputStream input) throws IOException {
+ super();
+
+ reader = new LineReader(input);
+ }
+
+ Map<String, HistoryEventEmitter> liveEmitters =
+ new HashMap<String, HistoryEventEmitter>();
+ Queue<HistoryEvent> remainingEvents = new LinkedList<HistoryEvent>();
+
+ enum LineType {
+ JOB("Job", "JOBID") {
+ HistoryEventEmitter createEmitter() {
+ return new Job20LineHistoryEventEmitter();
+ }
+ },
+
+ TASK("Task", "TASKID") {
+ HistoryEventEmitter createEmitter() {
+ return new Task20LineHistoryEventEmitter();
+ }
+ },
+
+ MAP_ATTEMPT("MapAttempt", "TASK_ATTEMPT_ID") {
+ HistoryEventEmitter createEmitter() {
+ return new MapAttempt20LineHistoryEventEmitter();
+ }
+ },
+
+ REDUCE_ATTEMPT("ReduceAttempt", "TASK_ATTEMPT_ID") {
+ HistoryEventEmitter createEmitter() {
+ return new ReduceAttempt20LineHistoryEventEmitter();
+ }
+ };
+
+ private LogRecordType type;
+ private String name;
+
+ LineType(String s, String name) {
+ type = LogRecordType.intern(s);
+ this.name = name;
+ }
+
+ LogRecordType recordType() {
+ return type;
+ }
+
+ String getName(ParsedLine line) {
+ return line.get(name);
+ }
+
+ abstract HistoryEventEmitter createEmitter();
+
+ static LineType findLineType(LogRecordType lrt) {
+ for (LineType lt : LineType.values()) {
+ if (lt.type == lrt) {
+ return lt;
+ }
+ }
+
+ return null;
+ }
+ }
+
+ @Override
+ public HistoryEvent nextEvent() {
+ try {
+ while (remainingEvents.isEmpty()) {
+ ParsedLine line = new ParsedLine(getFullLine(), internalVersion);
+ LineType type = LineType.findLineType(line.getType());
+ if (type == null) {
+ continue;
+ }
+ String name = type.getName(line);
+ HistoryEventEmitter emitter = findOrMakeEmitter(name, type);
+ Pair<Queue<HistoryEvent>, HistoryEventEmitter.PostEmitAction> pair =
+ emitter.emitterCore(line, name);
+ if (pair.second() == HistoryEventEmitter.PostEmitAction.REMOVE_HEE) {
+ liveEmitters.remove(name);
+ }
+ remainingEvents = pair.first();
+ }
+ return remainingEvents.poll();
+ } catch (EOFException e) {
+ // the underlying input is exhausted: no more events
+ return null;
+ } catch (IOException e) {
+ // unreadable input is treated the same as end of input
+ return null;
+ }
+ }
+
+ HistoryEventEmitter findOrMakeEmitter(String name, LineType type) {
+ HistoryEventEmitter result = liveEmitters.get(name);
+ if (result == null) {
+ result = type.createEmitter();
+ liveEmitters.put(name, result);
+ }
+ return result;
+ }
+
+ private String getOneLine() throws IOException {
+ Text resultText = new Text();
+
+ if (reader.readLine(resultText) == 0) {
+ throw new EOFException("end of input reached while reading a line");
+ }
+
+ return resultText.toString();
+ }
+
+ private String getFullLine() throws IOException {
+ String line = getOneLine();
+
+ while (line.length() < endLineString.length()) {
+ line = getOneLine();
+ }
+
+ if (line.endsWith(endLineString)) {
+ return line;
+ }
+
+ StringBuilder sb = new StringBuilder(line);
+
+ String addedLine;
+
+ do {
+ addedLine = getOneLine();
+
+ if (addedLine == null) {
+ return sb.toString();
+ }
+
+ sb.append("\n");
+ sb.append(addedLine);
+ } while (addedLine.length() < endLineString.length()
+ || !endLineString.equals(addedLine.substring(addedLine.length()
+ - endLineString.length())));
+
+ return sb.toString();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (reader != null) {
+ reader.close();
+ }
+ }
+
+}
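
A sketch of the probe-then-parse flow the class supports. The history path is illustrative, and getEventType() is assumed from the HistoryEvent interface added elsewhere in this patch; since canParse() consumes the version line, the stream is reopened before parsing:

    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    Path history = new Path("jobhistory.log"); // illustrative
    InputStream probe = fs.open(history);
    boolean parseable;
    try {
      parseable = Hadoop20JHParser.canParse(probe); // reads the Meta VERSION line
    } finally {
      probe.close();
    }
    if (parseable) {
      JobHistoryParser parser = new Hadoop20JHParser(fs.open(history));
      try {
        for (HistoryEvent event = parser.nextEvent(); event != null;
             event = parser.nextEvent()) {
          // dispatch on event.getEventType(): JOB_SUBMITTED, TASK_FINISHED, ...
        }
      } finally {
        parser.close();
      }
    }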
Modified: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java?rev=1077515&r1=1077514&r2=1077515&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java Fri Mar 4 04:22:59 2011
@@ -70,6 +70,7 @@ import org.codehaus.jackson.map.Serializ
* about it. See {@code usage()}, below.
*
*/
+@Deprecated
public class HadoopLogsAnalyzer extends Configured implements Tool {
// output streams
@@ -103,14 +104,15 @@ public class HadoopLogsAnalyzer extends
/**
* The regular expression used to parse task attempt IDs in job tracker logs
*/
- private final static Pattern taskAttemptIDPattern = Pattern
- .compile(".*_([0-9]+)");
+ private final static Pattern taskAttemptIDPattern =
+ Pattern.compile(".*_([0-9]+)");
private final static Pattern xmlFilePrefix = Pattern.compile("[ \t]*<");
private final static Pattern confFileHeader = Pattern.compile("_conf.xml!!");
- private final Map<String, Pattern> counterPatterns = new HashMap<String, Pattern>();
+ private final Map<String, Pattern> counterPatterns =
+ new HashMap<String, Pattern>();
/**
* The unpaired job config file. Currently only used to glean the {@code -Xmx}
@@ -188,14 +190,14 @@ public class HadoopLogsAnalyzer extends
private boolean collectTaskTimes = false;
private LogRecordType canonicalJob = LogRecordType.intern("Job");
- private LogRecordType canonicalMapAttempt = LogRecordType
- .intern("MapAttempt");
- private LogRecordType canonicalReduceAttempt = LogRecordType
- .intern("ReduceAttempt");
+ private LogRecordType canonicalMapAttempt =
+ LogRecordType.intern("MapAttempt");
+ private LogRecordType canonicalReduceAttempt =
+ LogRecordType.intern("ReduceAttempt");
private LogRecordType canonicalTask = LogRecordType.intern("Task");
- private static Pattern streamingJobnamePattern = Pattern
- .compile("streamjob\\d+.jar");
+ private static Pattern streamingJobnamePattern =
+ Pattern.compile("streamjob\\d+.jar");
private HashSet<String> hostNames = new HashSet<String>();
@@ -250,8 +252,8 @@ public class HadoopLogsAnalyzer extends
result[i] = new Histogram[LoggedJob.JobType.values().length];
for (int j = 0; j < LoggedJob.JobType.values().length; ++j) {
- result[i][j] = blockname == null ? new Histogram() : new Histogram(
- blockname);
+ result[i][j] =
+ blockname == null ? new Histogram() : new Histogram(blockname);
}
}
@@ -505,8 +507,9 @@ public class HadoopLogsAnalyzer extends
SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
JsonFactory jfactory = jmapper.getJsonFactory();
FileSystem jobFS = jobTraceFilename.getFileSystem(getConf());
- jobTraceGen = jfactory.createJsonGenerator(
- jobFS.create(jobTraceFilename), JsonEncoding.UTF8);
+ jobTraceGen =
+ jfactory.createJsonGenerator(jobFS.create(jobTraceFilename),
+ JsonEncoding.UTF8);
if (prettyprintTrace) {
jobTraceGen.useDefaultPrettyPrinter();
}
@@ -517,8 +520,9 @@ public class HadoopLogsAnalyzer extends
SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
JsonFactory tfactory = tmapper.getJsonFactory();
FileSystem topoFS = topologyFilename.getFileSystem(getConf());
- topologyGen = tfactory.createJsonGenerator(
- topoFS.create(topologyFilename), JsonEncoding.UTF8);
+ topologyGen =
+ tfactory.createJsonGenerator(topoFS.create(topologyFilename),
+ JsonEncoding.UTF8);
topologyGen.useDefaultPrettyPrinter();
}
}
@@ -546,7 +550,7 @@ public class HadoopLogsAnalyzer extends
IOException {
if (input != null) {
input.close();
- LOG.info("File closed: "+currentFileName);
+ LOG.info("File closed: " + currentFileName);
input = null;
}
@@ -573,7 +577,8 @@ public class HadoopLogsAnalyzer extends
+ inputDirectoryFiles.length + ", starts with line " + lineNumber
+ ".");
- input = maybeUncompressedPath(new Path(inputDirectoryPath, currentFileName));
+ input =
+ maybeUncompressedPath(new Path(inputDirectoryPath, currentFileName));
return input != null;
}
@@ -734,8 +739,9 @@ public class HadoopLogsAnalyzer extends
long[] endpointKeys = taskTimes.getCDF(1000, endpoints);
- int smallResultOffset = (taskTimes.getTotalCount() < SMALL_SPREAD_COMPENSATION_THRESHOLD ? 1
- : 0);
+ int smallResultOffset =
+ (taskTimes.getTotalCount() < SMALL_SPREAD_COMPENSATION_THRESHOLD ? 1
+ : 0);
Histogram myTotal = spreadTo[outcome.ordinal()][jtype.ordinal()];
@@ -803,14 +809,15 @@ public class HadoopLogsAnalyzer extends
attemptsInCurrentJob = new HashMap<String, LoggedTaskAttempt>();
// initialize all the per-job statistics gathering places
- successfulMapAttemptTimes = new Histogram[ParsedHost
- .numberOfDistances() + 1];
+ successfulMapAttemptTimes =
+ new Histogram[ParsedHost.numberOfDistances() + 1];
for (int i = 0; i < successfulMapAttemptTimes.length; ++i) {
successfulMapAttemptTimes[i] = new Histogram();
}
successfulReduceAttemptTimes = new Histogram();
- failedMapAttemptTimes = new Histogram[ParsedHost.numberOfDistances() + 1];
+ failedMapAttemptTimes =
+ new Histogram[ParsedHost.numberOfDistances() + 1];
for (int i = 0; i < failedMapAttemptTimes.length; ++i) {
failedMapAttemptTimes[i] = new Histogram();
}
@@ -851,7 +858,8 @@ public class HadoopLogsAnalyzer extends
if (finishTime != null) {
jobBeingTraced.setFinishTime(Long.parseLong(finishTime));
if (status != null) {
- jobBeingTraced.setOutcome(Pre21JobHistoryConstants.Values.valueOf(status));
+ jobBeingTraced.setOutcome(Pre21JobHistoryConstants.Values
+ .valueOf(status));
}
maybeMateJobAndConf();
@@ -890,9 +898,9 @@ public class HadoopLogsAnalyzer extends
if (launchTimeCurrentJob != 0) {
String jobResultText = line.get("JOB_STATUS");
- JobOutcome thisOutcome = ((jobResultText != null && "SUCCESS"
- .equals(jobResultText)) ? JobOutcome.SUCCESS
- : JobOutcome.FAILURE);
+ JobOutcome thisOutcome =
+ ((jobResultText != null && "SUCCESS".equals(jobResultText))
+ ? JobOutcome.SUCCESS : JobOutcome.FAILURE);
if (submitTimeCurrentJob != 0L) {
canonicalDistributionsEnter(delayTimeDists, thisOutcome,
@@ -911,8 +919,8 @@ public class HadoopLogsAnalyzer extends
Histogram currentJobSortTimes = new Histogram();
Histogram currentJobReduceTimes = new Histogram();
- Iterator<Map.Entry<String, Long>> taskIter = taskAttemptStartTimes
- .entrySet().iterator();
+ Iterator<Map.Entry<String, Long>> taskIter =
+ taskAttemptStartTimes.entrySet().iterator();
while (taskIter.hasNext()) {
Map.Entry<String, Long> entry = taskIter.next();
@@ -930,8 +938,8 @@ public class HadoopLogsAnalyzer extends
}
// Reduce processing
- Long shuffleEnd = taskReduceAttemptShuffleEndTimes.get(entry
- .getKey());
+ Long shuffleEnd =
+ taskReduceAttemptShuffleEndTimes.get(entry.getKey());
Long sortEnd = taskReduceAttemptSortEndTimes.get(entry.getKey());
Long reduceEnd = taskReduceAttemptFinishTimes.get(entry.getKey());
@@ -1027,7 +1035,9 @@ public class HadoopLogsAnalyzer extends
Pre21JobHistoryConstants.Values stat;
try {
- stat = status == null ? null : Pre21JobHistoryConstants.Values.valueOf(status);
+ stat =
+ status == null ? null : Pre21JobHistoryConstants.Values
+ .valueOf(status);
} catch (IllegalArgumentException e) {
LOG.error("A task status you don't know about is \"" + status + "\".",
e);
@@ -1037,22 +1047,26 @@ public class HadoopLogsAnalyzer extends
task.setTaskStatus(stat);
try {
- typ = taskType == null ? null : Pre21JobHistoryConstants.Values.valueOf(taskType);
+ typ =
+ taskType == null ? null : Pre21JobHistoryConstants.Values
+ .valueOf(taskType);
} catch (IllegalArgumentException e) {
LOG.error("A task type you don't know about is \"" + taskType + "\".",
e);
typ = null;
}
-
+
if (typ == null) {
return;
}
task.setTaskType(typ);
- List<LoggedTask> vec = typ == Pre21JobHistoryConstants.Values.MAP ? jobBeingTraced
- .getMapTasks() : typ == Pre21JobHistoryConstants.Values.REDUCE ? jobBeingTraced
- .getReduceTasks() : jobBeingTraced.getOtherTasks();
+ List<LoggedTask> vec =
+ typ == Pre21JobHistoryConstants.Values.MAP ? jobBeingTraced
+ .getMapTasks() : typ == Pre21JobHistoryConstants.Values.REDUCE
+ ? jobBeingTraced.getReduceTasks() : jobBeingTraced
+ .getOtherTasks();
if (!taskAlreadyLogged) {
vec.add(task);
@@ -1066,8 +1080,8 @@ public class HadoopLogsAnalyzer extends
Pattern result = counterPatterns.get(counterName);
if (result == null) {
- String namePatternRegex = "\\[\\(" + counterName
- + "\\)\\([^)]+\\)\\(([0-9]+)\\)\\]";
+ String namePatternRegex =
+ "\\[\\(" + counterName + "\\)\\([^)]+\\)\\(([0-9]+)\\)\\]";
result = Pattern.compile(namePatternRegex);
counterPatterns.put(counterName, result);
}
@@ -1253,7 +1267,9 @@ public class HadoopLogsAnalyzer extends
Pre21JobHistoryConstants.Values stat = null;
try {
- stat = status == null ? null : Pre21JobHistoryConstants.Values.valueOf(status);
+ stat =
+ status == null ? null : Pre21JobHistoryConstants.Values
+ .valueOf(status);
} catch (IllegalArgumentException e) {
LOG.error("A map attempt status you don't know about is \"" + status
+ "\".", e);
@@ -1404,7 +1420,9 @@ public class HadoopLogsAnalyzer extends
Pre21JobHistoryConstants.Values stat = null;
try {
- stat = status == null ? null : Pre21JobHistoryConstants.Values.valueOf(status);
+ stat =
+ status == null ? null : Pre21JobHistoryConstants.Values
+ .valueOf(status);
} catch (IllegalArgumentException e) {
LOG.warn("A map attempt status you don't know about is \"" + status
+ "\".", e);
@@ -1632,8 +1650,8 @@ public class HadoopLogsAnalyzer extends
}
for (Map.Entry<Long, Long> ent : successfulNthMapperAttempts) {
- successAfterI[ent.getKey().intValue()] = ((double) ent.getValue())
- / totalSuccessfulAttempts;
+ successAfterI[ent.getKey().intValue()] =
+ ((double) ent.getValue()) / totalSuccessfulAttempts;
}
jobBeingTraced.setMapperTriesToSucceed(successAfterI);
} else {
@@ -1712,8 +1730,9 @@ public class HadoopLogsAnalyzer extends
}
if (spreading) {
- String ratioDescription = "(" + spreadMax + "/1000 %ile) to ("
- + spreadMin + "/1000 %ile) scaled by 1000000";
+ String ratioDescription =
+ "(" + spreadMax + "/1000 %ile) to (" + spreadMin
+ + "/1000 %ile) scaled by 1000000";
printDistributionSet(
"Map task success times " + ratioDescription + ":",
@@ -1737,8 +1756,8 @@ public class HadoopLogsAnalyzer extends
}
if (topologyGen != null) {
- LoggedNetworkTopology topo = new LoggedNetworkTopology(allHosts,
- "<root>", 0);
+ LoggedNetworkTopology topo =
+ new LoggedNetworkTopology(allHosts, "<root>", 0);
topologyGen.writeObject(topo);
topologyGen.close();
}