You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ke...@apache.org on 2012/09/20 03:02:56 UTC
svn commit: r1387833 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec: FileSinkOperator.java JoinOperator.java Operator.java
Author: kevinwilfong
Date: Thu Sep 20 01:02:56 2012
New Revision: 1387833
URL: http://svn.apache.org/viewvc?rev=1387833&view=rev
Log:
HIVE-3477. Duplicate data possible with speculative execution for dynamic partitions. (njain via kevinwilfong)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1387833&r1=1387832&r2=1387833&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Thu Sep 20 01:02:56 2012
@@ -744,7 +744,7 @@ public class FileSinkOperator extends Te
}
@Override
- public void jobClose(Configuration hconf, boolean success, JobCloseFeedBack feedBack)
+ public void jobCloseOp(Configuration hconf, boolean success, JobCloseFeedBack feedBack)
throws HiveException {
try {
if ((conf != null) && isNativeTable) {
@@ -755,7 +755,7 @@ public class FileSinkOperator extends Te
} catch (IOException e) {
throw new HiveException(e);
}
- super.jobClose(hconf, success, feedBack);
+ super.jobCloseOp(hconf, success, feedBack);
}
@Override
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java?rev=1387833&r1=1387832&r2=1387833&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java Thu Sep 20 01:02:56 2012
@@ -152,7 +152,7 @@ public class JoinOperator extends Common
}
@Override
- public void jobClose(Configuration hconf, boolean success, JobCloseFeedBack feedBack)
+ public void jobCloseOp(Configuration hconf, boolean success, JobCloseFeedBack feedBack)
throws HiveException {
int numAliases = conf.getExprs().size();
if (conf.getHandleSkewJoin()) {
@@ -189,7 +189,7 @@ public class JoinOperator extends Common
throw new HiveException(e);
}
}
- super.jobClose(hconf, success, feedBack);
+ super.jobCloseOp(hconf, success, feedBack);
}
private void moveUpFiles(String specPath, Configuration hconf, Log log)
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1387833&r1=1387832&r2=1387833&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Thu Sep 20 01:02:56 2012
@@ -581,6 +581,13 @@ public abstract class Operator<T extends
protected void closeOp(boolean abort) throws HiveException {
}
+ private boolean jobCloseDone = false;
+
+ // Operator specific logic goes here
+ public void jobCloseOp(Configuration conf, boolean success, JobCloseFeedBack feedBack)
+ throws HiveException {
+ }
+
/**
* Unlike other operator interfaces which are called from map or reduce task,
* jobClose is called from the jobclient side once the job has completed.
@@ -592,12 +599,18 @@ public abstract class Operator<T extends
*/
public void jobClose(Configuration conf, boolean success, JobCloseFeedBack feedBack)
throws HiveException {
- if (childOperators == null) {
+ // JobClose has already been performed on this operator
+ if (jobCloseDone) {
return;
}
- for (Operator<? extends OperatorDesc> op : childOperators) {
- op.jobClose(conf, success, feedBack);
+ jobCloseOp(conf, success, feedBack);
+ jobCloseDone = true;
+
+ if (childOperators != null) {
+ for (Operator<? extends OperatorDesc> op : childOperators) {
+ op.jobClose(conf, success, feedBack);
+ }
}
}