You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2014/05/31 08:09:28 UTC

svn commit: r1598826 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez: MapRecordProcessor.java TezProcessor.java

Author: gunther
Date: Sat May 31 06:09:28 2014
New Revision: 1598826

URL: http://svn.apache.org/r1598826
Log:
HIVE-7112: Tez processor swallows errors (Patch by Gunther Hagleitner, reviewed by Navis and Vikram Dixit K)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1598826&r1=1598825&r2=1598826&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Sat May 31 06:09:28 2014
@@ -207,6 +207,9 @@ public class MapRecordProcessor extends 
 
     // detecting failed executions by exceptions thrown by the operator tree
     try {
+      if (mapOp == null || mapWork == null) {
+        return;
+      }
       mapOp.close(abort);
 
       // Need to close the dummyOps as well. The operator pipeline
@@ -236,5 +239,4 @@ public class MapRecordProcessor extends 
       MapredContext.close();
     }
   }
-
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java?rev=1598826&r1=1598825&r2=1598826&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java Sat May 31 06:09:28 2014
@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
@@ -45,8 +46,8 @@ import org.apache.tez.runtime.library.ap
  */
 public class TezProcessor implements LogicalIOProcessor {
 
-  
-  
+
+
   private static final Log LOG = LogFactory.getLog(TezProcessor.class);
   private boolean isMap = false;
 
@@ -74,7 +75,7 @@ public class TezProcessor implements Log
 
   @Override
   public void close() throws IOException {
-    // we have to close in the processor's run method, because tez closes inputs 
+    // we have to close in the processor's run method, because tez closes inputs
     // before calling close (TEZ-955) and we might need to read inputs
     // when we flush the pipeline.
   }
@@ -124,16 +125,16 @@ public class TezProcessor implements Log
   @Override
   public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs)
       throws Exception {
-    
-    Exception processingException = null;
-    
+
+    Throwable originalThrowable = null;
+
     try{
       perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_PROCESSOR);
       // in case of broadcast-join read the broadcast edge inputs
       // (possibly asynchronously)
-      
+
       LOG.info("Running task: " + processorContext.getUniqueIdentifier());
-      
+
       if (isMap) {
         rproc = new MapRecordProcessor();
         MRInputLegacy mrInput = getMRInput(inputs);
@@ -156,29 +157,35 @@ public class TezProcessor implements Log
           LOG.info("Input: " + inputEntry.getKey() + " is already cached. Skipping start");
         }
       }
-      
+
       // Outputs will be started later by the individual Processors.
-      
+
       MRTaskReporter mrReporter = new MRTaskReporter(processorContext);
       rproc.init(jobConf, processorContext, mrReporter, inputs, outputs);
       rproc.run();
 
       //done - output does not need to be committed as hive does not use outputcommitter
       perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_PROCESSOR);
-    } catch (Exception e) {
-      processingException = e;
+    } catch (Throwable t) {
+      originalThrowable = t;
     } finally {
+      if (originalThrowable != null && originalThrowable instanceof Error) {
+        LOG.error(StringUtils.stringifyException(originalThrowable));
+        throw new RuntimeException(originalThrowable);
+      }
+
       try {
         if(rproc != null){
           rproc.close();
         }
-      } catch (Exception e) {
-        if (processingException == null) {
-          processingException = e;
+      } catch (Throwable t) {
+        if (originalThrowable == null) {
+          originalThrowable = t;
         }
       }
-      if (processingException != null) {
-        throw processingException;
+      if (originalThrowable != null) {
+        LOG.error(StringUtils.stringifyException(originalThrowable));
+        throw new RuntimeException(originalThrowable);
       }
     }
   }
@@ -186,7 +193,7 @@ public class TezProcessor implements Log
   /**
    * KVOutputCollector. OutputCollector that writes using KVWriter.
    * Must be initialized before it is used.
-   * 
+   *
    */
   static class TezKVOutputCollector implements OutputCollector {
     private KeyValueWriter writer;