Posted to commits@hive.apache.org by gu...@apache.org on 2017/10/24 20:16:14 UTC
[7/7] hive git commit: HIVE-14731: Use Tez cartesian product edge in Hive (unpartitioned case only) (Zhiyuan Yang via Gunther Hagleitner)
HIVE-14731: Use Tez cartesian product edge in Hive (unpartitioned case only) (Zhiyuan Yang via Gunther Hagleitner)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cfbe6125
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cfbe6125
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cfbe6125
Branch: refs/heads/master
Commit: cfbe6125725223657dff1e2c9bc3131a5193ae51
Parents: a284df1
Author: Gunther Hagleitner <gu...@apache.org>
Authored: Tue Oct 24 13:06:09 2017 -0700
Committer: Gunther Hagleitner <gu...@apache.org>
Committed: Tue Oct 24 13:06:09 2017 -0700
----------------------------------------------------------------------
.../hadoop/hive/common/jsonexplain/Vertex.java | 2 +-
.../common/jsonexplain/tez/TezJsonParser.java | 2 +
.../org/apache/hadoop/hive/conf/HiveConf.java | 2 +
data/conf/llap/hive-site.xml | 5 +
data/conf/tez/hive-site.xml | 5 +
.../test/resources/testconfiguration.properties | 6 +
.../hadoop/hive/ql/exec/tez/DagUtils.java | 69 +-
.../apache/hadoop/hive/ql/exec/tez/TezTask.java | 5 +-
.../hive/ql/optimizer/ConvertJoinMapJoin.java | 74 +-
.../optimizer/physical/CrossProductCheck.java | 368 ---
.../optimizer/physical/CrossProductHandler.java | 382 +++
.../optimizer/physical/PhysicalOptimizer.java | 2 +-
.../physical/SparkCrossProductCheck.java | 12 +-
.../hadoop/hive/ql/parse/TezCompiler.java | 4 +-
.../hadoop/hive/ql/plan/TezEdgeProperty.java | 4 +-
.../hadoop/hive/ql/exec/tez/TestTezTask.java | 4 +-
.../test/queries/clientpositive/cross_prod_1.q | 34 +
.../test/queries/clientpositive/cross_prod_3.q | 13 +
.../test/queries/clientpositive/cross_prod_4.q | 10 +
.../dynamic_partition_pruning_2.q | 2 +-
.../clientpositive/hybridgrace_hashjoin_1.q | 1 +
.../queries/clientpositive/subquery_multi.q | 4 +-
.../queries/clientpositive/subquery_notin.q | 4 +-
.../queries/clientpositive/subquery_select.q | 4 +-
.../clientpositive/llap/auto_join0.q.out | 56 +-
.../clientpositive/llap/auto_join_filters.q.out | 4 +-
.../clientpositive/llap/auto_join_nulls.q.out | 2 +-
.../llap/auto_sortmerge_join_12.q.out | 64 +-
.../clientpositive/llap/cross_join.q.out | 94 +-
.../clientpositive/llap/cross_prod_1.q.out | 2502 ++++++++++++++++++
.../clientpositive/llap/cross_prod_3.q.out | 133 +
.../clientpositive/llap/cross_prod_4.q.out | 195 ++
.../llap/cross_product_check_1.q.out | 12 +-
.../llap/cross_product_check_2.q.out | 305 ++-
.../results/clientpositive/llap/cte_5.q.out | 10 +-
.../results/clientpositive/llap/cte_mat_1.q.out | 10 +-
.../results/clientpositive/llap/cte_mat_2.q.out | 10 +-
.../llap/dynamic_partition_pruning.q.out | 81 +-
.../llap/dynamic_partition_pruning_2.q.out | 52 +-
.../llap/dynamic_semijoin_reduction_sw.q.out | 2 +-
.../clientpositive/llap/explainuser_1.q.out | 30 +-
.../llap/hybridgrace_hashjoin_1.q.out | 166 +-
.../clientpositive/llap/jdbc_handler.q.out | 2 +-
.../results/clientpositive/llap/join0.q.out | 2 +-
.../clientpositive/llap/leftsemijoin.q.out | 2 +-
.../results/clientpositive/llap/mapjoin2.q.out | 2 +-
.../clientpositive/llap/mapjoin_hint.q.out | 64 +-
.../clientpositive/llap/subquery_exists.q.out | 6 +-
.../clientpositive/llap/subquery_in.q.out | 2 +-
.../clientpositive/llap/subquery_multi.q.out | 106 +-
.../clientpositive/llap/subquery_notin.q.out | 107 +-
.../clientpositive/llap/subquery_null_agg.q.out | 2 +-
.../clientpositive/llap/subquery_scalar.q.out | 48 +-
.../clientpositive/llap/subquery_select.q.out | 103 +-
.../clientpositive/llap/tez_self_join.q.out | 2 +-
.../llap/vector_between_columns.q.out | 155 +-
.../llap/vector_complex_all.q.out | 92 +-
.../llap/vector_groupby_mapjoin.q.out | 113 +-
.../llap/vector_include_no_sel.q.out | 99 +-
.../llap/vector_join_filters.q.out | 2 +-
.../clientpositive/llap/vector_join_nulls.q.out | 2 +-
.../vectorized_dynamic_partition_pruning.q.out | 97 +-
.../llap/vectorized_multi_output_select.q.out | 58 +-
.../clientpositive/spark/subquery_multi.q.out | 80 +-
.../clientpositive/spark/subquery_notin.q.out | 106 +-
.../clientpositive/spark/subquery_select.q.out | 84 +-
.../tez/hybridgrace_hashjoin_1.q.out | 164 +-
67 files changed, 4670 insertions(+), 1576 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Vertex.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Vertex.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Vertex.java
index b7dc88c..a73893f 100644
--- a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Vertex.java
+++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Vertex.java
@@ -74,7 +74,7 @@ public final class Vertex implements Comparable<Vertex>{
public VertexType vertexType;
public static enum EdgeType {
- BROADCAST, SHUFFLE, MULTICAST, PARTITION_ONLY_SHUFFLE, FORWARD, UNKNOWN
+ BROADCAST, SHUFFLE, MULTICAST, PARTITION_ONLY_SHUFFLE, FORWARD, XPROD_EDGE, UNKNOWN
};
public String edgeType;
http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParser.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParser.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParser.java
index 69e5358..b6cca10 100644
--- a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParser.java
+++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParser.java
@@ -36,6 +36,8 @@ public class TezJsonParser extends DagJsonParser {
return "MULTICAST";
case "ONE_TO_ONE_EDGE":
return "FORWARD";
+ case "XPROD_EDGE":
+ return "XPROD_EDGE";
default:
return "UNKNOWN";
}
http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 62dcbd5..875e781 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3028,6 +3028,8 @@ public class HiveConf extends Configuration {
0.5f, "The maximum fraction of JVM memory which Tez will reserve for the processor"),
TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION("hive.tez.task.scale.memory.reserve.fraction",
-1f, "The customized fraction of JVM memory which Tez will reserve for the processor"),
+ TEZ_CARTESIAN_PRODUCT_EDGE_ENABLED("hive.tez.cartesian-product.enabled",
+ false, "Use Tez cartesian product edge to speed up cross product"),
// The default is different on the client and server, so it's null here.
LLAP_IO_ENABLED("hive.llap.io.enabled", null, "Whether the LLAP IO layer is enabled."),
LLAP_IO_TRACE_SIZE("hive.llap.io.trace.size", "2Mb",
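The new flag defaults to false and is consumed elsewhere in this patch through HiveConf.getBoolVar (see ConvertJoinMapJoin and CrossProductHandler below). A minimal sketch of setting and reading it programmatically, illustrative only and not part of the patch (the class name is hypothetical):

import org.apache.hadoop.hive.conf.HiveConf;

// Sketch: toggle hive.tez.cartesian-product.enabled and read it back the same way
// the optimizer code in this patch does.
public class XProdFlagSketch {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    conf.setBoolVar(HiveConf.ConfVars.TEZ_CARTESIAN_PRODUCT_EDGE_ENABLED, true);
    boolean enabled =
        HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_CARTESIAN_PRODUCT_EDGE_ENABLED);
    System.out.println("hive.tez.cartesian-product.enabled = " + enabled);
  }
}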
http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/data/conf/llap/hive-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/llap/hive-site.xml b/data/conf/llap/hive-site.xml
index 870b584..8cd5144 100644
--- a/data/conf/llap/hive-site.xml
+++ b/data/conf/llap/hive-site.xml
@@ -338,4 +338,9 @@
<value>true</value>
</property>
+<property>
+ <name>hive.tez.cartesian-product.enabled</name>
+ <value>true</value>
+</property>
+
</configuration>
http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/data/conf/tez/hive-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/tez/hive-site.xml b/data/conf/tez/hive-site.xml
index 35e8c99..f1dabf5 100644
--- a/data/conf/tez/hive-site.xml
+++ b/data/conf/tez/hive-site.xml
@@ -283,4 +283,9 @@
<value>true</value>
</property>
+<property>
+ <name>hive.tez.cartesian-product.enabled</name>
+ <value>true</value>
+</property>
+
</configuration>
http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index a081638..c338826 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -139,6 +139,9 @@ minillaplocal.shared.query.files=alter_merge_2_orc.q,\
count_dist_rewrite.q,\
create_merge_compressed.q,\
cross_join.q,\
+ cross_prod_1.q,\
+ cross_prod_3.q,\
+ cross_prod_4.q,\
cross_product_check_1.q,\
cross_product_check_2.q,\
ctas.q,\
@@ -508,6 +511,9 @@ minillaplocal.query.files=\
correlationoptimizer4.q,\
correlationoptimizer6.q,\
disable_merge_for_bucketing.q,\
+ cross_prod_1.q,\
+ cross_prod_3.q,\
+ cross_prod_4.q,\
dynamic_partition_pruning.q,\
dynamic_semijoin_reduction.q,\
dynamic_semijoin_reduction_2.q,\
http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index aae3480..5c338b8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -46,6 +46,9 @@ import org.apache.commons.lang.StringUtils;
import org.apache.tez.mapreduce.common.MRInputSplitDistributor;
import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos;
+import org.apache.tez.runtime.library.api.Partitioner;
+import org.apache.tez.runtime.library.cartesianproduct.CartesianProductConfig;
+import org.apache.tez.runtime.library.cartesianproduct.CartesianProductEdgeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -135,6 +138,7 @@ import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;
import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
+import org.apache.tez.runtime.library.cartesianproduct.CartesianProductVertexManager;
/**
* DagUtils. DagUtils is a collection of helper methods to convert
@@ -264,7 +268,7 @@ public class DagUtils {
*/
@SuppressWarnings("rawtypes")
public GroupInputEdge createEdge(VertexGroup group, JobConf vConf, Vertex w,
- TezEdgeProperty edgeProp, VertexType vertexType)
+ TezEdgeProperty edgeProp, BaseWork work, TezWork tezWork)
throws IOException {
Class mergeInputClass;
@@ -279,7 +283,8 @@ public class DagUtils {
case CUSTOM_EDGE: {
mergeInputClass = ConcatenatedMergedKeyValueInput.class;
int numBuckets = edgeProp.getNumBuckets();
- CustomVertexConfiguration vertexConf = new CustomVertexConfiguration(numBuckets, vertexType);
+ CustomVertexConfiguration vertexConf
+ = new CustomVertexConfiguration(numBuckets, tezWork.getVertexType(work));
DataOutputBuffer dob = new DataOutputBuffer();
vertexConf.write(dob);
VertexManagerPluginDescriptor desc =
@@ -299,6 +304,10 @@ public class DagUtils {
mergeInputClass = ConcatenatedMergedKeyValueInput.class;
break;
+ case XPROD_EDGE:
+ mergeInputClass = ConcatenatedMergedKeyValueInput.class;
+ break;
+
case SIMPLE_EDGE:
setupAutoReducerParallelism(edgeProp, w);
// fall through
@@ -308,7 +317,7 @@ public class DagUtils {
break;
}
- return GroupInputEdge.create(group, w, createEdgeProperty(edgeProp, vConf),
+ return GroupInputEdge.create(group, w, createEdgeProperty(w, edgeProp, vConf, work, tezWork),
InputDescriptor.create(mergeInputClass.getName()));
}
@@ -322,13 +331,14 @@ public class DagUtils {
* @return
*/
public Edge createEdge(JobConf vConf, Vertex v, Vertex w, TezEdgeProperty edgeProp,
- VertexType vertexType)
+ BaseWork work, TezWork tezWork)
throws IOException {
switch(edgeProp.getEdgeType()) {
case CUSTOM_EDGE: {
int numBuckets = edgeProp.getNumBuckets();
- CustomVertexConfiguration vertexConf = new CustomVertexConfiguration(numBuckets, vertexType);
+ CustomVertexConfiguration vertexConf =
+ new CustomVertexConfiguration(numBuckets, tezWork.getVertexType(work));
DataOutputBuffer dob = new DataOutputBuffer();
vertexConf.write(dob);
VertexManagerPluginDescriptor desc = VertexManagerPluginDescriptor.create(
@@ -339,6 +349,9 @@ public class DagUtils {
w.setVertexManagerPlugin(desc);
break;
}
+ case XPROD_EDGE:
+ break;
+
case SIMPLE_EDGE: {
setupAutoReducerParallelism(edgeProp, w);
break;
@@ -352,14 +365,15 @@ public class DagUtils {
// nothing
}
- return Edge.create(v, w, createEdgeProperty(edgeProp, vConf));
+ return Edge.create(v, w, createEdgeProperty(w, edgeProp, vConf, work, tezWork));
}
/*
* Helper function to create an edge property from an edge type.
*/
- private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration conf)
- throws IOException {
+ private EdgeProperty createEdgeProperty(Vertex w, TezEdgeProperty edgeProp,
+ Configuration conf, BaseWork work, TezWork tezWork)
+ throws IOException {
MRHelpers.translateMRConfToTez(conf);
String keyClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS);
String valClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS);
@@ -412,7 +426,23 @@ public class DagUtils {
.setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null)
.build();
return et4Conf.createDefaultOneToOneEdgeProperty();
+ case XPROD_EDGE:
+ EdgeManagerPluginDescriptor edgeManagerDescriptor =
+ EdgeManagerPluginDescriptor.create(CartesianProductEdgeManager.class.getName());
+ List<String> crossProductSources = new ArrayList<>();
+ for (BaseWork parentWork : tezWork.getParents(work)) {
+ if (EdgeType.XPROD_EDGE == tezWork.getEdgeType(parentWork, work)) {
+ crossProductSources.add(parentWork.getName());
+ }
+ }
+ CartesianProductConfig cpConfig = new CartesianProductConfig(crossProductSources);
+ edgeManagerDescriptor.setUserPayload(cpConfig.toUserPayload(new TezConfiguration(conf)));
+ UnorderedPartitionedKVEdgeConfig cpEdgeConf =
+ UnorderedPartitionedKVEdgeConfig.newBuilder(keyClass, valClass,
+ ValueHashPartitioner.class.getName()).build();
+ return cpEdgeConf.createDefaultCustomEdgeProperty(edgeManagerDescriptor);
case SIMPLE_EDGE:
+ // fallthrough
default:
assert partitionerClassName != null;
partitionerConf = createPartitionerConf(partitionerClassName, conf);
@@ -427,6 +457,14 @@ public class DagUtils {
}
}
+ public static class ValueHashPartitioner implements Partitioner {
+
+ @Override
+ public int getPartition(Object key, Object value, int numPartitions) {
+ return (value.hashCode() & 2147483647) % numPartitions;
+ }
+ }
+
/**
* Utility method to create a stripped down configuration for the MR partitioner.
*
@@ -1240,6 +1278,21 @@ public class DagUtils {
} else if (work instanceof MergeJoinWork) {
v = createVertex(conf, (MergeJoinWork) work, appJarLr, additionalLr, fileSystem, scratchDir,
ctx, vertexType);
+ // set VertexManagerPlugin if it's a cross product destination vertex
+ List<String> crossProductSources = new ArrayList<>();
+ for (BaseWork parentWork : tezWork.getParents(work)) {
+ if (tezWork.getEdgeType(parentWork, work) == EdgeType.XPROD_EDGE) {
+ crossProductSources.add(parentWork.getName());
+ }
+ }
+
+ if (!crossProductSources.isEmpty()) {
+ CartesianProductConfig cpConfig = new CartesianProductConfig(crossProductSources);
+ v.setVertexManagerPlugin(
+ VertexManagerPluginDescriptor.create(CartesianProductVertexManager.class.getName())
+ .setUserPayload(cpConfig.toUserPayload(new TezConfiguration(conf))));
+ // parallelism shouldn't be set for cartesian product vertex
+ }
} else {
// something is seriously wrong if this is happening
throw new HiveException(ErrorMsg.GENERIC_ERROR.getErrorCodedMsg());
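The XPROD_EDGE case above builds the edge with a CartesianProductEdgeManager and an UnorderedPartitionedKVEdgeConfig whose partitioner is the new ValueHashPartitioner: cross-product records carry no join key, so the partition is derived from the value's hash, and the 2147483647 (Integer.MAX_VALUE) mask clears the sign bit so the modulo never yields a negative index. A standalone sketch of that masking, with hypothetical names (illustrative only):

// Sketch: the same masking ValueHashPartitioner applies; hashCode() may be negative
// in Java, so the sign bit is cleared before taking the modulo.
public class ValueHashSketch {
  public static int partitionFor(Object value, int numPartitions) {
    return (value.hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
  public static void main(String[] args) {
    System.out.println(partitionFor("val_103", 4)); // always in 0..3
  }
}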
http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index c3a2a2b..a1b7cfb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -477,7 +477,7 @@ public class TezTask extends Task<TezWork> {
for (BaseWork v: children) {
// finally we can create the grouped edge
GroupInputEdge e = utils.createEdge(group, parentConf,
- workToVertex.get(v), work.getEdgeProperty(w, v), work.getVertexType(v));
+ workToVertex.get(v), work.getEdgeProperty(w, v), v, work);
dag.addEdge(e);
}
@@ -506,8 +506,7 @@ public class TezTask extends Task<TezWork> {
Edge e = null;
TezEdgeProperty edgeProp = work.getEdgeProperty(w, v);
-
- e = utils.createEdge(wxConf, wx, workToVertex.get(v), edgeProp, work.getVertexType(v));
+ e = utils.createEdge(wxConf, wx, workToVertex.get(v), edgeProp, v, work);
dag.addEdge(e);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index 53d34bb..9175597 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -102,6 +102,14 @@ public class ConvertJoinMapJoin implements NodeProcessor {
MemoryMonitorInfo memoryMonitorInfo = getMemoryMonitorInfo(maxSize, context.conf);
joinOp.getConf().setMemoryMonitorInfo(memoryMonitorInfo);
+ // do not use map join in case of cross product
+ boolean cartesianProductEdgeEnabled =
+ HiveConf.getBoolVar(context.conf, HiveConf.ConfVars.TEZ_CARTESIAN_PRODUCT_EDGE_ENABLED);
+ if (cartesianProductEdgeEnabled && !hasOuterJoin(joinOp) && isCrossProduct(joinOp)) {
+ fallbackToMergeJoin(joinOp, context);
+ return null;
+ }
+
TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf);
boolean hiveConvertJoin = context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN) &
!context.parseContext.getDisableMapJoin();
@@ -614,6 +622,42 @@ public class ConvertJoinMapJoin implements NodeProcessor {
return false;
}
+ private boolean hasOuterJoin(JoinOperator joinOp) throws SemanticException {
+ boolean hasOuter = false;
+ for (JoinCondDesc joinCondDesc : joinOp.getConf().getConds()) {
+ switch (joinCondDesc.getType()) {
+ case JoinDesc.INNER_JOIN:
+ case JoinDesc.LEFT_SEMI_JOIN:
+ case JoinDesc.UNIQUE_JOIN:
+ hasOuter = false;
+ break;
+
+ case JoinDesc.FULL_OUTER_JOIN:
+ case JoinDesc.LEFT_OUTER_JOIN:
+ case JoinDesc.RIGHT_OUTER_JOIN:
+ hasOuter = true;
+ break;
+
+ default:
+ throw new SemanticException("Unknown join type " + joinCondDesc.getType());
+ }
+ }
+ return hasOuter;
+ }
+
+ private boolean isCrossProduct(JoinOperator joinOp) {
+ ExprNodeDesc[][] joinExprs = joinOp.getConf().getJoinKeys();
+ if (joinExprs != null) {
+ for (ExprNodeDesc[] expr : joinExprs) {
+ if (expr != null && expr.length != 0) {
+ return false;
+ }
+ }
+ }
+
+ return true;
+ }
+
/**
* Obtain big table position for join.
*
@@ -639,26 +683,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
* case this for now.
*/
if (joinOp.getConf().getConds().length > 1) {
- boolean hasOuter = false;
- for (JoinCondDesc joinCondDesc : joinOp.getConf().getConds()) {
- switch (joinCondDesc.getType()) {
- case JoinDesc.INNER_JOIN:
- case JoinDesc.LEFT_SEMI_JOIN:
- case JoinDesc.UNIQUE_JOIN:
- hasOuter = false;
- break;
-
- case JoinDesc.FULL_OUTER_JOIN:
- case JoinDesc.LEFT_OUTER_JOIN:
- case JoinDesc.RIGHT_OUTER_JOIN:
- hasOuter = true;
- break;
-
- default:
- throw new SemanticException("Unknown join type " + joinCondDesc.getType());
- }
- }
- if (hasOuter) {
+ if (hasOuterJoin(joinOp)) {
return -1;
}
}
@@ -1100,14 +1125,19 @@ public class ConvertJoinMapJoin implements NodeProcessor {
}
}
+ // we are just converting to a common merge join operator. The shuffle
+ // join in map-reduce case.
+ fallbackToMergeJoin(joinOp, context);
+ }
+
+ private void fallbackToMergeJoin(JoinOperator joinOp, OptimizeTezProcContext context)
+ throws SemanticException {
int pos = getMapJoinConversionPos(joinOp, context, estimateNumBuckets(joinOp, false),
true, Long.MAX_VALUE, false);
if (pos < 0) {
LOG.info("Could not get a valid join position. Defaulting to position 0");
pos = 0;
}
- // we are just converting to a common merge join operator. The shuffle
- // join in map-reduce case.
LOG.info("Fallback to common merge join operator");
convertJoinSMBJoin(joinOp, context, pos, 0, false);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java
deleted file mode 100644
index 4b35bb6..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java
+++ /dev/null
@@ -1,368 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.optimizer.physical;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Stack;
-import java.util.TreeMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.ConditionalTask;
-import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
-import org.apache.hadoop.hive.ql.exec.JoinOperator;
-import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
-import org.apache.hadoop.hive.ql.exec.tez.TezTask;
-import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
-import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
-import org.apache.hadoop.hive.ql.lib.Dispatcher;
-import org.apache.hadoop.hive.ql.lib.GraphWalker;
-import org.apache.hadoop.hive.ql.lib.Node;
-import org.apache.hadoop.hive.ql.lib.NodeProcessor;
-import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
-import org.apache.hadoop.hive.ql.lib.Rule;
-import org.apache.hadoop.hive.ql.lib.RuleRegExp;
-import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.BaseWork;
-import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
-import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
-import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
-import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
-import org.apache.hadoop.hive.ql.plan.ReduceWork;
-import org.apache.hadoop.hive.ql.plan.TableScanDesc;
-import org.apache.hadoop.hive.ql.plan.TezWork;
-import org.apache.hadoop.hive.ql.session.SessionState;
-
-/*
- * Check each MapJoin and ShuffleJoin Operator to see they are performing a cross product.
- * If yes, output a warning to the Session's console.
- * The Checks made are the following:
- * 1. MR, Shuffle Join:
- * Check the parent ReduceSinkOp of the JoinOp. If its keys list is size = 0, then
- * this is a cross product.
- * The parent ReduceSinkOp is in the MapWork for the same Stage.
- * 2. MR, MapJoin:
- * If the keys expr list on the mapJoin Desc is an empty list for any input,
- * this implies a cross product.
- * 3. Tez, Shuffle Join:
- * Check the parent ReduceSinkOp of the JoinOp. If its keys list is size = 0, then
- * this is a cross product.
- * The parent ReduceSinkOp checked is based on the ReduceWork.tagToInput map on the
- * reduceWork that contains the JoinOp.
- * 4. Tez, Map Join:
- * If the keys expr list on the mapJoin Desc is an empty list for any input,
- * this implies a cross product.
- */
-public class CrossProductCheck implements PhysicalPlanResolver, Dispatcher {
-
- protected static transient final Logger LOG = LoggerFactory
- .getLogger(CrossProductCheck.class);
-
- @Override
- public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
- TaskGraphWalker ogw = new TaskGraphWalker(this);
-
- ArrayList<Node> topNodes = new ArrayList<Node>();
- topNodes.addAll(pctx.getRootTasks());
-
- ogw.startWalking(topNodes, null);
- return pctx;
- }
-
- @Override
- public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
- throws SemanticException {
- @SuppressWarnings("unchecked")
- Task<? extends Serializable> currTask = (Task<? extends Serializable>) nd;
- if (currTask instanceof MapRedTask) {
- MapRedTask mrTsk = (MapRedTask)currTask;
- MapredWork mrWrk = mrTsk.getWork();
- checkMapJoins(mrTsk);
- checkMRReducer(currTask.toString(), mrWrk);
- } else if (currTask instanceof ConditionalTask ) {
- List<Task<? extends Serializable>> taskListInConditionalTask =
- ((ConditionalTask) currTask).getListTasks();
- for(Task<? extends Serializable> tsk: taskListInConditionalTask){
- dispatch(tsk, stack, nodeOutputs);
- }
-
- } else if (currTask instanceof TezTask) {
- TezTask tzTask = (TezTask) currTask;
- TezWork tzWrk = tzTask.getWork();
- checkMapJoins(tzWrk);
- checkTezReducer(tzWrk);
- }
- return null;
- }
-
- private void warn(String msg) {
- SessionState.getConsole().printInfo("Warning: " + msg, false);
- }
-
- private void checkMapJoins(MapRedTask mrTsk) throws SemanticException {
- MapredWork mrWrk = mrTsk.getWork();
- MapWork mapWork = mrWrk.getMapWork();
- List<String> warnings = new MapJoinCheck(mrTsk.toString()).analyze(mapWork);
- if (!warnings.isEmpty()) {
- for (String w : warnings) {
- warn(w);
- }
- }
- ReduceWork redWork = mrWrk.getReduceWork();
- if (redWork != null) {
- warnings = new MapJoinCheck(mrTsk.toString()).analyze(redWork);
- if (!warnings.isEmpty()) {
- for (String w : warnings) {
- warn(w);
- }
- }
- }
- }
-
- private void checkMapJoins(TezWork tzWrk) throws SemanticException {
- for(BaseWork wrk : tzWrk.getAllWork() ) {
-
- if ( wrk instanceof MergeJoinWork ) {
- wrk = ((MergeJoinWork)wrk).getMainWork();
- }
-
- List<String> warnings = new MapJoinCheck(wrk.getName()).analyze(wrk);
- if ( !warnings.isEmpty() ) {
- for(String w : warnings) {
- warn(w);
- }
- }
- }
- }
-
- private void checkTezReducer(TezWork tzWrk) throws SemanticException {
- for(BaseWork wrk : tzWrk.getAllWork() ) {
-
- if ( wrk instanceof MergeJoinWork ) {
- wrk = ((MergeJoinWork)wrk).getMainWork();
- }
-
- if ( !(wrk instanceof ReduceWork ) ) {
- continue;
- }
- ReduceWork rWork = (ReduceWork) wrk;
- Operator<? extends OperatorDesc> reducer = ((ReduceWork)wrk).getReducer();
- if ( reducer instanceof JoinOperator || reducer instanceof CommonMergeJoinOperator ) {
- Map<Integer, ExtractReduceSinkInfo.Info> rsInfo = new TreeMap<Integer, ExtractReduceSinkInfo.Info>();
- for(Map.Entry<Integer, String> e : rWork.getTagToInput().entrySet()) {
- rsInfo.putAll(getReducerInfo(tzWrk, rWork.getName(), e.getValue()));
- }
- checkForCrossProduct(rWork.getName(), reducer, rsInfo);
- }
- }
- }
-
- private void checkMRReducer(String taskName, MapredWork mrWrk) throws SemanticException {
- ReduceWork rWrk = mrWrk.getReduceWork();
- if ( rWrk == null) {
- return;
- }
- Operator<? extends OperatorDesc> reducer = rWrk.getReducer();
- if ( reducer instanceof JoinOperator|| reducer instanceof CommonMergeJoinOperator ) {
- BaseWork prntWork = mrWrk.getMapWork();
- checkForCrossProduct(taskName, reducer,
- new ExtractReduceSinkInfo(null).analyze(prntWork));
- }
- }
-
- private void checkForCrossProduct(String taskName,
- Operator<? extends OperatorDesc> reducer,
- Map<Integer, ExtractReduceSinkInfo.Info> rsInfo) {
- if ( rsInfo.isEmpty() ) {
- return;
- }
- Iterator<ExtractReduceSinkInfo.Info> it = rsInfo.values().iterator();
- ExtractReduceSinkInfo.Info info = it.next();
- if (info.keyCols.size() == 0) {
- List<String> iAliases = new ArrayList<String>();
- iAliases.addAll(info.inputAliases);
- while (it.hasNext()) {
- info = it.next();
- iAliases.addAll(info.inputAliases);
- }
- String warning = String.format(
- "Shuffle Join %s[tables = %s] in Stage '%s' is a cross product",
- reducer.toString(),
- iAliases,
- taskName);
- warn(warning);
- }
- }
-
- private Map<Integer, ExtractReduceSinkInfo.Info> getReducerInfo(TezWork tzWrk, String vertex, String prntVertex)
- throws SemanticException {
- BaseWork prntWork = tzWrk.getWorkMap().get(prntVertex);
- return new ExtractReduceSinkInfo(vertex).analyze(prntWork);
- }
-
- /*
- * Given a Work descriptor and the TaskName for the work
- * this is responsible to check each MapJoinOp for cross products.
- * The analyze call returns the warnings list.
- * <p>
- * For MR the taskname is the StageName, for Tez it is the vertex name.
- */
- public static class MapJoinCheck implements NodeProcessor, NodeProcessorCtx {
-
- final List<String> warnings;
- final String taskName;
-
- MapJoinCheck(String taskName) {
- this.taskName = taskName;
- warnings = new ArrayList<String>();
- }
-
- List<String> analyze(BaseWork work) throws SemanticException {
- Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
- opRules.put(new RuleRegExp("R1", MapJoinOperator.getOperatorName()
- + "%"), this);
- Dispatcher disp = new DefaultRuleDispatcher(new NoopProcessor(), opRules, this);
- GraphWalker ogw = new DefaultGraphWalker(disp);
- ArrayList<Node> topNodes = new ArrayList<Node>();
- topNodes.addAll(work.getAllRootOperators());
- ogw.startWalking(topNodes, null);
- return warnings;
- }
-
- @Override
- public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
- Object... nodeOutputs) throws SemanticException {
- @SuppressWarnings("unchecked")
- AbstractMapJoinOperator<? extends MapJoinDesc> mjOp = (AbstractMapJoinOperator<? extends MapJoinDesc>) nd;
- MapJoinDesc mjDesc = mjOp.getConf();
-
- String bigTablAlias = mjDesc.getBigTableAlias();
- if ( bigTablAlias == null ) {
- Operator<? extends OperatorDesc> parent = null;
- for(Operator<? extends OperatorDesc> op : mjOp.getParentOperators() ) {
- if ( op instanceof TableScanOperator ) {
- parent = op;
- }
- }
- if ( parent != null) {
- TableScanDesc tDesc = ((TableScanOperator)parent).getConf();
- bigTablAlias = tDesc.getAlias();
- }
- }
- bigTablAlias = bigTablAlias == null ? "?" : bigTablAlias;
-
- List<ExprNodeDesc> joinExprs = mjDesc.getKeys().values().iterator().next();
-
- if ( joinExprs.size() == 0 ) {
- warnings.add(
- String.format("Map Join %s[bigTable=%s] in task '%s' is a cross product",
- mjOp.toString(), bigTablAlias, taskName));
- }
-
- return null;
- }
- }
-
- /*
- * for a given Work Descriptor, it extracts information about the ReduceSinkOps
- * in the Work. For Tez, you can restrict it to ReduceSinks for a particular output
- * vertex.
- */
- public static class ExtractReduceSinkInfo implements NodeProcessor, NodeProcessorCtx {
-
- static class Info {
- List<ExprNodeDesc> keyCols;
- List<String> inputAliases;
-
- Info(List<ExprNodeDesc> keyCols, List<String> inputAliases) {
- this.keyCols = keyCols;
- this.inputAliases = inputAliases == null ? new ArrayList<String>() : inputAliases;
- }
-
- Info(List<ExprNodeDesc> keyCols, String[] inputAliases) {
- this.keyCols = keyCols;
- this.inputAliases = inputAliases == null ? new ArrayList<String>() : Arrays.asList(inputAliases);
- }
- }
-
- final String outputTaskName;
- final Map<Integer, Info> reduceSinkInfo;
-
- ExtractReduceSinkInfo(String parentTaskName) {
- this.outputTaskName = parentTaskName;
- reduceSinkInfo = new HashMap<Integer, Info>();
- }
-
- Map<Integer, Info> analyze(BaseWork work) throws SemanticException {
- Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
- opRules.put(new RuleRegExp("R1", ReduceSinkOperator.getOperatorName()
- + "%"), this);
- Dispatcher disp = new DefaultRuleDispatcher(new NoopProcessor(), opRules, this);
- GraphWalker ogw = new DefaultGraphWalker(disp);
- ArrayList<Node> topNodes = new ArrayList<Node>();
- topNodes.addAll(work.getAllRootOperators());
- ogw.startWalking(topNodes, null);
- return reduceSinkInfo;
- }
-
- @Override
- public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
- Object... nodeOutputs) throws SemanticException {
- ReduceSinkOperator rsOp = (ReduceSinkOperator) nd;
- ReduceSinkDesc rsDesc = rsOp.getConf();
-
- if ( outputTaskName != null ) {
- String rOutputName = rsDesc.getOutputName();
- if ( rOutputName == null || !outputTaskName.equals(rOutputName)) {
- return null;
- }
- }
-
- reduceSinkInfo.put(rsDesc.getTag(),
- new Info(rsDesc.getKeyCols(), rsOp.getInputAliases()));
-
- return null;
- }
- }
-
- static class NoopProcessor implements NodeProcessor {
- @Override
- public final Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
- Object... nodeOutputs) throws SemanticException {
- return nd;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductHandler.java
new file mode 100644
index 0000000..1442378
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductHandler.java
@@ -0,0 +1,382 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.plan.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.ConditionalTask;
+import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
+import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+/*
+ * Check each MapJoin and ShuffleJoin Operator to see if they are performing a cross product.
+ * If yes, output a warning to the Session's console.
+ * The Checks made are the following:
+ * 1. MR, Shuffle Join:
+ * Check the parent ReduceSinkOp of the JoinOp. If its keys list is size = 0, then
+ * this is a cross product.
+ * The parent ReduceSinkOp is in the MapWork for the same Stage.
+ * 2. MR, MapJoin:
+ * If the keys expr list on the mapJoin Desc is an empty list for any input,
+ * this implies a cross product.
+ * 3. Tez, Shuffle Join:
+ * Check the parent ReduceSinkOp of the JoinOp. If its keys list is size = 0, then
+ * this is a cross product.
+ * The parent ReduceSinkOp checked is based on the ReduceWork.tagToInput map on the
+ * reduceWork that contains the JoinOp.
+ * 4. Tez, Map Join:
+ * If the keys expr list on the mapJoin Desc is an empty list for any input,
+ * this implies a cross product.
+ */
+public class CrossProductHandler implements PhysicalPlanResolver, Dispatcher {
+
+ protected static transient final Logger LOG = LoggerFactory
+ .getLogger(CrossProductHandler.class);
+ private Boolean cartesianProductEdgeEnabled = null;
+
+ @Override
+ public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
+ cartesianProductEdgeEnabled =
+ HiveConf.getBoolVar(pctx.getConf(), HiveConf.ConfVars.TEZ_CARTESIAN_PRODUCT_EDGE_ENABLED);
+ TaskGraphWalker ogw = new TaskGraphWalker(this);
+
+ ArrayList<Node> topNodes = new ArrayList<Node>();
+ topNodes.addAll(pctx.getRootTasks());
+
+ ogw.startWalking(topNodes, null);
+ return pctx;
+ }
+
+ @Override
+ public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
+ throws SemanticException {
+ @SuppressWarnings("unchecked")
+ Task<? extends Serializable> currTask = (Task<? extends Serializable>) nd;
+ if (currTask instanceof MapRedTask) {
+ MapRedTask mrTsk = (MapRedTask)currTask;
+ MapredWork mrWrk = mrTsk.getWork();
+ checkMapJoins(mrTsk);
+ checkMRReducer(currTask.toString(), mrWrk);
+ } else if (currTask instanceof ConditionalTask ) {
+ List<Task<? extends Serializable>> taskListInConditionalTask =
+ ((ConditionalTask) currTask).getListTasks();
+ for(Task<? extends Serializable> tsk: taskListInConditionalTask){
+ dispatch(tsk, stack, nodeOutputs);
+ }
+
+ } else if (currTask instanceof TezTask) {
+ TezTask tezTask = (TezTask) currTask;
+ TezWork tezWork = tezTask.getWork();
+ checkMapJoins(tezWork);
+ checkTezReducer(tezWork);
+ }
+ return null;
+ }
+
+ private void warn(String msg) {
+ SessionState.getConsole().printInfo("Warning: " + msg, false);
+ }
+
+ private void checkMapJoins(MapRedTask mrTsk) throws SemanticException {
+ MapredWork mrWrk = mrTsk.getWork();
+ MapWork mapWork = mrWrk.getMapWork();
+ List<String> warnings = new MapJoinCheck(mrTsk.toString()).analyze(mapWork);
+ if (!warnings.isEmpty()) {
+ for (String w : warnings) {
+ warn(w);
+ }
+ }
+ ReduceWork redWork = mrWrk.getReduceWork();
+ if (redWork != null) {
+ warnings = new MapJoinCheck(mrTsk.toString()).analyze(redWork);
+ if (!warnings.isEmpty()) {
+ for (String w : warnings) {
+ warn(w);
+ }
+ }
+ }
+ }
+
+ private void checkMapJoins(TezWork tezWork) throws SemanticException {
+ for(BaseWork wrk : tezWork.getAllWork() ) {
+
+ if ( wrk instanceof MergeJoinWork ) {
+ wrk = ((MergeJoinWork)wrk).getMainWork();
+ }
+
+ List<String> warnings = new MapJoinCheck(wrk.getName()).analyze(wrk);
+ if ( !warnings.isEmpty() ) {
+ for(String w : warnings) {
+ warn(w);
+ }
+ }
+ }
+ }
+
+ private void checkTezReducer(TezWork tezWork) throws SemanticException {
+ for(BaseWork wrk : tezWork.getAllWork() ) {
+ BaseWork origWrk = null;
+
+ if ( wrk instanceof MergeJoinWork ) {
+ origWrk = wrk;
+ wrk = ((MergeJoinWork)wrk).getMainWork();
+ }
+
+ if ( !(wrk instanceof ReduceWork ) ) {
+ continue;
+ }
+ ReduceWork rWork = (ReduceWork) wrk;
+ Operator<? extends OperatorDesc> reducer = ((ReduceWork)wrk).getReducer();
+ if ( reducer instanceof JoinOperator || reducer instanceof CommonMergeJoinOperator ) {
+ boolean noOuterJoin = ((JoinDesc)reducer.getConf()).isNoOuterJoin();
+ Map<Integer, ExtractReduceSinkInfo.Info> rsInfo = new TreeMap<Integer, ExtractReduceSinkInfo.Info>();
+ for(Map.Entry<Integer, String> e : rWork.getTagToInput().entrySet()) {
+ rsInfo.putAll(getReducerInfo(tezWork, rWork.getName(), e.getValue()));
+ }
+ if (checkForCrossProduct(rWork.getName(), reducer, rsInfo)
+ && cartesianProductEdgeEnabled && noOuterJoin) {
+ List<BaseWork> parents = tezWork.getParents(null == origWrk ? wrk : origWrk);
+ for (BaseWork p: parents) {
+ TezEdgeProperty prop = tezWork.getEdgeProperty(p, null == origWrk ? wrk : origWrk);
+ LOG.info("Edge Type: "+prop.getEdgeType());
+ if (prop.getEdgeType().equals(EdgeType.CUSTOM_SIMPLE_EDGE)
+ || prop.getEdgeType().equals(EdgeType.CUSTOM_EDGE)) {
+ prop.setEdgeType(EdgeType.XPROD_EDGE);
+ rWork.setNumReduceTasks(-1);
+ rWork.setMaxReduceTasks(-1);
+ rWork.setMinReduceTasks(-1);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private void checkMRReducer(String taskName, MapredWork mrWrk) throws SemanticException {
+ ReduceWork rWrk = mrWrk.getReduceWork();
+ if ( rWrk == null) {
+ return;
+ }
+ Operator<? extends OperatorDesc> reducer = rWrk.getReducer();
+ if ( reducer instanceof JoinOperator|| reducer instanceof CommonMergeJoinOperator ) {
+ BaseWork parentWork = mrWrk.getMapWork();
+ checkForCrossProduct(taskName, reducer,
+ new ExtractReduceSinkInfo(null).analyze(parentWork));
+ }
+ }
+
+ private boolean checkForCrossProduct(String taskName,
+ Operator<? extends OperatorDesc> reducer,
+ Map<Integer, ExtractReduceSinkInfo.Info> rsInfo) {
+ if ( rsInfo.isEmpty() ) {
+ return false;
+ }
+ Iterator<ExtractReduceSinkInfo.Info> it = rsInfo.values().iterator();
+ ExtractReduceSinkInfo.Info info = it.next();
+ if (info.keyCols.size() == 0) {
+ List<String> iAliases = new ArrayList<String>();
+ iAliases.addAll(info.inputAliases);
+ while (it.hasNext()) {
+ info = it.next();
+ iAliases.addAll(info.inputAliases);
+ }
+ String warning = String.format(
+ "Shuffle Join %s[tables = %s] in Stage '%s' is a cross product",
+ reducer.toString(),
+ iAliases,
+ taskName);
+ warn(warning);
+ return true;
+ }
+ return false;
+ }
+
+ private Map<Integer, ExtractReduceSinkInfo.Info> getReducerInfo(TezWork tezWork, String vertex, String prntVertex)
+ throws SemanticException {
+ BaseWork parentWork = tezWork.getWorkMap().get(prntVertex);
+ return new ExtractReduceSinkInfo(vertex).analyze(parentWork);
+ }
+
+ /*
+ * Given a Work descriptor and the TaskName for the work
+ * this is responsible to check each MapJoinOp for cross products.
+ * The analyze call returns the warnings list.
+ * <p>
+ * For MR the taskname is the StageName, for Tez it is the vertex name.
+ */
+ public static class MapJoinCheck implements NodeProcessor, NodeProcessorCtx {
+
+ final List<String> warnings;
+ final String taskName;
+
+ MapJoinCheck(String taskName) {
+ this.taskName = taskName;
+ warnings = new ArrayList<String>();
+ }
+
+ List<String> analyze(BaseWork work) throws SemanticException {
+ Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+ opRules.put(new RuleRegExp("R1", MapJoinOperator.getOperatorName()
+ + "%"), this);
+ Dispatcher disp = new DefaultRuleDispatcher(new NoopProcessor(), opRules, this);
+ GraphWalker ogw = new DefaultGraphWalker(disp);
+ ArrayList<Node> topNodes = new ArrayList<Node>();
+ topNodes.addAll(work.getAllRootOperators());
+ ogw.startWalking(topNodes, null);
+ return warnings;
+ }
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ @SuppressWarnings("unchecked")
+ AbstractMapJoinOperator<? extends MapJoinDesc> mjOp = (AbstractMapJoinOperator<? extends MapJoinDesc>) nd;
+ MapJoinDesc mjDesc = mjOp.getConf();
+
+ String bigTablAlias = mjDesc.getBigTableAlias();
+ if ( bigTablAlias == null ) {
+ Operator<? extends OperatorDesc> parent = null;
+ for(Operator<? extends OperatorDesc> op : mjOp.getParentOperators() ) {
+ if ( op instanceof TableScanOperator ) {
+ parent = op;
+ }
+ }
+ if ( parent != null) {
+ TableScanDesc tDesc = ((TableScanOperator)parent).getConf();
+ bigTablAlias = tDesc.getAlias();
+ }
+ }
+ bigTablAlias = bigTablAlias == null ? "?" : bigTablAlias;
+
+ List<ExprNodeDesc> joinExprs = mjDesc.getKeys().values().iterator().next();
+
+ if ( joinExprs.size() == 0 ) {
+ warnings.add(
+ String.format("Map Join %s[bigTable=%s] in task '%s' is a cross product",
+ mjOp.toString(), bigTablAlias, taskName));
+ }
+
+ return null;
+ }
+ }
+
+ /*
+ * for a given Work Descriptor, it extracts information about the ReduceSinkOps
+ * in the Work. For Tez, you can restrict it to ReduceSinks for a particular output
+ * vertex.
+ */
+ public static class ExtractReduceSinkInfo implements NodeProcessor, NodeProcessorCtx {
+
+ static class Info {
+ List<ExprNodeDesc> keyCols;
+ List<String> inputAliases;
+
+ Info(List<ExprNodeDesc> keyCols, List<String> inputAliases) {
+ this.keyCols = keyCols;
+ this.inputAliases = inputAliases == null ? new ArrayList<String>() : inputAliases;
+ }
+
+ Info(List<ExprNodeDesc> keyCols, String[] inputAliases) {
+ this.keyCols = keyCols;
+ this.inputAliases = inputAliases == null ? new ArrayList<String>() : Arrays.asList(inputAliases);
+ }
+ }
+
+ final String outputTaskName;
+ final Map<Integer, Info> reduceSinkInfo;
+
+ ExtractReduceSinkInfo(String parentTaskName) {
+ this.outputTaskName = parentTaskName;
+ reduceSinkInfo = new HashMap<Integer, Info>();
+ }
+
+ Map<Integer, Info> analyze(BaseWork work) throws SemanticException {
+ Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+ opRules.put(new RuleRegExp("R1", ReduceSinkOperator.getOperatorName()
+ + "%"), this);
+ Dispatcher disp = new DefaultRuleDispatcher(new NoopProcessor(), opRules, this);
+ GraphWalker ogw = new DefaultGraphWalker(disp);
+ ArrayList<Node> topNodes = new ArrayList<Node>();
+ topNodes.addAll(work.getAllRootOperators());
+ ogw.startWalking(topNodes, null);
+ return reduceSinkInfo;
+ }
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ ReduceSinkOperator rsOp = (ReduceSinkOperator) nd;
+ ReduceSinkDesc rsDesc = rsOp.getConf();
+
+ if ( outputTaskName != null ) {
+ String rOutputName = rsDesc.getOutputName();
+ if ( rOutputName == null || !outputTaskName.equals(rOutputName)) {
+ return null;
+ }
+ }
+
+ reduceSinkInfo.put(rsDesc.getTag(),
+ new Info(rsDesc.getKeyCols(), rsOp.getInputAliases()));
+
+ return null;
+ }
+ }
+
+ static class NoopProcessor implements NodeProcessor {
+ @Override
+ public final Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ return nd;
+ }
+ }
+}
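Compared with the deleted CrossProductCheck, the handler still emits the same warnings, but when hive.tez.cartesian-product.enabled is set and the shuffle join has no outer side it also rewrites the incoming edges and releases the reducer parallelism so CartesianProductVertexManager can size the vertex at runtime. A condensed sketch of that rewrite, assuming tezWork and rWork as in checkTezReducer above (illustrative only; the MergeJoinWork indirection is omitted):

// Sketch: edge rewrite applied for an eligible cross-product shuffle join.
for (BaseWork parent : tezWork.getParents(rWork)) {
  TezEdgeProperty prop = tezWork.getEdgeProperty(parent, rWork);
  if (prop.getEdgeType() == EdgeType.CUSTOM_SIMPLE_EDGE
      || prop.getEdgeType() == EdgeType.CUSTOM_EDGE) {
    prop.setEdgeType(EdgeType.XPROD_EDGE); // handled by Tez's cartesian product edge
    rWork.setNumReduceTasks(-1);           // -1: parallelism decided by the vertex manager
    rWork.setMaxReduceTasks(-1);
    rWork.setMinReduceTasks(-1);
  }
}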
http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
index 9377563..c040406 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
@@ -82,7 +82,7 @@ public class PhysicalOptimizer {
}
if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_CHECK_CROSS_PRODUCT)) {
- resolvers.add(new CrossProductCheck());
+ resolvers.add(new CrossProductHandler());
}
// Vectorization should be the last optimization, because it doesn't modify the plan
http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkCrossProductCheck.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkCrossProductCheck.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkCrossProductCheck.java
index 7f3b1b3..9f14c66 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkCrossProductCheck.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkCrossProductCheck.java
@@ -92,9 +92,9 @@ public class SparkCrossProductCheck implements PhysicalPlanResolver, Dispatcher
for (ReduceWork reduceWork : sparkWork.getAllReduceWork()) {
Operator<? extends OperatorDesc> reducer = reduceWork.getReducer();
if (reducer instanceof JoinOperator || reducer instanceof CommonMergeJoinOperator) {
- Map<Integer, CrossProductCheck.ExtractReduceSinkInfo.Info> rsInfo = new TreeMap<Integer, CrossProductCheck.ExtractReduceSinkInfo.Info>();
+ Map<Integer, CrossProductHandler.ExtractReduceSinkInfo.Info> rsInfo = new TreeMap<Integer, CrossProductHandler.ExtractReduceSinkInfo.Info>();
for (BaseWork parent : sparkWork.getParents(reduceWork)) {
- rsInfo.putAll(new CrossProductCheck.ExtractReduceSinkInfo(null).analyze(parent));
+ rsInfo.putAll(new CrossProductHandler.ExtractReduceSinkInfo(null).analyze(parent));
}
checkForCrossProduct(reduceWork.getName(), reducer, rsInfo);
}
@@ -105,7 +105,7 @@ public class SparkCrossProductCheck implements PhysicalPlanResolver, Dispatcher
SparkWork sparkWork = sparkTask.getWork();
for (BaseWork baseWork : sparkWork.getAllWork()) {
List<String> warnings =
- new CrossProductCheck.MapJoinCheck(sparkTask.toString()).analyze(baseWork);
+ new CrossProductHandler.MapJoinCheck(sparkTask.toString()).analyze(baseWork);
for (String w : warnings) {
warn(w);
}
@@ -114,12 +114,12 @@ public class SparkCrossProductCheck implements PhysicalPlanResolver, Dispatcher
private void checkForCrossProduct(String workName,
Operator<? extends OperatorDesc> reducer,
- Map<Integer, CrossProductCheck.ExtractReduceSinkInfo.Info> rsInfo) {
+ Map<Integer, CrossProductHandler.ExtractReduceSinkInfo.Info> rsInfo) {
if (rsInfo.isEmpty()) {
return;
}
- Iterator<CrossProductCheck.ExtractReduceSinkInfo.Info> it = rsInfo.values().iterator();
- CrossProductCheck.ExtractReduceSinkInfo.Info info = it.next();
+ Iterator<CrossProductHandler.ExtractReduceSinkInfo.Info> it = rsInfo.values().iterator();
+ CrossProductHandler.ExtractReduceSinkInfo.Info info = it.next();
if (info.keyCols.size() == 0) {
List<String> iAliases = new ArrayList<String>();
iAliases.addAll(info.inputAliases);
http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index 15836ec..da30c3b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -80,7 +80,7 @@ import org.apache.hadoop.hive.ql.optimizer.SharedWorkOptimizer;
import org.apache.hadoop.hive.ql.optimizer.correlation.ReduceSinkJoinDeDuplication;
import org.apache.hadoop.hive.ql.optimizer.metainfo.annotation.AnnotateWithOpTraits;
import org.apache.hadoop.hive.ql.optimizer.physical.AnnotateRunTimeStatsOptimizer;
-import org.apache.hadoop.hive.ql.optimizer.physical.CrossProductCheck;
+import org.apache.hadoop.hive.ql.optimizer.physical.CrossProductHandler;
import org.apache.hadoop.hive.ql.optimizer.physical.LlapClusterStateForCompile;
import org.apache.hadoop.hive.ql.optimizer.physical.LlapDecider;
import org.apache.hadoop.hive.ql.optimizer.physical.LlapPreVectorizationPass;
@@ -658,7 +658,7 @@ public class TezCompiler extends TaskCompiler {
}
if (conf.getBoolVar(HiveConf.ConfVars.HIVE_CHECK_CROSS_PRODUCT)) {
- physicalCtx = new CrossProductCheck().resolve(physicalCtx);
+ physicalCtx = new CrossProductHandler().resolve(physicalCtx);
} else {
LOG.debug("Skipping cross product analysis");
}
http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
index bbed9be..d43b81a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
@@ -28,7 +28,8 @@ public class TezEdgeProperty {
CONTAINS,//used for union (all?)
CUSTOM_EDGE,//CO_PARTITION_EDGE
CUSTOM_SIMPLE_EDGE,//PARTITION_EDGE
- ONE_TO_ONE_EDGE
+ ONE_TO_ONE_EDGE,
+ XPROD_EDGE
}
private HiveConf hiveConf;
@@ -107,4 +108,5 @@ public class TezEdgeProperty {
public void setEdgeType(EdgeType type) {
this.edgeType = type;
}
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
index 2dc334d..47aa936 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
@@ -109,8 +109,8 @@ public class TestTezTask {
});
when(utils.createEdge(any(JobConf.class), any(Vertex.class), any(Vertex.class),
- any(TezEdgeProperty.class), any(VertexType.class))).thenAnswer(new Answer<Edge>() {
-
+ any(TezEdgeProperty.class), any(BaseWork.class), any(TezWork.class)))
+ .thenAnswer(new Answer<Edge>() {
@Override
public Edge answer(InvocationOnMock invocation) throws Throwable {
Object[] args = invocation.getArguments();
http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/test/queries/clientpositive/cross_prod_1.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/cross_prod_1.q b/ql/src/test/queries/clientpositive/cross_prod_1.q
new file mode 100644
index 0000000..b5a84ea
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/cross_prod_1.q
@@ -0,0 +1,34 @@
+set hive.mapred.mode=nonstrict;
+set hive.explain.user=false;
+set hive.tez.cartesian-product.enabled=true;
+
+create table X as
+select distinct * from src order by key limit 10;
+
+explain select * from X as A, X as B order by A.key, B.key;
+select * from X as A, X as B order by A.key, B.key;
+
+explain select * from X as A join X as B on A.key<B.key;
+select * from X as A join X as B on A.key<B.key order by A.key, B.key;
+
+explain select * from X as A join X as B on A.key between "103" and "105";
+select * from X as A join X as B on A.key between "103" and "105" order by A.key, B.key;
+
+explain select * from X as A, X as B, X as C;
+select * from X as A, X as B, X as C order by A.key, B.key, C.key;
+
+explain select * from X as A join X as B on A.key in ("103", "104", "105");
+select * from X as A join X as B on A.key in ("103", "104", "105") order by A.key, B.key;
+
+explain select A.key, count(*) from X as A, X as B group by A.key;
+select A.key, count(*) from X as A, X as B group by A.key order by A.key;
+
+explain select * from X as A left outer join X as B on (A.key = B.key or A.value between "val_103" and "val_105");
+explain select * from X as A right outer join X as B on (A.key = B.key or A.value between "val_103" and "val_105");
+explain select * from X as A full outer join X as B on (A.key = B.key or A.value between "val_103" and "val_105");
+
+explain select * from (select X.key, count(*) from X group by X.key) as A, (select X.key, count(*) from X group by X.key) as B;
+select * from (select X.key, count(*) from X group by X.key) as A, (select X.key, count(*) from X group by X.key) as B order by A.key, B.key;
+
+explain select * from (select * from X union all select * from X as y) a join X;
+select * from (select * from X union all select * from X as y) a join X order by a.key, X.key;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/test/queries/clientpositive/cross_prod_3.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/cross_prod_3.q b/ql/src/test/queries/clientpositive/cross_prod_3.q
new file mode 100644
index 0000000..a233f17
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/cross_prod_3.q
@@ -0,0 +1,13 @@
+set hive.mapred.mode=nonstrict;
+set hive.explain.user=false;
+set hive.tez.cartesian-product.enabled=true;
+set hive.auto.convert.join=true;
+set hive.convert.join.bucket.mapjoin.tez=true;
+
+create table X (key string, value string) clustered by (key) into 2 buckets;
+insert overwrite table X select distinct * from src order by key limit 10;
+
+create table Y as
+select * from src order by key limit 1;
+
+explain select * from Y, (select * from X as A join X as B on A.key=B.key) as C where Y.key=C.key;
http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/test/queries/clientpositive/cross_prod_4.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/cross_prod_4.q b/ql/src/test/queries/clientpositive/cross_prod_4.q
new file mode 100644
index 0000000..ea58e98
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/cross_prod_4.q
@@ -0,0 +1,10 @@
+set hive.mapred.mode=nonstrict;
+set hive.explain.user=false;
+set hive.auto.convert.join=true;
+set hive.tez.cartesian-product.enabled=true;
+
+create table X as
+select distinct * from src order by key limit 10;
+
+explain select * from X as A, X as B;
+select * from X as A, X as B order by A.key, B.key;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q b/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q
index b1e289f..b8b04ee 100644
--- a/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q
+++ b/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q
@@ -85,7 +85,7 @@ SELECT agg.amount
FROM agg_01 agg,
dim_shops d1
WHERE agg.dim_shops_id = d1.id
-and agg.dim_shops_id = 1;
+and agg.dim_shops_id = 1 order by agg.amount;
set hive.tez.dynamic.partition.pruning.max.event.size=1;
set hive.tez.dynamic.partition.pruning.max.data.size=1000000;
http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/test/queries/clientpositive/hybridgrace_hashjoin_1.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/hybridgrace_hashjoin_1.q b/ql/src/test/queries/clientpositive/hybridgrace_hashjoin_1.q
index 9c19a86..e404dd0 100644
--- a/ql/src/test/queries/clientpositive/hybridgrace_hashjoin_1.q
+++ b/ql/src/test/queries/clientpositive/hybridgrace_hashjoin_1.q
@@ -1,5 +1,6 @@
set hive.mapred.mode=nonstrict;
set hive.explain.user=false;
+set tez.cartesian-product.max-parallelism=1;
-- Hybrid Grace Hash Join
-- Test basic functionalities:
-- 1. Various cases when hash partitions spill
http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/test/queries/clientpositive/subquery_multi.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/subquery_multi.q b/ql/src/test/queries/clientpositive/subquery_multi.q
index c546d24..6ef198d 100644
--- a/ql/src/test/queries/clientpositive/subquery_multi.q
+++ b/ql/src/test/queries/clientpositive/subquery_multi.q
@@ -36,7 +36,7 @@ select * from part_null where p_name IN (select p_name from part_null) AND p_bra
-- NOT IN is always true and IN is false for where p_name is NULL, hence should return all but one row
explain select * from part_null where p_name IN (select p_name from part_null) AND p_brand NOT IN (select p_type from part_null);
-select * from part_null where p_name IN (select p_name from part_null) AND p_brand NOT IN (select p_type from part_null);
+select * from part_null where p_name IN (select p_name from part_null) AND p_brand NOT IN (select p_type from part_null) order by part_null.p_partkey;
-- NOT IN has one NULL value so this whole query should not return any row
explain select * from part_null where p_brand IN (select p_brand from part_null) AND p_brand NOT IN (select p_name from part_null);
@@ -49,7 +49,7 @@ select * from part_null where p_name NOT IN (select c from tempty) AND p_brand I
-- IN, EXISTS
explain select * from part_null where p_name IN (select p_name from part_null) AND EXISTS (select c from tnull);
-select * from part_null where p_name IN (select p_name from part_null) AND EXISTS (select c from tnull);
+select * from part_null where p_name IN (select p_name from part_null) AND EXISTS (select c from tnull) order by part_null.p_partkey;
explain select * from part_null where p_size IN (select p_size from part_null) AND EXISTS (select c from tempty);
select * from part_null where p_size IN (select p_size from part_null) AND EXISTS (select c from tempty);
http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/test/queries/clientpositive/subquery_notin.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/subquery_notin.q b/ql/src/test/queries/clientpositive/subquery_notin.q
index e23eb2b..c509654 100644
--- a/ql/src/test/queries/clientpositive/subquery_notin.q
+++ b/ql/src/test/queries/clientpositive/subquery_notin.q
@@ -109,8 +109,8 @@ explain select * from part where p_brand <> 'Brand#14' AND p_size NOT IN (select
select * from part where p_brand <> 'Brand#14' AND p_size NOT IN (select (p_size*p_size) from part p where p.p_type = part.p_type ) AND p_size <> 340;
--lhs contains non-simple expression
-explain select * from part where (p_size-1) NOT IN (select min(p_size) from part group by p_type);
-select * from part where (p_size-1) NOT IN (select min(p_size) from part group by p_type);
+explain select * from part where (p_size-1) NOT IN (select min(p_size) from part group by p_type) order by p_partkey;
+select * from part where (p_size-1) NOT IN (select min(p_size) from part group by p_type) order by p_partkey;
explain select * from part where (p_partkey*p_size) NOT IN (select min(p_partkey) from part group by p_type);
select * from part where (p_partkey*p_size) NOT IN (select min(p_partkey) from part group by p_type);
http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/test/queries/clientpositive/subquery_select.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/subquery_select.q b/ql/src/test/queries/clientpositive/subquery_select.q
index 15377a4..c1766ff 100644
--- a/ql/src/test/queries/clientpositive/subquery_select.q
+++ b/ql/src/test/queries/clientpositive/subquery_select.q
@@ -155,8 +155,8 @@ SELECT p_size, (SELECT count(p_size) FROM part p
WHERE p.p_type = part.p_type) IS NULL from part;
-- scalar, non-corr, non agg
-explain select p_type, (select p_size from part order by p_size limit 1) = 1 from part;
-select p_type, (select p_size from part order by p_size limit 1) = 1 from part;
+explain select p_type, (select p_size from part order by p_size limit 1) = 1 from part order by p_type;
+select p_type, (select p_size from part order by p_size limit 1) = 1 from part order by p_type;
-- in corr, multiple
EXPLAIN SELECT p_size, p_size IN (
http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/test/results/clientpositive/llap/auto_join0.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/auto_join0.q.out b/ql/src/test/results/clientpositive/llap/auto_join0.q.out
index 7f0a878..29945ad 100644
--- a/ql/src/test/results/clientpositive/llap/auto_join0.q.out
+++ b/ql/src/test/results/clientpositive/llap/auto_join0.q.out
@@ -1,4 +1,4 @@
-Warning: Map Join MAPJOIN[22][bigTable=?] in task 'Reducer 2' is a cross product
+Warning: Shuffle Join MERGEJOIN[22][tables = [src1, src2]] in Stage 'Reducer 3' is a cross product
PREHOOK: query: explain
select sum(hash(a.k1,a.v1,a.k2, a.v2))
from (
@@ -30,9 +30,10 @@ STAGE PLANS:
Tez
#### A masked pattern was here ####
Edges:
- Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (BROADCAST_EDGE)
- Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
- Reducer 4 <- Map 1 (SIMPLE_EDGE)
+ Reducer 2 <- Map 1 (SIMPLE_EDGE)
+ Reducer 3 <- Reducer 2 (XPROD_EDGE), Reducer 5 (XPROD_EDGE)
+ Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
+ Reducer 5 <- Map 1 (SIMPLE_EDGE)
#### A masked pattern was here ####
Vertices:
Map 1
@@ -64,28 +65,33 @@ STAGE PLANS:
expressions: KEY.reducesinkkey0 (type: string), KEY.reducesinkkey1 (type: string)
outputColumnNames: _col0, _col1
Statistics: Num rows: 166 Data size: 29548 Basic stats: COMPLETE Column stats: COMPLETE
- Map Join Operator
- condition map:
- Inner Join 0 to 1
- keys:
- 0
- 1
- outputColumnNames: _col0, _col1, _col2, _col3
- input vertices:
- 1 Reducer 4
- Statistics: Num rows: 27556 Data size: 9809936 Basic stats: COMPLETE Column stats: COMPLETE
- Group By Operator
- aggregations: sum(hash(_col0,_col1,_col2,_col3))
- mode: hash
- outputColumnNames: _col0
- Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
- Reduce Output Operator
- sort order:
- Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
- value expressions: _col0 (type: bigint)
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 166 Data size: 29548 Basic stats: COMPLETE Column stats: COMPLETE
+ value expressions: _col0 (type: string), _col1 (type: string)
Reducer 3
Execution mode: llap
Reduce Operator Tree:
+ Merge Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0
+ 1
+ outputColumnNames: _col0, _col1, _col2, _col3
+ Statistics: Num rows: 27556 Data size: 9809936 Basic stats: COMPLETE Column stats: COMPLETE
+ Group By Operator
+ aggregations: sum(hash(_col0,_col1,_col2,_col3))
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+ value expressions: _col0 (type: bigint)
+ Reducer 4
+ Execution mode: llap
+ Reduce Operator Tree:
Group By Operator
aggregations: sum(VALUE._col0)
mode: mergepartial
@@ -98,7 +104,7 @@ STAGE PLANS:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- Reducer 4
+ Reducer 5
Execution mode: llap
Reduce Operator Tree:
Select Operator
@@ -116,7 +122,7 @@ STAGE PLANS:
Processor Tree:
ListSink
-Warning: Map Join MAPJOIN[22][bigTable=?] in task 'Reducer 2' is a cross product
+Warning: Shuffle Join MERGEJOIN[22][tables = [src1, src2]] in Stage 'Reducer 3' is a cross product
PREHOOK: query: select sum(hash(a.k1,a.v1,a.k2, a.v2))
from (
SELECT src1.key as k1, src1.value as v1,
http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/test/results/clientpositive/llap/auto_join_filters.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/auto_join_filters.q.out b/ql/src/test/results/clientpositive/llap/auto_join_filters.q.out
index d1d9408..079f047 100644
--- a/ql/src/test/results/clientpositive/llap/auto_join_filters.q.out
+++ b/ql/src/test/results/clientpositive/llap/auto_join_filters.q.out
@@ -14,7 +14,7 @@ POSTHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/in3.txt' INTO TABLE my
POSTHOOK: type: LOAD
#### A masked pattern was here ####
POSTHOOK: Output: default@myinput1
-Warning: Map Join MAPJOIN[18][bigTable=?] in task 'Map 1' is a cross product
+Warning: Shuffle Join MERGEJOIN[18][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product
PREHOOK: query: SELECT sum(hash(a.key,a.value,b.key,b.value)) FROM myinput1 a JOIN myinput1 b on a.key > 40 AND a.value > 50 AND a.key = a.value AND b.key > 40 AND b.value > 50 AND b.key = b.value
PREHOOK: type: QUERY
PREHOOK: Input: default@myinput1
@@ -300,7 +300,7 @@ POSTHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/in2.txt' into table sm
POSTHOOK: type: LOAD
#### A masked pattern was here ####
POSTHOOK: Output: default@smb_input2
-Warning: Map Join MAPJOIN[18][bigTable=?] in task 'Map 1' is a cross product
+Warning: Shuffle Join MERGEJOIN[18][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product
PREHOOK: query: SELECT sum(hash(a.key,a.value,b.key,b.value)) FROM myinput1 a JOIN myinput1 b on a.key > 40 AND a.value > 50 AND a.key = a.value AND b.key > 40 AND b.value > 50 AND b.key = b.value
PREHOOK: type: QUERY
PREHOOK: Input: default@myinput1
http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/test/results/clientpositive/llap/auto_join_nulls.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/auto_join_nulls.q.out b/ql/src/test/results/clientpositive/llap/auto_join_nulls.q.out
index 5984e8f..04da1f2 100644
--- a/ql/src/test/results/clientpositive/llap/auto_join_nulls.q.out
+++ b/ql/src/test/results/clientpositive/llap/auto_join_nulls.q.out
@@ -14,7 +14,7 @@ POSTHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/in1.txt' INTO TABLE my
POSTHOOK: type: LOAD
#### A masked pattern was here ####
POSTHOOK: Output: default@myinput1
-Warning: Map Join MAPJOIN[14][bigTable=?] in task 'Map 1' is a cross product
+Warning: Shuffle Join MERGEJOIN[14][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product
PREHOOK: query: SELECT sum(hash(a.key,a.value,b.key,b.value)) FROM myinput1 a JOIN myinput1 b
PREHOOK: type: QUERY
PREHOOK: Input: default@myinput1
http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_12.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_12.q.out b/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_12.q.out
index 6ef1f34..3acbb20 100644
--- a/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_12.q.out
+++ b/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_12.q.out
@@ -134,7 +134,7 @@ POSTHOOK: query: load data local inpath '../../data/files/smallsrcsortbucket3out
POSTHOOK: type: LOAD
#### A masked pattern was here ####
POSTHOOK: Output: default@bucket_medium@ds=2008-04-08
-Warning: Map Join MAPJOIN[34][bigTable=?] in task 'Map 3' is a cross product
+Warning: Shuffle Join MERGEJOIN[34][tables = [$hdt$_1, $hdt$_2, $hdt$_0, $hdt$_3]] in Stage 'Reducer 4' is a cross product
PREHOOK: query: explain extended select count(*) FROM bucket_small a JOIN bucket_medium b ON a.key = b.key JOIN bucket_big c ON c.key = b.key JOIN bucket_medium d ON c.key = b.key
PREHOOK: type: QUERY
POSTHOOK: query: explain extended select count(*) FROM bucket_small a JOIN bucket_medium b ON a.key = b.key JOIN bucket_big c ON c.key = b.key JOIN bucket_medium d ON c.key = b.key
@@ -148,8 +148,9 @@ STAGE PLANS:
Tez
#### A masked pattern was here ####
Edges:
- Map 3 <- Map 1 (BROADCAST_EDGE), Map 2 (BROADCAST_EDGE), Map 5 (BROADCAST_EDGE)
- Reducer 4 <- Map 3 (CUSTOM_SIMPLE_EDGE)
+ Map 3 <- Map 1 (BROADCAST_EDGE), Map 2 (BROADCAST_EDGE)
+ Reducer 4 <- Map 3 (XPROD_EDGE), Map 6 (XPROD_EDGE)
+ Reducer 5 <- Reducer 4 (CUSTOM_SIMPLE_EDGE)
#### A masked pattern was here ####
Vertices:
Map 1
@@ -336,29 +337,12 @@ STAGE PLANS:
1 Map 2
Position of Big Table: 2
Statistics: Num rows: 244 Data size: 43381 Basic stats: COMPLETE Column stats: NONE
- Map Join Operator
- condition map:
- Inner Join 0 to 1
- Estimated key counts: Map 5 => 1
- keys:
- 0
- 1
- input vertices:
- 1 Map 5
- Position of Big Table: 0
- Statistics: Num rows: 244 Data size: 45577 Basic stats: COMPLETE Column stats: NONE
- Group By Operator
- aggregations: count()
- mode: hash
- outputColumnNames: _col0
- Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
- Reduce Output Operator
- null sort order:
- sort order:
- Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
- tag: -1
- value expressions: _col0 (type: bigint)
- auto parallelism: false
+ Reduce Output Operator
+ null sort order:
+ sort order:
+ Statistics: Num rows: 244 Data size: 43381 Basic stats: COMPLETE Column stats: NONE
+ tag: 0
+ auto parallelism: false
Execution mode: llap
LLAP IO: no inputs
Path -> Alias:
@@ -465,7 +449,7 @@ STAGE PLANS:
Truncated Path -> Alias:
/bucket_big/ds=2008-04-08 [c]
/bucket_big/ds=2008-04-09 [c]
- Map 5
+ Map 6
Map Operator Tree:
TableScan
alias: d
@@ -539,6 +523,30 @@ STAGE PLANS:
Execution mode: llap
Needs Tagging: false
Reduce Operator Tree:
+ Merge Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0
+ 1
+ Position of Big Table: 0
+ Statistics: Num rows: 244 Data size: 45577 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: count()
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ null sort order:
+ sort order:
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ tag: -1
+ value expressions: _col0 (type: bigint)
+ auto parallelism: false
+ Reducer 5
+ Execution mode: llap
+ Needs Tagging: false
+ Reduce Operator Tree:
Group By Operator
aggregations: count(VALUE._col0)
mode: mergepartial
@@ -573,7 +581,7 @@ STAGE PLANS:
Processor Tree:
ListSink
-Warning: Map Join MAPJOIN[34][bigTable=?] in task 'Map 3' is a cross product
+Warning: Shuffle Join MERGEJOIN[34][tables = [$hdt$_1, $hdt$_2, $hdt$_0, $hdt$_3]] in Stage 'Reducer 4' is a cross product
PREHOOK: query: select count(*) FROM bucket_small a JOIN bucket_medium b ON a.key = b.key JOIN bucket_big c ON c.key = b.key JOIN bucket_medium d ON c.key = b.key
PREHOOK: type: QUERY
PREHOOK: Input: default@bucket_big