You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/06/04 22:18:05 UTC
svn commit: r663366 - in /hadoop/core/trunk: ./
src/contrib/streaming/src/java/org/apache/hadoop/streaming/
src/contrib/streaming/src/test/org/apache/hadoop/streaming/
Author: omalley
Date: Wed Jun 4 13:18:05 2008
New Revision: 663366
URL: http://svn.apache.org/viewvc?rev=663366&view=rev
Log:
HADOOP-1328. Implement user counters in streaming. Contributed by Tom White.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=663366&r1=663365&r2=663366&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Jun 4 13:18:05 2008
@@ -136,6 +136,9 @@
HADOOP-3177. Implement Syncable interface for FileSystem.
(Tsz Wo (Nicholas), SZE via dhruba)
+ HADOOP-1328. Implement user counters in streaming. (tomwhite via
+ omalley)
+
IMPROVEMENTS
HADOOP-2928. Remove deprecated FileSystem.getContentLength().
Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?rev=663366&r1=663365&r2=663366&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Wed Jun 4 13:18:05 2008
@@ -405,6 +405,9 @@
class MRErrorThread extends Thread {
public MRErrorThread() {
+ this.reporterPrefix = job_.get("stream.stderr.reporter.prefix", "reporter:");
+ this.counterPrefix = reporterPrefix + "counter:";
+ this.statusPrefix = reporterPrefix + "status:";
setDaemon(true);
}
@@ -418,7 +421,18 @@
try {
lineReader = new LineReader((InputStream)clientErr_, job_);
while (lineReader.readLine(line) > 0) {
- System.err.println(line.toString());
+ String lineStr = line.toString();
+ if (matchesReporter(lineStr)) {
+ if (matchesCounter(lineStr)) {
+ incrCounter(lineStr);
+ } else if (matchesStatus(lineStr)) {
+ setStatus(lineStr);
+ } else {
+ LOG.warn("Cannot parse reporter line: " + lineStr);
+ }
+ } else {
+ System.err.println(lineStr);
+ }
long now = System.currentTimeMillis();
if (reporter != null && now-lastStderrReport > reporterErrDelay_) {
lastStderrReport = now;
@@ -450,8 +464,44 @@
}
}
}
+
+ private boolean matchesReporter(String line) {
+ return line.startsWith(reporterPrefix);
+ }
+
+ private boolean matchesCounter(String line) {
+ return line.startsWith(counterPrefix);
+ }
+
+ private boolean matchesStatus(String line) {
+ return line.startsWith(statusPrefix);
+ }
+
+ private void incrCounter(String line) {
+ String trimmedLine = line.substring(counterPrefix.length()).trim();
+ String[] columns = trimmedLine.split(",");
+ if (columns.length == 3) {
+ try {
+ reporter.incrCounter(columns[0], columns[1],
+ Long.parseLong(columns[2]));
+ } catch (NumberFormatException e) {
+ LOG.warn("Cannot parse counter increment '" + columns[2] +
+ "' from line: " + line);
+ }
+ } else {
+ LOG.warn("Cannot parse counter line: " + line);
+ }
+ }
+
+ private void setStatus(String line) {
+ reporter.setStatus(line.substring(statusPrefix.length()).trim());
+ }
+
long lastStderrReport = 0;
volatile Reporter reporter;
+ private final String reporterPrefix;
+ private final String counterPrefix;
+ private final String statusPrefix;
}
public void mapRedFinished() {
Modified: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java?rev=663366&r1=663365&r2=663366&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java Wed Jun 4 13:18:05 2008
@@ -20,10 +20,6 @@
import junit.framework.TestCase;
import java.io.*;
-import java.util.*;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
/**
* This class tests hadoopStreaming in MapReduce local mode.
@@ -73,7 +69,7 @@
};
}
- public void testCommandLine()
+ public void testCommandLine() throws IOException
{
try {
try {
@@ -94,8 +90,6 @@
System.err.println("outEx1=" + outputExpect);
System.err.println(" out1=" + output);
assertEquals(outputExpect, output);
- } catch(Exception e) {
- failTrace(e);
} finally {
File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
INPUT_FILE.delete();
@@ -104,13 +98,6 @@
}
}
- private void failTrace(Exception e)
- {
- StringWriter sw = new StringWriter();
- e.printStackTrace(new PrintWriter(sw));
- fail(sw.toString());
- }
-
public static void main(String[]args) throws Exception
{
new TestStreaming().testCommandLine();
Modified: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java?rev=663366&r1=663365&r2=663366&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java Wed Jun 4 13:18:05 2008
@@ -91,6 +91,7 @@
while ((line = in.readLine()) != null) {
String out = line.replace(find, replace);
System.out.println(out);
+ System.err.println("reporter:counter:UserCounters,InputLines,1");
}
}