Posted to commits@hive.apache.org by br...@apache.org on 2013/07/29 14:45:52 UTC

svn commit: r1508033 - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java conf/hive-default.xml.template ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java

Author: brock
Date: Mon Jul 29 12:45:52 2013
New Revision: 1508033

URL: http://svn.apache.org/r1508033
Log:
HIVE-305: Port Hadoop streaming's counters/status reporters to Hive Transforms (Guo Hongjie, Edward Capriolo via Brock Noland)
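
The commit ports the stderr reporter protocol from Hadoop streaming's
PipeMapRed: lines a TRANSFORM script writes to standard error with the
configured prefix are consumed as counter increments or status updates
instead of being copied to the task's stderr log. A minimal sketch of a
transform script speaking the protocol (the class and counter names below
are illustrative, not part of this commit):

    // Hypothetical transform script: echoes rows to stdout and reports
    // progress through the stderr protocol added by this commit.
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;

    public class RowCounterTransform {
      public static void main(String[] args) throws IOException {
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        String row;
        while ((row = in.readLine()) != null) {
          System.out.println(row);                                   // pass rows through
          System.err.println("reporter:counter:MyGroup,RowsSeen,1"); // group,counter,amount
        }
        System.err.println("reporter:status:transform finished");    // free-form status text
      }
    }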

Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/conf/hive-default.xml.template
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1508033&r1=1508032&r2=1508033&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Mon Jul 29 12:45:52 2013
@@ -182,6 +182,8 @@ public class HiveConf extends Configurat
     SUBMITVIACHILD("hive.exec.submitviachild", false),
     SCRIPTERRORLIMIT("hive.exec.script.maxerrsize", 100000),
     ALLOWPARTIALCONSUMP("hive.exec.script.allow.partial.consumption", false),
+    STREAMREPORTERPERFIX("stream.stderr.reporter.prefix", "reporter:"),
+    STREAMREPORTERENABLED("stream.stderr.reporter.enabled", true),
     COMPRESSRESULT("hive.exec.compress.output", false),
     COMPRESSINTERMEDIATE("hive.exec.compress.intermediate", false),
     COMPRESSINTERMEDIATECODEC("hive.intermediate.compression.codec", ""),

Modified: hive/trunk/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1508033&r1=1508032&r2=1508033&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml.template (original)
+++ hive/trunk/conf/hive-default.xml.template Mon Jul 29 12:45:52 2013
@@ -896,6 +896,18 @@
 </property>
 
 <property>
+  <name>stream.stderr.reporter.prefix</name>
+  <value>reporter:</value>
+  <description>Streaming jobs that log to standard error with this prefix can log counter or status information.</description>
+</property>
+
+<property>
+  <name>stream.stderr.reporter.enabled</name>
+  <value>true</value>
+  <description>Enable consumption of status and counter messages for streaming jobs.</description>
+</property>
+
+<property>
   <name>hive.script.recordwriter</name>
   <value>org.apache.hadoop.hive.ql.exec.TextRecordWriter</value>
   <description>The default record writer for writing data to the user scripts. </description>
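
Note how the prefix composes in the ScriptOperator change below: the value
of stream.stderr.reporter.prefix is prepended to the fixed "counter:" and
"status:" markers, so with the default prefix a counter line must begin with
"reporter:counter:" and a status line with "reporter:status:". A sketch of
the composition (values shown are the defaults above):

    String reporterPrefix = "reporter:";                  // stream.stderr.reporter.prefix
    String counterPrefix  = reporterPrefix + "counter:";  // "reporter:counter:"
    String statusPrefix   = reporterPrefix + "status:";   // "reporter:status:"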

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java?rev=1508033&r1=1508032&r2=1508033&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java Mon Jul 29 12:45:52 2013
@@ -46,6 +46,7 @@ import org.apache.hadoop.io.BytesWritabl
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 
@@ -532,22 +533,68 @@ public class ScriptOperator extends Oper
     }
   }
 
+  class CounterStatusProcessor {
+
+    private final String reporterPrefix;
+    private final String counterPrefix;
+    private final String statusPrefix;
+    private final Reporter reporter;
+
+    CounterStatusProcessor(Configuration hconf, Reporter reporter){
+      this.reporterPrefix = HiveConf.getVar(hconf, HiveConf.ConfVars.STREAMREPORTERPERFIX);
+      this.counterPrefix = reporterPrefix + "counter:";
+      this.statusPrefix = reporterPrefix + "status:";
+      this.reporter = reporter;
+    }
+
+    private boolean process(String line) {
+      if (line.startsWith(reporterPrefix)){
+        if (line.startsWith(counterPrefix)){
+          incrCounter(line);
+        }
+        if (line.startsWith(statusPrefix)){
+          setStatus(line);
+        }
+        return true;
+      } else {
+        return false;
+      }
+    }
+
+    private void incrCounter(String line) {
+      String trimmedLine = line.substring(counterPrefix.length()).trim();
+      String[] columns = trimmedLine.split(",");
+      if (columns.length == 3) {
+        try {
+          reporter.incrCounter(columns[0], columns[1], Long.parseLong(columns[2]));
+        } catch (NumberFormatException e) {
+            LOG.warn("Cannot parse counter increment '" + columns[2] +
+                "' from line " + line);
+        }
+      } else {
+        LOG.warn("Cannot parse counter line: " + line);
+      }
+    }
+
+    private void setStatus(String line) {
+      reporter.setStatus(line.substring(statusPrefix.length()).trim());
+    }
+  }
   /**
    * The processor for stderr stream.
-   *
-   * TODO: In the future when we move to hadoop 0.18 and above, we should borrow
-   * the logic from HadoopStreaming: PipeMapRed.java MRErrorThread to support
-   * counters and status updates.
    */
   class ErrorStreamProcessor implements StreamProcessor {
     private long bytesCopied = 0;
     private final long maxBytes;
-
     private long lastReportTime;
+    private CounterStatusProcessor counterStatus;
 
     public ErrorStreamProcessor(int maxBytes) {
       this.maxBytes = maxBytes;
       lastReportTime = 0;
+      if (HiveConf.getBoolVar(hconf, HiveConf.ConfVars.STREAMREPORTERENABLED)){
+        counterStatus = new CounterStatusProcessor(hconf, reporter);
+      }
     }
 
     public void processLine(Writable line) throws HiveException {
@@ -571,6 +618,14 @@ public class ScriptOperator extends Oper
         reporter.progress();
       }
 
+      if (reporter != null) {
+        if (counterStatus != null) {
+          if (counterStatus.process(stringLine)) {
+            return;
+          }
+        }
+      }
+
       if ((maxBytes < 0) || (bytesCopied < maxBytes)) {
         System.err.println(stringLine);
       }
@@ -659,7 +714,7 @@ public class ScriptOperator extends Oper
     for (int i = 0; i < inArgs.length; i++) {
       finalArgv[wrapComponents.length + i] = inArgs[i];
     }
-    return (finalArgv);
+    return finalArgv;
   }
 
   // Code below shamelessly borrowed from Hadoop Streaming
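
For reference, a standalone sketch of the line protocol CounterStatusProcessor
parses above (the demo class and sample lines are hypothetical; Hive wires in
the real org.apache.hadoop.mapred.Reporter):

    // Demonstrates the three line forms handled above: counter increments
    // ("group,counter,amount"), status updates, and plain stderr output.
    public class ReporterLineDemo {
      public static void main(String[] args) {
        String counterPrefix = "reporter:counter:";
        String statusPrefix = "reporter:status:";
        String[] lines = {
            "reporter:counter:MyGroup,RowsSeen,42",
            "reporter:status:half way done",
            "plain stderr output",
        };
        for (String line : lines) {
          if (line.startsWith(counterPrefix)) {
            String[] cols = line.substring(counterPrefix.length()).trim().split(",");
            System.out.printf("incrCounter(%s, %s, %d)%n",
                cols[0], cols[1], Long.parseLong(cols[2]));
          } else if (line.startsWith(statusPrefix)) {
            System.out.println("setStatus(" +
                line.substring(statusPrefix.length()).trim() + ")");
          } else {
            System.out.println("copied to stderr: " + line);
          }
        }
      }
    }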