You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by zs...@apache.org on 2009/10/12 21:05:28 UTC
svn commit: r824457 - in /hadoop/hive/trunk: ./
common/src/java/org/apache/hadoop/hive/conf/ conf/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/test/queries/clientnegative/ ql/src/test/queries/clientpositive/
ql/src/test/results/clientnegative/ ql/src/test/results/clientpositive/
Author: zshao
Date: Mon Oct 12 19:05:27 2009
New Revision: 824457
URL: http://svn.apache.org/viewvc?rev=824457&view=rev
Log:
HIVE-869. Allow ScriptOperator to consume not all input data. (Paul Yang via zshao)
Added:
hadoop/hive/trunk/ql/src/test/queries/clientnegative/script_broken_pipe1.q
hadoop/hive/trunk/ql/src/test/queries/clientnegative/script_broken_pipe2.q
hadoop/hive/trunk/ql/src/test/queries/clientnegative/script_broken_pipe3.q
hadoop/hive/trunk/ql/src/test/queries/clientpositive/script_pipe.q
hadoop/hive/trunk/ql/src/test/results/clientnegative/script_broken_pipe1.q.out
hadoop/hive/trunk/ql/src/test/results/clientnegative/script_broken_pipe2.q.out
hadoop/hive/trunk/ql/src/test/results/clientnegative/script_broken_pipe3.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/script_pipe.q.out
Modified:
hadoop/hive/trunk/CHANGES.txt
hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hadoop/hive/trunk/conf/hive-default.xml
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=824457&r1=824456&r2=824457&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Mon Oct 12 19:05:27 2009
@@ -62,6 +62,9 @@
HIVE-868. Add last ddl time and dml time for table/partition.
(Namit Jain via zshao)
+ HIVE-869. Allow ScriptOperator to consume not all input data.
+ (Paul Yang via zshao)
+
IMPROVEMENTS
HIVE-760. Add version info to META-INF/MANIFEST.MF.
Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=824457&r1=824456&r2=824457&view=diff
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Mon Oct 12 19:05:27 2009
@@ -61,6 +61,7 @@
SCRATCHDIR("hive.exec.scratchdir", "/tmp/"+System.getProperty("user.name")+"/hive"),
SUBMITVIACHILD("hive.exec.submitviachild", false),
SCRIPTERRORLIMIT("hive.exec.script.maxerrsize", 100000),
+ ALLOWPARTIALCONSUMP("hive.exec.script.allow.partial.consumption", false),
COMPRESSRESULT("hive.exec.compress.output", false),
COMPRESSINTERMEDIATE("hive.exec.compress.intermediate", false),
COMPRESSINTERMEDIATECODEC("hive.intermediate.compression.codec", ""),
Modified: hadoop/hive/trunk/conf/hive-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/conf/hive-default.xml?rev=824457&r1=824456&r2=824457&view=diff
==============================================================================
--- hadoop/hive/trunk/conf/hive-default.xml (original)
+++ hadoop/hive/trunk/conf/hive-default.xml Mon Oct 12 19:05:27 2009
@@ -280,6 +280,13 @@
</property>
<property>
+ <name>hive.exec.script.allow.partial.consumption</name>
+ <value>false</value>
+ <description> When enabled, this option allows a user script to exit successfully without consuming all the data from the standard input.
+ </description>
+</property>
+
+<property>
<name>hive.exec.compress.output</name>
<value>false</value>
<description> This controls whether the final outputs of a query (to a local/hdfs file or a hive table) is compressed. The compression codec and other options are determined from hadoop config variables mapred.output.compress* </description>
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java?rev=824457&r1=824456&r2=824457&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java Mon Oct 12 19:05:27 2009
@@ -70,6 +70,8 @@
transient volatile Throwable scriptError = null;
transient RecordWriter scriptOutWriter;
+ static final String IO_EXCEPTION_BROKEN_PIPE_STRING= "Broken pipe";
+
/**
* Timer to send periodic reports back to the tracker.
*/
@@ -270,6 +272,20 @@
}
}
+ boolean isBrokenPipeException(IOException e) {
+ return (e.getMessage().compareToIgnoreCase(IO_EXCEPTION_BROKEN_PIPE_STRING) == 0);
+ }
+
+ boolean allowPartialConsumption() {
+ return HiveConf.getBoolVar(hconf, HiveConf.ConfVars.ALLOWPARTIALCONSUMP);
+ }
+
+ void displayBrokenPipeInfo() {
+ LOG.info("The script did not consume all input data. This is considered as an error.");
+ LOG.info("set " + HiveConf.ConfVars.ALLOWPARTIALCONSUMP.toString() + "=true; to ignore it.");
+ return;
+ }
+
public void processOp(Object row, int tag) throws HiveException {
if(scriptError != null) {
@@ -285,9 +301,17 @@
serialize_error_count.set(serialize_error_count.get() + 1);
throw new HiveException(e);
} catch (IOException e) {
- LOG.error("Error in writing to script: " + e.getMessage());
- scriptError = e;
- throw new HiveException(e);
+ if(isBrokenPipeException(e) && allowPartialConsumption()) {
+ setDone(true);
+ LOG.warn("Got broken pipe during write: ignoring exception and setting operator to done");
+ } else {
+ LOG.error("Error in writing to script: " + e.getMessage());
+ if(isBrokenPipeException(e)) {
+ displayBrokenPipeInfo();
+ }
+ scriptError = e;
+ throw new HiveException(e);
+ }
}
}
@@ -300,7 +324,18 @@
}
// everything ok. try normal shutdown
try {
- scriptOutWriter.close();
+ try {
+ scriptOutWriter.close();
+ } catch (IOException e) {
+ if(isBrokenPipeException(e) && allowPartialConsumption()) {
+ LOG.warn("Got broken pipe: ignoring exception");
+ } else {
+ if(isBrokenPipeException(e)) {
+ displayBrokenPipeInfo();
+ }
+ throw e;
+ }
+ }
int exitVal = scriptPid.waitFor();
if (exitVal != 0) {
LOG.error("Script failed with code " + exitVal);
Added: hadoop/hive/trunk/ql/src/test/queries/clientnegative/script_broken_pipe1.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientnegative/script_broken_pipe1.q?rev=824457&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientnegative/script_broken_pipe1.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientnegative/script_broken_pipe1.q Mon Oct 12 19:05:27 2009
@@ -0,0 +1,2 @@
+-- Tests exception in ScriptOperator.close() by passing to the operator a small amount of data
+SELECT TRANSFORM(*) USING 'true' AS a, b, c FROM (SELECT * FROM src LIMIT 1) tmp;
Added: hadoop/hive/trunk/ql/src/test/queries/clientnegative/script_broken_pipe2.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientnegative/script_broken_pipe2.q?rev=824457&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientnegative/script_broken_pipe2.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientnegative/script_broken_pipe2.q Mon Oct 12 19:05:27 2009
@@ -0,0 +1,2 @@
+-- Tests exception in ScriptOperator.processOp() by passing extra data needed to fill pipe buffer
+SELECT TRANSFORM(key, value, key, value, key, value, key, value, key, value, key, value) USING 'head -n 1' as a,b,c,d FROM src;
Added: hadoop/hive/trunk/ql/src/test/queries/clientnegative/script_broken_pipe3.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientnegative/script_broken_pipe3.q?rev=824457&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientnegative/script_broken_pipe3.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientnegative/script_broken_pipe3.q Mon Oct 12 19:05:27 2009
@@ -0,0 +1,3 @@
+set hive.exec.script.allow.partial.consumption = true;
+-- Test to ensure that a script with a bad error code still fails even with partial consumption
+SELECT TRANSFORM(*) USING 'false' AS a, b, c FROM (SELECT * FROM src LIMIT 1) tmp;
Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/script_pipe.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/script_pipe.q?rev=824457&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/script_pipe.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/script_pipe.q Mon Oct 12 19:05:27 2009
@@ -0,0 +1,8 @@
+set hive.exec.script.allow.partial.consumption = true;
+-- Tests exception in ScriptOperator.close() by passing to the operator a small amount of data
+EXPLAIN SELECT TRANSFORM(*) USING 'true' AS a, b, c FROM (SELECT * FROM src LIMIT 1) tmp;
+-- Tests exception in ScriptOperator.processOp() by passing extra data needed to fill pipe buffer
+EXPLAIN SELECT TRANSFORM(key, value, key, value, key, value, key, value, key, value, key, value) USING 'head -n 1' as a,b,c,d FROM src;
+
+SELECT TRANSFORM(*) USING 'true' AS a, b, c FROM (SELECT * FROM src LIMIT 1) tmp;
+SELECT TRANSFORM(key, value, key, value, key, value, key, value, key, value, key, value) USING 'head -n 1' as a,b,c,d FROM src;
Added: hadoop/hive/trunk/ql/src/test/results/clientnegative/script_broken_pipe1.q.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/results/clientnegative/script_broken_pipe1.q.out?rev=824457&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/results/clientnegative/script_broken_pipe1.q.out (added)
+++ hadoop/hive/trunk/ql/src/test/results/clientnegative/script_broken_pipe1.q.out Mon Oct 12 19:05:27 2009
@@ -0,0 +1,6 @@
+PREHOOK: query: -- Tests exception in ScriptOperator.close() by passing to the operator a small amount of data
+SELECT TRANSFORM(*) USING 'true' AS a, b, c FROM (SELECT * FROM src LIMIT 1) tmp
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/data/users/pyang/trunk/VENDOR.hive/trunk/build/ql/tmp/1832401066/10000
+FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapRedTask
Added: hadoop/hive/trunk/ql/src/test/results/clientnegative/script_broken_pipe2.q.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/results/clientnegative/script_broken_pipe2.q.out?rev=824457&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/results/clientnegative/script_broken_pipe2.q.out (added)
+++ hadoop/hive/trunk/ql/src/test/results/clientnegative/script_broken_pipe2.q.out Mon Oct 12 19:05:27 2009
@@ -0,0 +1,6 @@
+PREHOOK: query: -- Tests exception in ScriptOperator.processOp() by passing extra data needed to fill pipe buffer
+SELECT TRANSFORM(key, value, key, value, key, value, key, value, key, value, key, value) USING 'head -n 1' as a,b,c,d FROM src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/data/users/pyang/trunk/VENDOR.hive/trunk/build/ql/tmp/588453066/10000
+FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapRedTask
Added: hadoop/hive/trunk/ql/src/test/results/clientnegative/script_broken_pipe3.q.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/results/clientnegative/script_broken_pipe3.q.out?rev=824457&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/results/clientnegative/script_broken_pipe3.q.out (added)
+++ hadoop/hive/trunk/ql/src/test/results/clientnegative/script_broken_pipe3.q.out Mon Oct 12 19:05:27 2009
@@ -0,0 +1,6 @@
+PREHOOK: query: -- Test to ensure that a script with a bad error code still fails even with partial consumption
+SELECT TRANSFORM(*) USING 'false' AS a, b, c FROM (SELECT * FROM src LIMIT 1) tmp
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/data/users/pyang/trunk/VENDOR.hive/trunk/build/ql/tmp/1937270363/10000
+FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapRedTask
Added: hadoop/hive/trunk/ql/src/test/results/clientpositive/script_pipe.q.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/results/clientpositive/script_pipe.q.out?rev=824457&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/results/clientpositive/script_pipe.q.out (added)
+++ hadoop/hive/trunk/ql/src/test/results/clientpositive/script_pipe.q.out Mon Oct 12 19:05:27 2009
@@ -0,0 +1,144 @@
+PREHOOK: query: -- Tests exception in ScriptOperator.close() by passing to the operator a small amount of data
+EXPLAIN SELECT TRANSFORM(*) USING 'true' AS a, b, c FROM (SELECT * FROM src LIMIT 1) tmp
+PREHOOK: type: QUERY
+POSTHOOK: query: -- Tests exception in ScriptOperator.close() by passing to the operator a small amount of data
+EXPLAIN SELECT TRANSFORM(*) USING 'true' AS a, b, c FROM (SELECT * FROM src LIMIT 1) tmp
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_LIMIT 1))) tmp)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TRANSFORM (TOK_EXPLIST TOK_ALLCOLREF) TOK_SERDE TOK_RECORDWRITER 'true' TOK_SERDE TOK_RECORDREADER (TOK_ALIASLIST a b c))))))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ tmp:src
+ TableScan
+ alias: src
+ Select Operator
+ expressions:
+ expr: key
+ type: string
+ expr: value
+ type: string
+ outputColumnNames: _col0, _col1
+ Limit
+ Reduce Output Operator
+ sort order:
+ tag: -1
+ value expressions:
+ expr: _col0
+ type: string
+ expr: _col1
+ type: string
+ Reduce Operator Tree:
+ Extract
+ Limit
+ Select Operator
+ expressions:
+ expr: _col0
+ type: string
+ expr: _col1
+ type: string
+ outputColumnNames: _col0, _col1
+ Transform Operator
+ command: true
+ output info:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+
+
+PREHOOK: query: -- Tests exception in ScriptOperator.processOp() by passing extra data needed to fill pipe buffer
+EXPLAIN SELECT TRANSFORM(key, value, key, value, key, value, key, value, key, value, key, value) USING 'head -n 1' as a,b,c,d FROM src
+PREHOOK: type: QUERY
+POSTHOOK: query: -- Tests exception in ScriptOperator.processOp() by passing extra data needed to fill pipe buffer
+EXPLAIN SELECT TRANSFORM(key, value, key, value, key, value, key, value, key, value, key, value) USING 'head -n 1' as a,b,c,d FROM src
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TRANSFORM (TOK_EXPLIST (TOK_TABLE_OR_COL key) (TOK_TABLE_OR_COL value) (TOK_TABLE_OR_COL key) (TOK_TABLE_OR_COL value) (TOK_TABLE_OR_COL key) (TOK_TABLE_OR_COL value) (TOK_TABLE_OR_COL key) (TOK_TABLE_OR_COL value) (TOK_TABLE_OR_COL key) (TOK_TABLE_OR_COL value) (TOK_TABLE_OR_COL key) (TOK_TABLE_OR_COL value)) TOK_SERDE TOK_RECORDWRITER 'head -n 1' TOK_SERDE TOK_RECORDREADER (TOK_ALIASLIST a b c d))))))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ src
+ TableScan
+ alias: src
+ Select Operator
+ expressions:
+ expr: key
+ type: string
+ expr: value
+ type: string
+ expr: key
+ type: string
+ expr: value
+ type: string
+ expr: key
+ type: string
+ expr: value
+ type: string
+ expr: key
+ type: string
+ expr: value
+ type: string
+ expr: key
+ type: string
+ expr: value
+ type: string
+ expr: key
+ type: string
+ expr: value
+ type: string
+ outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11
+ Transform Operator
+ command: head -n 1
+ output info:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+
+
+PREHOOK: query: SELECT TRANSFORM(*) USING 'true' AS a, b, c FROM (SELECT * FROM src LIMIT 1) tmp
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/data/users/pyang/trunk/VENDOR.hive/trunk/build/ql/tmp/1755225837/10000
+POSTHOOK: query: SELECT TRANSFORM(*) USING 'true' AS a, b, c FROM (SELECT * FROM src LIMIT 1) tmp
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: file:/data/users/pyang/trunk/VENDOR.hive/trunk/build/ql/tmp/1755225837/10000
+PREHOOK: query: SELECT TRANSFORM(key, value, key, value, key, value, key, value, key, value, key, value) USING 'head -n 1' as a,b,c,d FROM src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/data/users/pyang/trunk/VENDOR.hive/trunk/build/ql/tmp/501105556/10000
+POSTHOOK: query: SELECT TRANSFORM(key, value, key, value, key, value, key, value, key, value, key, value) USING 'head -n 1' as a,b,c,d FROM src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: file:/data/users/pyang/trunk/VENDOR.hive/trunk/build/ql/tmp/501105556/10000
+238 val_238 238 val_238