You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/06/04 22:18:05 UTC

svn commit: r663366 - in /hadoop/core/trunk: ./ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/

Author: omalley
Date: Wed Jun  4 13:18:05 2008
New Revision: 663366

URL: http://svn.apache.org/viewvc?rev=663366&view=rev
Log:
HADOOP-1328. Implement user counters in streaming. Contributed by Tom White.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
    hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
    hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=663366&r1=663365&r2=663366&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Jun  4 13:18:05 2008
@@ -136,6 +136,9 @@
     HADOOP-3177. Implement Syncable interface for FileSystem.
     (Tsz Wo (Nicholas), SZE via dhruba)
 
+    HADOOP-1328. Implement user counters in streaming. (tomwhite via
+    omalley)
+
   IMPROVEMENTS
    
     HADOOP-2928. Remove deprecated FileSystem.getContentLength().

Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?rev=663366&r1=663365&r2=663366&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Wed Jun  4 13:18:05 2008
@@ -405,6 +405,9 @@
   class MRErrorThread extends Thread {
 
     public MRErrorThread() {
+      this.reporterPrefix = job_.get("stream.stderr.reporter.prefix", "reporter:");
+      this.counterPrefix = reporterPrefix + "counter:";
+      this.statusPrefix = reporterPrefix + "status:";
       setDaemon(true);
     }
     
@@ -418,7 +421,18 @@
       try {
         lineReader = new LineReader((InputStream)clientErr_, job_);
         while (lineReader.readLine(line) > 0) {
-          System.err.println(line.toString());
+          String lineStr = line.toString();
+          if (matchesReporter(lineStr)) {
+            if (matchesCounter(lineStr)) {
+              incrCounter(lineStr);
+            } else if (matchesStatus(lineStr)) {
+              setStatus(lineStr);
+            } else {
+              LOG.warn("Cannot parse reporter line: " + lineStr);
+            }
+          } else {
+            System.err.println(lineStr);
+          }
           long now = System.currentTimeMillis(); 
           if (reporter != null && now-lastStderrReport > reporterErrDelay_) {
             lastStderrReport = now;
@@ -450,8 +464,44 @@
         }
       }
     }
+    
+    private boolean matchesReporter(String line) {
+      return line.startsWith(reporterPrefix);
+    }
+
+    private boolean matchesCounter(String line) {
+      return line.startsWith(counterPrefix);
+    }
+
+    private boolean matchesStatus(String line) {
+      return line.startsWith(statusPrefix);
+    }
+
+    private void incrCounter(String line) {
+      String trimmedLine = line.substring(counterPrefix.length()).trim();
+      String[] columns = trimmedLine.split(",");
+      if (columns.length == 3) {
+        try {
+          reporter.incrCounter(columns[0], columns[1],
+              Long.parseLong(columns[2]));
+        } catch (NumberFormatException e) {
+          LOG.warn("Cannot parse counter increment '" + columns[2] +
+              "' from line: " + line);
+        }
+      } else {
+        LOG.warn("Cannot parse counter line: " + line);
+      }
+    }
+
+    private void setStatus(String line) {
+      reporter.setStatus(line.substring(statusPrefix.length()).trim());
+    }
+    
     long lastStderrReport = 0;
     volatile Reporter reporter;
+    private final String reporterPrefix;
+    private final String counterPrefix;
+    private final String statusPrefix;
   }
 
   public void mapRedFinished() {

Modified: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java?rev=663366&r1=663365&r2=663366&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java Wed Jun  4 13:18:05 2008
@@ -20,10 +20,6 @@
 
 import junit.framework.TestCase;
 import java.io.*;
-import java.util.*;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 /**
  * This class tests hadoopStreaming in MapReduce local mode.
@@ -73,7 +69,7 @@
     };
   }
   
-  public void testCommandLine()
+  public void testCommandLine() throws IOException
   {
     try {
       try {
@@ -94,8 +90,6 @@
       System.err.println("outEx1=" + outputExpect);
       System.err.println("  out1=" + output);
       assertEquals(outputExpect, output);
-    } catch(Exception e) {
-      failTrace(e);
     } finally {
       File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
       INPUT_FILE.delete();
@@ -104,13 +98,6 @@
     }
   }
 
-  private void failTrace(Exception e)
-  {
-    StringWriter sw = new StringWriter();
-    e.printStackTrace(new PrintWriter(sw));
-    fail(sw.toString());
-  }
-
   public static void main(String[]args) throws Exception
   {
     new TestStreaming().testCommandLine();

Modified: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java?rev=663366&r1=663365&r2=663366&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java Wed Jun  4 13:18:05 2008
@@ -91,6 +91,7 @@
     while ((line = in.readLine()) != null) {
       String out = line.replace(find, replace);
       System.out.println(out);
+      System.err.println("reporter:counter:UserCounters,InputLines,1");
     }
   }