You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by br...@apache.org on 2013/07/29 14:45:52 UTC
svn commit: r1508033 - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
conf/hive-default.xml.template
ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
Author: brock
Date: Mon Jul 29 12:45:52 2013
New Revision: 1508033
URL: http://svn.apache.org/r1508033
Log:
HIVE-305: Port Hadoop streaming's counters/status reporters to Hive Transforms (Guo Hongjie, Edward Capriolo via Brock Noland)
Modified:
hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/trunk/conf/hive-default.xml.template
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1508033&r1=1508032&r2=1508033&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Mon Jul 29 12:45:52 2013
@@ -182,6 +182,8 @@ public class HiveConf extends Configurat
SUBMITVIACHILD("hive.exec.submitviachild", false),
SCRIPTERRORLIMIT("hive.exec.script.maxerrsize", 100000),
ALLOWPARTIALCONSUMP("hive.exec.script.allow.partial.consumption", false),
+ STREAMREPORTERPERFIX("stream.stderr.reporter.prefix", "reporter:"),
+ STREAMREPORTERENABLED("stream.stderr.reporter.enabled", true),
COMPRESSRESULT("hive.exec.compress.output", false),
COMPRESSINTERMEDIATE("hive.exec.compress.intermediate", false),
COMPRESSINTERMEDIATECODEC("hive.intermediate.compression.codec", ""),
Modified: hive/trunk/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1508033&r1=1508032&r2=1508033&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml.template (original)
+++ hive/trunk/conf/hive-default.xml.template Mon Jul 29 12:45:52 2013
@@ -896,6 +896,18 @@
</property>
<property>
+ <name>stream.stderr.reporter.prefix</name>
+ <value>reporter:</value>
+ <description>Streaming jobs that log to standard error with this prefix can log counter or status information.</description>
+</property>
+
+<property>
+ <name>stream.stderr.reporter.enabled</name>
+ <value>true</value>
+ <description>Enable consumption of status and counter messages for streaming jobs.</description>
+</property>
+
+<property>
<name>hive.script.recordwriter</name>
<value>org.apache.hadoop.hive.ql.exec.TextRecordWriter</value>
<description>The default record writer for writing data to the user scripts. </description>
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java?rev=1508033&r1=1508032&r2=1508033&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java Mon Jul 29 12:45:52 2013
@@ -46,6 +46,7 @@ import org.apache.hadoop.io.BytesWritabl
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
@@ -532,22 +533,68 @@ public class ScriptOperator extends Oper
}
}
+ /**
+  * Parses lines that the child script writes to standard error beginning with
+  * the configured reporter prefix (property stream.stderr.reporter.prefix,
+  * default "reporter:") and forwards them to the Hadoop Reporter as either
+  * counter increments or task status updates. This ports the protocol used by
+  * Hadoop Streaming (PipeMapRed's MRErrorThread) to Hive transforms.
+  */
+ class CounterStatusProcessor {
+
+ private final String reporterPrefix;
+ private final String counterPrefix;
+ private final String statusPrefix;
+ private final Reporter reporter;
+
+ // NOTE(review): the ConfVars constant name carries a historical typo
+ // ("PERFIX"); the property key itself ("stream.stderr.reporter.prefix")
+ // is spelled correctly.
+ CounterStatusProcessor(Configuration hconf, Reporter reporter){
+ this.reporterPrefix = HiveConf.getVar(hconf, HiveConf.ConfVars.STREAMREPORTERPERFIX);
+ this.counterPrefix = reporterPrefix + "counter:";
+ this.statusPrefix = reporterPrefix + "status:";
+ this.reporter = reporter;
+ }
+
+ /**
+  * Consumes one stderr line. Returns true when the line starts with the
+  * reporter prefix (and was handled here, even if malformed), false when the
+  * caller should treat it as an ordinary stderr line.
+  */
+ private boolean process(String line) {
+ if (line.startsWith(reporterPrefix)){
+ if (line.startsWith(counterPrefix)){
+ incrCounter(line);
+ }
+ if (line.startsWith(statusPrefix)){
+ setStatus(line);
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ // Expects "<prefix>counter:<group>,<counter>,<amount>". Malformed lines
+ // (wrong column count or a non-numeric amount) are logged and dropped
+ // rather than failing the task.
+ private void incrCounter(String line) {
+ String trimmedLine = line.substring(counterPrefix.length()).trim();
+ String[] columns = trimmedLine.split(",");
+ if (columns.length == 3) {
+ try {
+ reporter.incrCounter(columns[0], columns[1], Long.parseLong(columns[2]));
+ } catch (NumberFormatException e) {
+ LOG.warn("Cannot parse counter increment '" + columns[2] +
+ "' from line " + line);
+ }
+ } else {
+ LOG.warn("Cannot parse counter line: " + line);
+ }
+ }
+
+ // Expects "<prefix>status:<message>"; the trimmed remainder becomes the
+ // task's status string.
+ private void setStatus(String line) {
+ reporter.setStatus(line.substring(statusPrefix.length()).trim());
+ }
+ }
/**
* The processor for stderr stream.
- *
- * TODO: In the future when we move to hadoop 0.18 and above, we should borrow
- * the logic from HadoopStreaming: PipeMapRed.java MRErrorThread to support
- * counters and status updates.
*/
class ErrorStreamProcessor implements StreamProcessor {
private long bytesCopied = 0;
private final long maxBytes;
-
private long lastReportTime;
+ private CounterStatusProcessor counterStatus;
public ErrorStreamProcessor(int maxBytes) {
this.maxBytes = maxBytes;
lastReportTime = 0;
+ if (HiveConf.getBoolVar(hconf, HiveConf.ConfVars.STREAMREPORTERENABLED)){
+ counterStatus = new CounterStatusProcessor(hconf, reporter);
+ }
}
public void processLine(Writable line) throws HiveException {
@@ -571,6 +618,14 @@ public class ScriptOperator extends Oper
reporter.progress();
}
+ if (reporter != null) {
+ if (counterStatus != null) {
+ if (counterStatus.process(stringLine)) {
+ return;
+ }
+ }
+ }
+
if ((maxBytes < 0) || (bytesCopied < maxBytes)) {
System.err.println(stringLine);
}
@@ -659,7 +714,7 @@ public class ScriptOperator extends Oper
for (int i = 0; i < inArgs.length; i++) {
finalArgv[wrapComponents.length + i] = inArgs[i];
}
- return (finalArgv);
+ return finalArgv;
}
// Code below shameless borrowed from Hadoop Streaming