You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/12/07 20:15:11 UTC
hive git commit: HIVE-15279 : map join dummy operators are not set up
correctly in certain cases with merge join (Sergey Shelukhin,
reviewed by Gunther Hagleitner)
Repository: hive
Updated Branches:
refs/heads/master 8804a7b89 -> d60802d6a
HIVE-15279 : map join dummy operators are not set up correctly in certain cases with merge join (Sergey Shelukhin, reviewed by Gunther Hagleitner)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d60802d6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d60802d6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d60802d6
Branch: refs/heads/master
Commit: d60802d6a0b211dd005219cf2303fdd5e9eb6764
Parents: 8804a7b
Author: Sergey Shelukhin <se...@apache.org>
Authored: Wed Dec 7 12:09:21 2016 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Wed Dec 7 12:09:21 2016 -0800
----------------------------------------------------------------------
.../hive/ql/exec/HashTableDummyOperator.java | 10 ++++++++++
.../hive/ql/exec/tez/ReduceRecordProcessor.java | 18 +++++++++++++++---
.../hive/ql/optimizer/ReduceSinkMapJoinProc.java | 3 ++-
.../apache/hadoop/hive/ql/parse/GenTezUtils.java | 1 +
.../apache/hadoop/hive/ql/parse/GenTezWork.java | 4 ++++
5 files changed, 32 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/d60802d6/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java
index 0aab7a8..2075d9b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java
@@ -77,4 +77,14 @@ public class HashTableDummyOperator extends Operator<HashTableDummyDesc> impleme
return OperatorType.HASHTABLEDUMMY;
}
+ @Override
+ public boolean equals(Object obj) {
+ return super.equals(obj) || (obj instanceof HashTableDummyOperator)
+ && (((HashTableDummyOperator)obj).operatorId == operatorId);
+ }
+
+ @Override
+ public int hashCode() {
+ return operatorId.hashCode();
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/d60802d6/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index cf3c8ab..e4c13fb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.tez;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -29,7 +30,6 @@ import java.util.concurrent.Callable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.io.api.LlapProxy;
import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
import org.apache.hadoop.hive.ql.exec.MapredContext;
@@ -41,7 +41,6 @@ import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
import org.apache.hadoop.hive.ql.log.PerfLogger;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -123,9 +122,23 @@ public class ReduceRecordProcessor extends RecordProcessor{
connectOps.clear();
ReduceWork redWork = reduceWork;
+ l4j.info("Main work is " + reduceWork.getName());
+ List<HashTableDummyOperator> workOps = reduceWork.getDummyOps();
+ HashSet<HashTableDummyOperator> dummyOps = workOps == null ? null : new HashSet<>(workOps);
tagToReducerMap.put(redWork.getTag(), redWork);
if (mergeWorkList != null) {
for (BaseWork mergeWork : mergeWorkList) {
+ if (l4j.isDebugEnabled()) {
+ l4j.debug("Additional work " + mergeWork.getName());
+ }
+ workOps = mergeWork.getDummyOps();
+ if (workOps != null) {
+ if (dummyOps == null) {
+ dummyOps = new HashSet<>(workOps);
+ } else {
+ dummyOps.addAll(workOps);
+ }
+ }
ReduceWork mergeReduceWork = (ReduceWork) mergeWork;
reducer = mergeReduceWork.getReducer();
// Check immediately after reducer is assigned, in case the abort came in during
@@ -193,7 +206,6 @@ public class ReduceRecordProcessor extends RecordProcessor{
// Initialization isn't finished until all parents of all operators
// are initialized. For broadcast joins that means initializing the
// dummy parent operators as well.
- List<HashTableDummyOperator> dummyOps = redWork.getDummyOps();
if (dummyOps != null) {
for (HashTableDummyOperator dummyOp : dummyOps) {
// TODO HIVE-14042. Propagating abort to dummyOps.
http://git-wip-us.apache.org/repos/asf/hive/blob/d60802d6/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
index 00afc18..3a6baca 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
@@ -370,6 +370,7 @@ public class ReduceSinkMapJoinProc implements NodeProcessor {
// at task startup
if (mapJoinWork != null) {
for (BaseWork myWork: mapJoinWork) {
+ LOG.debug("adding dummy op to work " + myWork.getName() + " from MJ work: " + dummyOp);
myWork.addDummyOp(dummyOp);
}
}
@@ -382,4 +383,4 @@ public class ReduceSinkMapJoinProc implements NodeProcessor {
return true;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/d60802d6/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index fd80e6c..e2363eb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -348,6 +348,7 @@ public class GenTezUtils {
operators.addAll(current.getChildOperators());
}
}
+ LOG.debug("Setting dummy ops for work " + work.getName() + ": " + dummyOps);
work.setDummyOps(dummyOps);
work.replaceRoots(replacementMap);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/d60802d6/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
index 461ba37..2b96e51 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
@@ -269,7 +269,11 @@ public class GenTezWork implements NodeProcessor {
if (context.linkOpWithWorkMap.containsKey(mj)) {
Map<BaseWork,TezEdgeProperty> linkWorkMap = context.linkOpWithWorkMap.get(mj);
if (linkWorkMap != null) {
+ // Note: it's not quite clear why this is done inside this if. Seems like it should be on the top level.
if (context.linkChildOpWithDummyOp.containsKey(mj)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding dummy ops to work: " + work.getName() + ": " + context.linkChildOpWithDummyOp.get(mj));
+ }
for (Operator<?> dummy: context.linkChildOpWithDummyOp.get(mj)) {
work.addDummyOp((HashTableDummyOperator) dummy);
}