You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2014/05/31 08:09:28 UTC
svn commit: r1598826 - in
/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez:
MapRecordProcessor.java TezProcessor.java
Author: gunther
Date: Sat May 31 06:09:28 2014
New Revision: 1598826
URL: http://svn.apache.org/r1598826
Log:
HIVE-7112: Tez processor swallows errors (Patch by Gunther Hagleitner, reviewed by Navis and Vikram Dixit K)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1598826&r1=1598825&r2=1598826&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Sat May 31 06:09:28 2014
@@ -207,6 +207,9 @@ public class MapRecordProcessor extends
// detecting failed executions by exceptions thrown by the operator tree
try {
+ if (mapOp == null || mapWork == null) {
+ return;
+ }
mapOp.close(abort);
// Need to close the dummyOps as well. The operator pipeline
@@ -236,5 +239,4 @@ public class MapRecordProcessor extends
MapredContext.close();
}
}
-
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java?rev=1598826&r1=1598825&r2=1598826&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java Sat May 31 06:09:28 2014
@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.util.StringUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.processor.MRTaskReporter;
@@ -45,8 +46,8 @@ import org.apache.tez.runtime.library.ap
*/
public class TezProcessor implements LogicalIOProcessor {
-
-
+
+
private static final Log LOG = LogFactory.getLog(TezProcessor.class);
private boolean isMap = false;
@@ -74,7 +75,7 @@ public class TezProcessor implements Log
@Override
public void close() throws IOException {
- // we have to close in the processor's run method, because tez closes inputs
+ // we have to close in the processor's run method, because tez closes inputs
// before calling close (TEZ-955) and we might need to read inputs
// when we flush the pipeline.
}
@@ -124,16 +125,16 @@ public class TezProcessor implements Log
@Override
public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs)
throws Exception {
-
- Exception processingException = null;
-
+
+ Throwable originalThrowable = null;
+
try{
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_PROCESSOR);
// in case of broadcast-join read the broadcast edge inputs
// (possibly asynchronously)
-
+
LOG.info("Running task: " + processorContext.getUniqueIdentifier());
-
+
if (isMap) {
rproc = new MapRecordProcessor();
MRInputLegacy mrInput = getMRInput(inputs);
@@ -156,29 +157,35 @@ public class TezProcessor implements Log
LOG.info("Input: " + inputEntry.getKey() + " is already cached. Skipping start");
}
}
-
+
// Outputs will be started later by the individual Processors.
-
+
MRTaskReporter mrReporter = new MRTaskReporter(processorContext);
rproc.init(jobConf, processorContext, mrReporter, inputs, outputs);
rproc.run();
//done - output does not need to be committed as hive does not use outputcommitter
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_PROCESSOR);
- } catch (Exception e) {
- processingException = e;
+ } catch (Throwable t) {
+ originalThrowable = t;
} finally {
+ if (originalThrowable != null && originalThrowable instanceof Error) {
+ LOG.error(StringUtils.stringifyException(originalThrowable));
+ throw new RuntimeException(originalThrowable);
+ }
+
try {
if(rproc != null){
rproc.close();
}
- } catch (Exception e) {
- if (processingException == null) {
- processingException = e;
+ } catch (Throwable t) {
+ if (originalThrowable == null) {
+ originalThrowable = t;
}
}
- if (processingException != null) {
- throw processingException;
+ if (originalThrowable != null) {
+ LOG.error(StringUtils.stringifyException(originalThrowable));
+ throw new RuntimeException(originalThrowable);
}
}
}
@@ -186,7 +193,7 @@ public class TezProcessor implements Log
/**
* KVOutputCollector. OutputCollector that writes using KVWriter.
* Must be initialized before it is used.
- *
+ *
*/
static class TezKVOutputCollector implements OutputCollector {
private KeyValueWriter writer;