You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2014/03/24 19:42:30 UTC
svn commit: r1580984 - /hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
Author: gunther
Date: Mon Mar 24 18:42:30 2014
New Revision: 1580984
URL: http://svn.apache.org/r1580984
Log:
HIVE-6700: In some queries inputs are closed on Tez before the operator pipeline is flushed (Gunther Hagleitner, reviewed by Vikram Dixit K and Siddharth Seth)
Modified:
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java?rev=1580984&r1=1580983&r2=1580984&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java Mon Mar 24 18:42:30 2014
@@ -74,9 +74,9 @@ public class TezProcessor implements Log
@Override
public void close() throws IOException {
- if(rproc != null){
- rproc.close();
- }
+ // we have to close in the processor's run method, because tez closes inputs
+ // before calling close (TEZ-955) and we might need to read inputs
+ // when we flush the pipeline.
}
@Override
@@ -123,42 +123,48 @@ public class TezProcessor implements Log
@Override
public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs)
throws Exception {
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_PROCESSOR);
- // in case of broadcast-join read the broadcast edge inputs
- // (possibly asynchronously)
-
- LOG.info("Running task: " + processorContext.getUniqueIdentifier());
-
- if (isMap) {
- rproc = new MapRecordProcessor();
- MRInputLegacy mrInput = getMRInput(inputs);
- try {
- mrInput.init();
- } catch (IOException e) {
- throw new RuntimeException("Failed while initializing MRInput", e);
+ try{
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_PROCESSOR);
+ // in case of broadcast-join read the broadcast edge inputs
+ // (possibly asynchronously)
+
+ LOG.info("Running task: " + processorContext.getUniqueIdentifier());
+
+ if (isMap) {
+ rproc = new MapRecordProcessor();
+ MRInputLegacy mrInput = getMRInput(inputs);
+ try {
+ mrInput.init();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed while initializing MRInput", e);
+ }
+ } else {
+ rproc = new ReduceRecordProcessor();
}
- } else {
- rproc = new ReduceRecordProcessor();
- }
- TezCacheAccess cacheAccess = TezCacheAccess.createInstance(jobConf);
- // Start the actual Inputs. After MRInput initialization.
- for (Entry<String, LogicalInput> inputEntry : inputs.entrySet()) {
- if (!cacheAccess.isInputCached(inputEntry.getKey())) {
- inputEntry.getValue().start();
- } else {
- LOG.info("Input: " + inputEntry.getKey() + " is already cached. Skipping start");
+ TezCacheAccess cacheAccess = TezCacheAccess.createInstance(jobConf);
+ // Start the actual Inputs. After MRInput initialization.
+ for (Entry<String, LogicalInput> inputEntry : inputs.entrySet()) {
+ if (!cacheAccess.isInputCached(inputEntry.getKey())) {
+ inputEntry.getValue().start();
+ } else {
+ LOG.info("Input: " + inputEntry.getKey() + " is already cached. Skipping start");
+ }
+ }
+
+ // Outputs will be started later by the individual Processors.
+
+ MRTaskReporter mrReporter = new MRTaskReporter(processorContext);
+ rproc.init(jobConf, processorContext, mrReporter, inputs, outputs);
+ rproc.run();
+
+ //done - output does not need to be committed as hive does not use outputcommitter
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_PROCESSOR);
+ } finally {
+ if(rproc != null){
+ rproc.close();
}
}
-
- // Outputs will be started later by the individual Processors.
-
- MRTaskReporter mrReporter = new MRTaskReporter(processorContext);
- rproc.init(jobConf, processorContext, mrReporter, inputs, outputs);
- rproc.run();
-
- //done - output does not need to be committed as hive does not use outputcommitter
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_PROCESSOR);
}
/**