You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ke...@apache.org on 2012/09/20 03:02:56 UTC

svn commit: r1387833 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec: FileSinkOperator.java JoinOperator.java Operator.java

Author: kevinwilfong
Date: Thu Sep 20 01:02:56 2012
New Revision: 1387833

URL: http://svn.apache.org/viewvc?rev=1387833&view=rev
Log:
HIVE-3477. Duplicate data possible with speculative execution for dynamic partitions. (njain via kevinwilfong)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1387833&r1=1387832&r2=1387833&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Thu Sep 20 01:02:56 2012
@@ -744,7 +744,7 @@ public class FileSinkOperator extends Te
   }
 
   @Override
-  public void jobClose(Configuration hconf, boolean success, JobCloseFeedBack feedBack)
+  public void jobCloseOp(Configuration hconf, boolean success, JobCloseFeedBack feedBack)
       throws HiveException {
     try {
       if ((conf != null) && isNativeTable) {
@@ -755,7 +755,7 @@ public class FileSinkOperator extends Te
     } catch (IOException e) {
       throw new HiveException(e);
     }
-    super.jobClose(hconf, success, feedBack);
+    super.jobCloseOp(hconf, success, feedBack);
   }
 
   @Override

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java?rev=1387833&r1=1387832&r2=1387833&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java Thu Sep 20 01:02:56 2012
@@ -152,7 +152,7 @@ public class JoinOperator extends Common
   }
 
   @Override
-  public void jobClose(Configuration hconf, boolean success, JobCloseFeedBack feedBack)
+  public void jobCloseOp(Configuration hconf, boolean success, JobCloseFeedBack feedBack)
       throws HiveException {
     int numAliases = conf.getExprs().size();
     if (conf.getHandleSkewJoin()) {
@@ -189,7 +189,7 @@ public class JoinOperator extends Common
         throw new HiveException(e);
       }
     }
-    super.jobClose(hconf, success, feedBack);
+    super.jobCloseOp(hconf, success, feedBack);
   }
 
   private void moveUpFiles(String specPath, Configuration hconf, Log log)

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1387833&r1=1387832&r2=1387833&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Thu Sep 20 01:02:56 2012
@@ -581,6 +581,13 @@ public abstract class Operator<T extends
   protected void closeOp(boolean abort) throws HiveException {
   }
 
+  private boolean jobCloseDone = false;
+
+  // Operator specific logic goes here
+  public void jobCloseOp(Configuration conf, boolean success, JobCloseFeedBack feedBack)
+      throws HiveException {
+  }
+
   /**
    * Unlike other operator interfaces which are called from map or reduce task,
    * jobClose is called from the jobclient side once the job has completed.
@@ -592,12 +599,18 @@ public abstract class Operator<T extends
    */
   public void jobClose(Configuration conf, boolean success, JobCloseFeedBack feedBack)
       throws HiveException {
-    if (childOperators == null) {
+    // JobClose has already been performed on this operator
+    if (jobCloseDone) {
       return;
     }
 
-    for (Operator<? extends OperatorDesc> op : childOperators) {
-      op.jobClose(conf, success, feedBack);
+    jobCloseOp(conf, success, feedBack);
+    jobCloseDone = true;
+
+    if (childOperators != null) {
+      for (Operator<? extends OperatorDesc> op : childOperators) {
+        op.jobClose(conf, success, feedBack);
+      }
     }
   }