You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@hive.apache.org by gu...@apache.org on 2014/03/24 18:57:24 UTC

svn commit: r1580942 - /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java

Author: gunther
Date: Mon Mar 24 17:57:24 2014
New Revision: 1580942

URL: http://svn.apache.org/r1580942
Log:
HIVE-6700: In some queries inputs are closed on Tez before the operator pipeline is flushed (Gunther Hagleitner, reviewed by Vikram Dixit K and Siddharth Seth)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java?rev=1580942&r1=1580941&r2=1580942&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java Mon Mar 24 17:57:24 2014
@@ -74,9 +74,9 @@ public class TezProcessor implements Log
 
   @Override
   public void close() throws IOException {
-    if(rproc != null){
-      rproc.close();
-    }
+    // we have to close in the processor's run method, because tez closes inputs 
+    // before calling close (TEZ-955) and we might need to read inputs
+    // when we flush the pipeline.
   }
 
   @Override
@@ -123,42 +123,48 @@ public class TezProcessor implements Log
   @Override
   public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs)
       throws Exception {
-    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_PROCESSOR);
-    // in case of broadcast-join read the broadcast edge inputs
-    // (possibly asynchronously)
-
-    LOG.info("Running task: " + processorContext.getUniqueIdentifier());
-
-    if (isMap) {
-      rproc = new MapRecordProcessor();
-      MRInputLegacy mrInput = getMRInput(inputs);
-      try {
-        mrInput.init();
-      } catch (IOException e) {
-        throw new RuntimeException("Failed while initializing MRInput", e);
+    try{
+      perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_PROCESSOR);
+      // in case of broadcast-join read the broadcast edge inputs
+      // (possibly asynchronously)
+      
+      LOG.info("Running task: " + processorContext.getUniqueIdentifier());
+      
+      if (isMap) {
+        rproc = new MapRecordProcessor();
+        MRInputLegacy mrInput = getMRInput(inputs);
+        try {
+          mrInput.init();
+        } catch (IOException e) {
+          throw new RuntimeException("Failed while initializing MRInput", e);
+        }
+      } else {
+        rproc = new ReduceRecordProcessor();
       }
-    } else {
-      rproc = new ReduceRecordProcessor();
-    }
 
-    TezCacheAccess cacheAccess = TezCacheAccess.createInstance(jobConf);
-    // Start the actual Inputs. After MRInput initialization.
-    for (Entry<String, LogicalInput> inputEntry : inputs.entrySet()) {
-      if (!cacheAccess.isInputCached(inputEntry.getKey())) {
-        inputEntry.getValue().start();
-      } else {
-        LOG.info("Input: " + inputEntry.getKey() + " is already cached. Skipping start");
+      TezCacheAccess cacheAccess = TezCacheAccess.createInstance(jobConf);
+      // Start the actual Inputs. After MRInput initialization.
+      for (Entry<String, LogicalInput> inputEntry : inputs.entrySet()) {
+        if (!cacheAccess.isInputCached(inputEntry.getKey())) {
+          inputEntry.getValue().start();
+        } else {
+          LOG.info("Input: " + inputEntry.getKey() + " is already cached. Skipping start");
+        }
+      }
+      
+      // Outputs will be started later by the individual Processors.
+      
+      MRTaskReporter mrReporter = new MRTaskReporter(processorContext);
+      rproc.init(jobConf, processorContext, mrReporter, inputs, outputs);
+      rproc.run();
+
+      //done - output does not need to be committed as hive does not use outputcommitter
+      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_PROCESSOR);
+    } finally {
+      if(rproc != null){
+        rproc.close();
       }
     }
-
-    // Outputs will be started later by the individual Processors.
-
-    MRTaskReporter mrReporter = new MRTaskReporter(processorContext);
-    rproc.init(jobConf, processorContext, mrReporter, inputs, outputs);
-    rproc.run();
-
-    //done - output does not need to be committed as hive does not use outputcommitter
-    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_PROCESSOR);
   }
 
   /**