Posted to commits@hive.apache.org by zs...@apache.org on 2009/10/12 21:05:28 UTC

svn commit: r824457 - in /hadoop/hive/trunk: ./ common/src/java/org/apache/hadoop/hive/conf/ conf/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/test/queries/clientnegative/ ql/src/test/queries/clientpositive/ ql/src/test/results/clientnegative/ q...

Author: zshao
Date: Mon Oct 12 19:05:27 2009
New Revision: 824457

URL: http://svn.apache.org/viewvc?rev=824457&view=rev
Log:
HIVE-869. Allow ScriptOperator to succeed without consuming all input data. (Paul Yang via zshao)

Added:
    hadoop/hive/trunk/ql/src/test/queries/clientnegative/script_broken_pipe1.q
    hadoop/hive/trunk/ql/src/test/queries/clientnegative/script_broken_pipe2.q
    hadoop/hive/trunk/ql/src/test/queries/clientnegative/script_broken_pipe3.q
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/script_pipe.q
    hadoop/hive/trunk/ql/src/test/results/clientnegative/script_broken_pipe1.q.out
    hadoop/hive/trunk/ql/src/test/results/clientnegative/script_broken_pipe2.q.out
    hadoop/hive/trunk/ql/src/test/results/clientnegative/script_broken_pipe3.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/script_pipe.q.out
Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hadoop/hive/trunk/conf/hive-default.xml
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=824457&r1=824456&r2=824457&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Mon Oct 12 19:05:27 2009
@@ -62,6 +62,9 @@
     HIVE-868. Add last ddl time and dml time for table/partition.
     (Namit Jain via zshao)
 
+    HIVE-869. Allow ScriptOperator to succeed without consuming all
+    input data. (Paul Yang via zshao)
+
   IMPROVEMENTS
 
     HIVE-760. Add version info to META-INF/MANIFEST.MF.

Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=824457&r1=824456&r2=824457&view=diff
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Mon Oct 12 19:05:27 2009
@@ -61,6 +61,7 @@
     SCRATCHDIR("hive.exec.scratchdir", "/tmp/"+System.getProperty("user.name")+"/hive"),
     SUBMITVIACHILD("hive.exec.submitviachild", false),
     SCRIPTERRORLIMIT("hive.exec.script.maxerrsize", 100000),
+    ALLOWPARTIALCONSUMP("hive.exec.script.allow.partial.consumption", false),
     COMPRESSRESULT("hive.exec.compress.output", false),
     COMPRESSINTERMEDIATE("hive.exec.compress.intermediate", false),
     COMPRESSINTERMEDIATECODEC("hive.intermediate.compression.codec", ""),

Modified: hadoop/hive/trunk/conf/hive-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/conf/hive-default.xml?rev=824457&r1=824456&r2=824457&view=diff
==============================================================================
--- hadoop/hive/trunk/conf/hive-default.xml (original)
+++ hadoop/hive/trunk/conf/hive-default.xml Mon Oct 12 19:05:27 2009
@@ -280,6 +280,13 @@
 </property>
 
 <property>
+  <name>hive.exec.script.allow.partial.consumption</name>
+  <value>false</value>
+  <description> When enabled, this option allows a user script to exit successfully without consuming all the data from standard input.
+  </description>
+</property>
+
+<property>
   <name>hive.exec.compress.output</name>
   <value>false</value>
   <description> This controls whether the final outputs of a query (to a local/hdfs file or a hive table) is compressed. The compression codec and other options are determined from hadoop config variables mapred.output.compress* </description>
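
For context on why this option exists: a transform command such as 'head -n 1' exits after reading only part of its input, so Hive's later writes to the script's stdin fail with an IOException whose message is "Broken pipe". The following standalone sketch (not part of this commit; it assumes a Unix-like system with head on the PATH) reproduces the condition outside Hive:

    import java.io.IOException;
    import java.io.OutputStream;

    public class BrokenPipeDemo {
      public static void main(String[] args) throws Exception {
        // Start a script that consumes only the first line of its input.
        Process p = new ProcessBuilder("head", "-n", "1").start();
        OutputStream stdin = p.getOutputStream();
        byte[] row = "238\tval_238\n".getBytes("UTF-8");
        try {
          // Write far more than the OS pipe buffer (typically 64 KB);
          // once head exits, a pending write fails with EPIPE.
          for (int i = 0; i < 100000; i++) {
            stdin.write(row);
          }
          stdin.close();
          System.out.println("all input consumed");
        } catch (IOException e) {
          // On Linux JDKs this surfaces as IOException("Broken pipe"),
          // the exact message ScriptOperator matches on below.
          System.out.println("caught: " + e.getMessage());
        }
        System.out.println("script exit code: " + p.waitFor());
      }
    }

With hive.exec.script.allow.partial.consumption left at its default of false, the analogous failure inside a query fails the task; setting it to true tells ScriptOperator to treat the broken pipe as end-of-input instead.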

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java?rev=824457&r1=824456&r2=824457&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java Mon Oct 12 19:05:27 2009
@@ -70,6 +70,8 @@
   transient volatile Throwable scriptError = null;
   transient RecordWriter scriptOutWriter;
 
+  static final String IO_EXCEPTION_BROKEN_PIPE_STRING = "Broken pipe";
+  
   /**
    * Timer to send periodic reports back to the tracker.
    */
@@ -270,6 +272,20 @@
     }
   }
 
+  boolean isBrokenPipeException(IOException e) {
+    return IO_EXCEPTION_BROKEN_PIPE_STRING.equalsIgnoreCase(e.getMessage());
+  }
+  
+  boolean allowPartialConsumption() {
+    return HiveConf.getBoolVar(hconf, HiveConf.ConfVars.ALLOWPARTIALCONSUMP);
+  }
+  
+  void displayBrokenPipeInfo() {
+    LOG.info("The script did not consume all input data. This is considered as an error."); 
+    LOG.info("set " + HiveConf.ConfVars.ALLOWPARTIALCONSUMP.toString() + "=true; to ignore it.");
+    return;
+  }
+  
   public void processOp(Object row, int tag) throws HiveException {
 
     if(scriptError != null) {
@@ -285,9 +301,17 @@
       serialize_error_count.set(serialize_error_count.get() + 1);
       throw new HiveException(e);
     } catch (IOException e) {
-      LOG.error("Error in writing to script: " + e.getMessage());
-      scriptError = e;
-      throw new HiveException(e);
+      if(isBrokenPipeException(e) && allowPartialConsumption()) {
+        setDone(true);
+        LOG.warn("Got broken pipe during write: ignoring exception and setting operator to done");
+      } else {
+        LOG.error("Error in writing to script: " + e.getMessage());
+        if(isBrokenPipeException(e)) {
+          displayBrokenPipeInfo();
+        }
+        scriptError = e;
+        throw new HiveException(e);
+      }
     }
   }
 
@@ -300,7 +324,18 @@
       }
       // everything ok. try normal shutdown
       try {
-        scriptOutWriter.close();
+        try {
+          scriptOutWriter.close();
+        } catch (IOException e) {
+          if(isBrokenPipeException(e) && allowPartialConsumption()) {
+            LOG.warn("Got broken pipe: ignoring exception");
+          } else {
+            if(isBrokenPipeException(e)) {
+              displayBrokenPipeInfo();
+            }
+            throw e;
+          }
+        }
         int exitVal = scriptPid.waitFor();
         if (exitVal != 0) {
           LOG.error("Script failed with code " + exitVal);

Added: hadoop/hive/trunk/ql/src/test/queries/clientnegative/script_broken_pipe1.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientnegative/script_broken_pipe1.q?rev=824457&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientnegative/script_broken_pipe1.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientnegative/script_broken_pipe1.q Mon Oct 12 19:05:27 2009
@@ -0,0 +1,2 @@
+-- Tests exception in ScriptOperator.close() by passing to the operator a small amount of data
+SELECT TRANSFORM(*) USING 'true' AS a, b, c FROM (SELECT * FROM src LIMIT 1) tmp;
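
A note on why a single row exercises close() rather than processOp(): the stream returned by Process.getOutputStream() is buffered by the JDK, so one small row is accepted into the user-space buffer even though 'true' has already exited, and the broken pipe only surfaces when close() flushes. A minimal sketch of that timing (illustrative; assumes typical Linux JDK behavior):

    import java.io.IOException;
    import java.io.OutputStream;

    public class ClosePathDemo {
      public static void main(String[] args) throws Exception {
        // 'true' exits immediately without reading its stdin.
        Process p = new ProcessBuilder("true").start();
        p.waitFor(); // the child is certainly gone before we write
        OutputStream stdin = p.getOutputStream(); // buffered by the JDK
        try {
          stdin.write("238\tval_238\n".getBytes("UTF-8")); // fits the buffer; no syscall yet
          stdin.close(); // the flush hits the dead pipe
        } catch (IOException e) {
          System.out.println("caught at close: " + e.getMessage()); // "Broken pipe"
        }
      }
    }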

Added: hadoop/hive/trunk/ql/src/test/queries/clientnegative/script_broken_pipe2.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientnegative/script_broken_pipe2.q?rev=824457&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientnegative/script_broken_pipe2.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientnegative/script_broken_pipe2.q Mon Oct 12 19:05:27 2009
@@ -0,0 +1,2 @@
+-- Tests exception in ScriptOperator.processOp() by passing extra data needed to fill pipe buffer
+SELECT TRANSFORM(key, value, key, value, key, value, key, value, key, value, key, value) USING 'head -n 1' as a,b,c,d FROM src;

Added: hadoop/hive/trunk/ql/src/test/queries/clientnegative/script_broken_pipe3.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientnegative/script_broken_pipe3.q?rev=824457&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientnegative/script_broken_pipe3.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientnegative/script_broken_pipe3.q Mon Oct 12 19:05:27 2009
@@ -0,0 +1,3 @@
+set hive.exec.script.allow.partial.consumption = true;
+-- Test to ensure that a script with a bad error code still fails even with partial consumption
+SELECT TRANSFORM(*) USING 'false' AS a, b, c FROM (SELECT * FROM src LIMIT 1) tmp;

Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/script_pipe.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/script_pipe.q?rev=824457&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/script_pipe.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/script_pipe.q Mon Oct 12 19:05:27 2009
@@ -0,0 +1,8 @@
+set hive.exec.script.allow.partial.consumption = true;
+-- Tests exception in ScriptOperator.close() by passing to the operator a small amount of data
+EXPLAIN SELECT TRANSFORM(*) USING 'true' AS a, b, c FROM (SELECT * FROM src LIMIT 1) tmp;
+-- Tests exception in ScriptOperator.processOp() by passing extra data needed to fill pipe buffer
+EXPLAIN SELECT TRANSFORM(key, value, key, value, key, value, key, value, key, value, key, value) USING 'head -n 1' as a,b,c,d FROM src;
+
+SELECT TRANSFORM(*) USING 'true' AS a, b, c FROM (SELECT * FROM src LIMIT 1) tmp;
+SELECT TRANSFORM(key, value, key, value, key, value, key, value, key, value, key, value) USING 'head -n 1' as a,b,c,d FROM src;

Added: hadoop/hive/trunk/ql/src/test/results/clientnegative/script_broken_pipe1.q.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/results/clientnegative/script_broken_pipe1.q.out?rev=824457&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/results/clientnegative/script_broken_pipe1.q.out (added)
+++ hadoop/hive/trunk/ql/src/test/results/clientnegative/script_broken_pipe1.q.out Mon Oct 12 19:05:27 2009
@@ -0,0 +1,6 @@
+PREHOOK: query: -- Tests exception in ScriptOperator.close() by passing to the operator a small amount of data
+SELECT TRANSFORM(*) USING 'true' AS a, b, c FROM (SELECT * FROM src LIMIT 1) tmp
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/data/users/pyang/trunk/VENDOR.hive/trunk/build/ql/tmp/1832401066/10000
+FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapRedTask

Added: hadoop/hive/trunk/ql/src/test/results/clientnegative/script_broken_pipe2.q.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/results/clientnegative/script_broken_pipe2.q.out?rev=824457&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/results/clientnegative/script_broken_pipe2.q.out (added)
+++ hadoop/hive/trunk/ql/src/test/results/clientnegative/script_broken_pipe2.q.out Mon Oct 12 19:05:27 2009
@@ -0,0 +1,6 @@
+PREHOOK: query: -- Tests exception in ScriptOperator.processOp() by passing extra data needed to fill pipe buffer
+SELECT TRANSFORM(key, value, key, value, key, value, key, value, key, value, key, value) USING 'head -n 1' as a,b,c,d FROM src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/data/users/pyang/trunk/VENDOR.hive/trunk/build/ql/tmp/588453066/10000
+FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapRedTask

Added: hadoop/hive/trunk/ql/src/test/results/clientnegative/script_broken_pipe3.q.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/results/clientnegative/script_broken_pipe3.q.out?rev=824457&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/results/clientnegative/script_broken_pipe3.q.out (added)
+++ hadoop/hive/trunk/ql/src/test/results/clientnegative/script_broken_pipe3.q.out Mon Oct 12 19:05:27 2009
@@ -0,0 +1,6 @@
+PREHOOK: query: -- Test to ensure that a script with a bad error code still fails even with partial consumption
+SELECT TRANSFORM(*) USING 'false' AS a, b, c FROM (SELECT * FROM src LIMIT 1) tmp
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/data/users/pyang/trunk/VENDOR.hive/trunk/build/ql/tmp/1937270363/10000
+FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapRedTask

Added: hadoop/hive/trunk/ql/src/test/results/clientpositive/script_pipe.q.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/results/clientpositive/script_pipe.q.out?rev=824457&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/results/clientpositive/script_pipe.q.out (added)
+++ hadoop/hive/trunk/ql/src/test/results/clientpositive/script_pipe.q.out Mon Oct 12 19:05:27 2009
@@ -0,0 +1,144 @@
+PREHOOK: query: -- Tests exception in ScriptOperator.close() by passing to the operator a small amount of data
+EXPLAIN SELECT TRANSFORM(*) USING 'true' AS a, b, c FROM (SELECT * FROM src LIMIT 1) tmp
+PREHOOK: type: QUERY
+POSTHOOK: query: -- Tests exception in ScriptOperator.close() by passing to the operator a small amount of data
+EXPLAIN SELECT TRANSFORM(*) USING 'true' AS a, b, c FROM (SELECT * FROM src LIMIT 1) tmp
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_LIMIT 1))) tmp)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TRANSFORM (TOK_EXPLIST TOK_ALLCOLREF) TOK_SERDE TOK_RECORDWRITER 'true' TOK_SERDE TOK_RECORDREADER (TOK_ALIASLIST a b c))))))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        tmp:src 
+          TableScan
+            alias: src
+            Select Operator
+              expressions:
+                    expr: key
+                    type: string
+                    expr: value
+                    type: string
+              outputColumnNames: _col0, _col1
+              Limit
+                Reduce Output Operator
+                  sort order: 
+                  tag: -1
+                  value expressions:
+                        expr: _col0
+                        type: string
+                        expr: _col1
+                        type: string
+      Reduce Operator Tree:
+        Extract
+          Limit
+            Select Operator
+              expressions:
+                    expr: _col0
+                    type: string
+                    expr: _col1
+                    type: string
+              outputColumnNames: _col0, _col1
+              Transform Operator
+                command: true
+                output info:
+                    input format: org.apache.hadoop.mapred.TextInputFormat
+                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                File Output Operator
+                  compressed: false
+                  GlobalTableId: 0
+                  table:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+
+
+PREHOOK: query: -- Tests exception in ScriptOperator.processOp() by passing extra data needed to fill pipe buffer
+EXPLAIN SELECT TRANSFORM(key, value, key, value, key, value, key, value, key, value, key, value) USING 'head -n 1' as a,b,c,d FROM src
+PREHOOK: type: QUERY
+POSTHOOK: query: -- Tests exception in ScriptOperator.processOp() by passing extra data needed to fill pipe buffer
+EXPLAIN SELECT TRANSFORM(key, value, key, value, key, value, key, value, key, value, key, value) USING 'head -n 1' as a,b,c,d FROM src
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TRANSFORM (TOK_EXPLIST (TOK_TABLE_OR_COL key) (TOK_TABLE_OR_COL value) (TOK_TABLE_OR_COL key) (TOK_TABLE_OR_COL value) (TOK_TABLE_OR_COL key) (TOK_TABLE_OR_COL value) (TOK_TABLE_OR_COL key) (TOK_TABLE_OR_COL value) (TOK_TABLE_OR_COL key) (TOK_TABLE_OR_COL value) (TOK_TABLE_OR_COL key) (TOK_TABLE_OR_COL value)) TOK_SERDE TOK_RECORDWRITER 'head -n 1' TOK_SERDE TOK_RECORDREADER (TOK_ALIASLIST a b c d))))))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        src 
+          TableScan
+            alias: src
+            Select Operator
+              expressions:
+                    expr: key
+                    type: string
+                    expr: value
+                    type: string
+                    expr: key
+                    type: string
+                    expr: value
+                    type: string
+                    expr: key
+                    type: string
+                    expr: value
+                    type: string
+                    expr: key
+                    type: string
+                    expr: value
+                    type: string
+                    expr: key
+                    type: string
+                    expr: value
+                    type: string
+                    expr: key
+                    type: string
+                    expr: value
+                    type: string
+              outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11
+              Transform Operator
+                command: head -n 1
+                output info:
+                    input format: org.apache.hadoop.mapred.TextInputFormat
+                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                File Output Operator
+                  compressed: false
+                  GlobalTableId: 0
+                  table:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+
+
+PREHOOK: query: SELECT TRANSFORM(*) USING 'true' AS a, b, c FROM (SELECT * FROM src LIMIT 1) tmp
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/data/users/pyang/trunk/VENDOR.hive/trunk/build/ql/tmp/1755225837/10000
+POSTHOOK: query: SELECT TRANSFORM(*) USING 'true' AS a, b, c FROM (SELECT * FROM src LIMIT 1) tmp
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: file:/data/users/pyang/trunk/VENDOR.hive/trunk/build/ql/tmp/1755225837/10000
+PREHOOK: query: SELECT TRANSFORM(key, value, key, value, key, value, key, value, key, value, key, value) USING 'head -n 1' as a,b,c,d FROM src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/data/users/pyang/trunk/VENDOR.hive/trunk/build/ql/tmp/501105556/10000
+POSTHOOK: query: SELECT TRANSFORM(key, value, key, value, key, value, key, value, key, value, key, value) USING 'head -n 1' as a,b,c,d FROM src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: file:/data/users/pyang/trunk/VENDOR.hive/trunk/build/ql/tmp/501105556/10000
+238	val_238	238	val_238