You are viewing a plain text version of this content. Its canonical link was a hyperlink that is not preserved in this text copy.
Posted to commits@hive.apache.org by jp...@apache.org on 2015/01/19 20:42:22 UTC
svn commit: r1653088 [1/2] - in
/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql: exec/ optimizer/
optimizer/correlation/ optimizer/physical/ optimizer/spark/ parse/ plan/
Author: jpullokk
Date: Mon Jan 19 19:42:21 2015
New Revision: 1653088
URL: http://svn.apache.org/r1653088
Log:
HIVE-9200: Inline Join, Properties (Jesus Camacho Rodriguez via Laljo John Pullokkaran)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapjoinProc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapjoinProc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeJoinProc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkMapJoinProcessor.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSMBJoinHintOptimizer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinOptimizer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TableAccessAnalyzer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1653088&r1=1653087&r2=1653088&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Mon Jan 19 19:42:21 2015
@@ -1067,7 +1067,6 @@ public final class Utilities {
removeField(kryo, Operator.class, "colExprMap");
removeField(kryo, ColumnInfo.class, "objectInspector");
removeField(kryo, MapWork.class, "opParseCtxMap");
- removeField(kryo, MapWork.class, "joinTree");
return kryo;
};
};
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java?rev=1653088&r1=1653087&r2=1653088&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java Mon Jan 19 19:42:21 2015
@@ -48,7 +48,6 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
import org.apache.hadoop.hive.ql.parse.QB;
-import org.apache.hadoop.hive.ql.parse.QBJoinTree;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.TableAccessAnalyzer;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -133,23 +132,21 @@ abstract public class AbstractBucketJoin
protected boolean canConvertMapJoinToBucketMapJoin(
MapJoinOperator mapJoinOp,
- ParseContext pGraphContext,
BucketJoinProcCtx context) throws SemanticException {
- QBJoinTree joinCtx = pGraphContext.getMapJoinContext().get(mapJoinOp);
- if (joinCtx == null) {
+ if (!this.pGraphContext.getMapJoinOps().contains(mapJoinOp)) {
return false;
}
List<String> joinAliases = new ArrayList<String>();
- String[] srcs = joinCtx.getBaseSrc();
- String[] left = joinCtx.getLeftAliases();
- List<String> mapAlias = joinCtx.getMapAliases();
+ String[] srcs = mapJoinOp.getConf().getBaseSrc();
+ String[] left = mapJoinOp.getConf().getLeftAliases();
+ List<String> mapAlias = mapJoinOp.getConf().getMapAliases();
String baseBigAlias = null;
for (String s : left) {
if (s != null) {
- String subQueryAlias = QB.getAppendedAliasFromId(joinCtx.getId(), s);
+ String subQueryAlias = QB.getAppendedAliasFromId(mapJoinOp.getConf().getId(), s);
if (!joinAliases.contains(subQueryAlias)) {
joinAliases.add(subQueryAlias);
if (!mapAlias.contains(s)) {
@@ -161,7 +158,7 @@ abstract public class AbstractBucketJoin
for (String s : srcs) {
if (s != null) {
- String subQueryAlias = QB.getAppendedAliasFromId(joinCtx.getId(), s);
+ String subQueryAlias = QB.getAppendedAliasFromId(mapJoinOp.getConf().getId(), s);
if (!joinAliases.contains(subQueryAlias)) {
joinAliases.add(subQueryAlias);
if (!mapAlias.contains(s)) {
@@ -174,9 +171,8 @@ abstract public class AbstractBucketJoin
Map<Byte, List<ExprNodeDesc>> keysMap = mapJoinOp.getConf().getKeys();
return checkConvertBucketMapJoin(
- pGraphContext,
context,
- joinCtx,
+ mapJoinOp.getConf().getAliasToOpInfo(),
keysMap,
baseBigAlias,
joinAliases);
@@ -191,9 +187,8 @@ abstract public class AbstractBucketJoin
* d. The number of buckets in the big table can be divided by no of buckets in small tables.
*/
protected boolean checkConvertBucketMapJoin(
- ParseContext pGraphContext,
BucketJoinProcCtx context,
- QBJoinTree joinCtx,
+ Map<String, Operator<? extends OperatorDesc>> aliasToOpInfo,
Map<Byte, List<ExprNodeDesc>> keysMap,
String baseBigAlias,
List<String> joinAliases) throws SemanticException {
@@ -218,7 +213,7 @@ abstract public class AbstractBucketJoin
boolean bigTablePartitioned = true;
for (int index = 0; index < joinAliases.size(); index++) {
String alias = joinAliases.get(index);
- Operator<? extends OperatorDesc> topOp = joinCtx.getAliasToOpInfo().get(alias);
+ Operator<? extends OperatorDesc> topOp = aliasToOpInfo.get(alias);
// The alias may not be present in case of a sub-query
if (topOp == null) {
return false;
@@ -459,7 +454,7 @@ abstract public class AbstractBucketJoin
}
// convert partition to partition spec string
- private static Map<String, List<String>> convert(Map<Partition, List<String>> mapping) {
+ private Map<String, List<String>> convert(Map<Partition, List<String>> mapping) {
Map<String, List<String>> converted = new HashMap<String, List<String>>();
for (Map.Entry<Partition, List<String>> entry : mapping.entrySet()) {
converted.put(entry.getKey().getName(), entry.getValue());
@@ -488,7 +483,7 @@ abstract public class AbstractBucketJoin
}
// called for each partition of big table and populates mapping for each file in the partition
- private static void fillMappingBigTableBucketFileNameToSmallTableBucketFileNames(
+ private void fillMappingBigTableBucketFileNameToSmallTableBucketFileNames(
List<Integer> smallTblBucketNums,
List<List<String>> smallTblFilesList,
Map<String, List<String>> bigTableBucketFileNameToSmallTableBucketFileNames,
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java?rev=1653088&r1=1653087&r2=1653088&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java Mon Jan 19 19:42:21 2015
@@ -46,7 +46,6 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
import org.apache.hadoop.hive.ql.parse.QB;
-import org.apache.hadoop.hive.ql.parse.QBJoinTree;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.TableAccessAnalyzer;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -97,17 +96,17 @@ abstract public class AbstractSMBJoinPro
return false;
}
- boolean tableEligibleForBucketedSortMergeJoin = true;
- QBJoinTree joinCxt = this.pGraphContext.getMapJoinContext()
- .get(mapJoinOp);
- if (joinCxt == null) {
+ if (!this.pGraphContext.getMapJoinOps().contains(mapJoinOp)) {
return false;
}
- String[] srcs = joinCxt.getBaseSrc();
+
+ String[] srcs = mapJoinOp.getConf().getBaseSrc();
for (int srcPos = 0; srcPos < srcs.length; srcPos++) {
- srcs[srcPos] = QB.getAppendedAliasFromId(joinCxt.getId(), srcs[srcPos]);
+ srcs[srcPos] = QB.getAppendedAliasFromId(mapJoinOp.getConf().getId(), srcs[srcPos]);
}
+ boolean tableEligibleForBucketedSortMergeJoin = true;
+
// All the tables/partitions columns should be sorted in the same order
// For example, if tables A and B are being joined on columns c1, c2 and c3
// which are the sorted and bucketed columns. The join would work, as long
@@ -117,9 +116,8 @@ abstract public class AbstractSMBJoinPro
for (int pos = 0; pos < srcs.length; pos++) {
tableEligibleForBucketedSortMergeJoin = tableEligibleForBucketedSortMergeJoin
&& isEligibleForBucketSortMergeJoin(smbJoinContext,
- pGraphContext,
mapJoinOp.getConf().getKeys().get((byte) pos),
- joinCxt,
+ mapJoinOp.getConf().getAliasToOpInfo(),
srcs,
pos,
sortColumnsFirstTable);
@@ -141,8 +139,7 @@ abstract public class AbstractSMBJoinPro
// Convert the bucket map-join operator to a sort-merge map join operator
protected SMBMapJoinOperator convertBucketMapJoinToSMBJoin(MapJoinOperator mapJoinOp,
- SortBucketJoinProcCtx smbJoinContext,
- ParseContext parseContext) {
+ SortBucketJoinProcCtx smbJoinContext) {
String[] srcs = smbJoinContext.getSrcs();
SMBMapJoinOperator smbJop = new SMBMapJoinOperator(mapJoinOp);
@@ -219,10 +216,13 @@ abstract public class AbstractSMBJoinPro
child.getParentOperators().remove(index);
child.getParentOperators().add(index, smbJop);
}
- parseContext.getSmbMapJoinContext().put(smbJop,
- parseContext.getMapJoinContext().get(mapJoinOp));
- parseContext.getMapJoinContext().remove(mapJoinOp);
- parseContext.getOpParseCtx().put(smbJop, parseContext.getOpParseCtx().get(mapJoinOp));
+
+ // Data structures coming from QBJoinTree
+ smbJop.getConf().setQBJoinTreeProps(mapJoinOp.getConf());
+ //
+ pGraphContext.getSmbMapJoinOps().add(smbJop);
+ pGraphContext.getMapJoinOps().remove(mapJoinOp);
+ pGraphContext.getOpParseCtx().put(smbJop, pGraphContext.getOpParseCtx().get(mapJoinOp));
return smbJop;
}
@@ -242,9 +242,8 @@ abstract public class AbstractSMBJoinPro
*/
private boolean isEligibleForBucketSortMergeJoin(
SortBucketJoinProcCtx smbJoinContext,
- ParseContext pctx,
List<ExprNodeDesc> keys,
- QBJoinTree joinTree,
+ Map<String, Operator<? extends OperatorDesc>> aliasToOpInfo,
String[] aliases,
int pos,
List<Order> sortColumnsFirstTable) throws SemanticException {
@@ -264,7 +263,7 @@ abstract public class AbstractSMBJoinPro
* table. If the object being map-joined is a base table, then aliasToOpInfo
* contains the TableScanOperator, and TableAccessAnalyzer is a no-op.
*/
- Operator<? extends OperatorDesc> topOp = joinTree.getAliasToOpInfo().get(alias);
+ Operator<? extends OperatorDesc> topOp = aliasToOpInfo.get(alias);
if (topOp == null) {
return false;
}
@@ -384,15 +383,13 @@ abstract public class AbstractSMBJoinPro
// It is already verified that the join can be converted to a bucket map join
protected boolean checkConvertJoinToSMBJoin(
JoinOperator joinOperator,
- SortBucketJoinProcCtx smbJoinContext,
- ParseContext pGraphContext) throws SemanticException {
-
- QBJoinTree joinCtx = pGraphContext.getJoinContext().get(joinOperator);
+ SortBucketJoinProcCtx smbJoinContext) throws SemanticException {
- if (joinCtx == null) {
+ if (!this.pGraphContext.getJoinOps().contains(joinOperator)) {
return false;
}
- String[] srcs = joinCtx.getBaseSrc();
+
+ String[] srcs = joinOperator.getConf().getBaseSrc();
// All the tables/partitions columns should be sorted in the same order
// For example, if tables A and B are being joined on columns c1, c2 and c3
@@ -402,9 +399,8 @@ abstract public class AbstractSMBJoinPro
for (int pos = 0; pos < srcs.length; pos++) {
if (!isEligibleForBucketSortMergeJoin(smbJoinContext,
- pGraphContext,
smbJoinContext.getKeyExprMap().get((byte) pos),
- joinCtx,
+ joinOperator.getConf().getAliasToOpInfo(),
srcs,
pos,
sortColumnsFirstTable)) {
@@ -419,12 +415,10 @@ abstract public class AbstractSMBJoinPro
// Can the join operator be converted to a sort-merge join operator ?
protected boolean canConvertJoinToSMBJoin(
JoinOperator joinOperator,
- SortBucketJoinProcCtx smbJoinContext,
- ParseContext pGraphContext) throws SemanticException {
+ SortBucketJoinProcCtx smbJoinContext) throws SemanticException {
boolean canConvert =
canConvertJoinToBucketMapJoin(
joinOperator,
- pGraphContext,
smbJoinContext
);
@@ -432,13 +426,12 @@ abstract public class AbstractSMBJoinPro
return false;
}
- return checkConvertJoinToSMBJoin(joinOperator, smbJoinContext, pGraphContext);
+ return checkConvertJoinToSMBJoin(joinOperator, smbJoinContext);
}
// Can the join operator be converted to a bucket map-merge join operator ?
protected boolean canConvertJoinToBucketMapJoin(
JoinOperator joinOp,
- ParseContext pGraphContext,
SortBucketJoinProcCtx context) throws SemanticException {
// This has already been inspected and rejected
@@ -446,8 +439,7 @@ abstract public class AbstractSMBJoinPro
return false;
}
- QBJoinTree joinCtx = pGraphContext.getJoinContext().get(joinOp);
- if (joinCtx == null) {
+ if (!this.pGraphContext.getJoinOps().contains(joinOp)) {
return false;
}
@@ -480,8 +472,9 @@ abstract public class AbstractSMBJoinPro
context.setBigTablePosition(bigTablePosition);
String joinAlias =
bigTablePosition == 0 ?
- joinCtx.getLeftAlias() : joinCtx.getRightAliases()[bigTablePosition - 1];
- joinAlias = QB.getAppendedAliasFromId(joinCtx.getId(), joinAlias);
+ joinOp.getConf().getLeftAlias() :
+ joinOp.getConf().getRightAliases()[bigTablePosition - 1];
+ joinAlias = QB.getAppendedAliasFromId(joinOp.getConf().getId(), joinAlias);
Map<Byte, List<ExprNodeDesc>> keyExprMap = new HashMap<Byte, List<ExprNodeDesc>>();
List<Operator<? extends OperatorDesc>> parentOps = joinOp.getParentOperators();
@@ -495,10 +488,10 @@ abstract public class AbstractSMBJoinPro
context.setKeyExprMap(keyExprMap);
// Make a deep copy of the aliases so that they are not changed in the context
- String[] joinSrcs = joinCtx.getBaseSrc();
+ String[] joinSrcs = joinOp.getConf().getBaseSrc();
String[] srcs = new String[joinSrcs.length];
for (int srcPos = 0; srcPos < joinSrcs.length; srcPos++) {
- joinSrcs[srcPos] = QB.getAppendedAliasFromId(joinCtx.getId(), joinSrcs[srcPos]);
+ joinSrcs[srcPos] = QB.getAppendedAliasFromId(joinOp.getConf().getId(), joinSrcs[srcPos]);
srcs[srcPos] = new String(joinSrcs[srcPos]);
}
@@ -506,9 +499,8 @@ abstract public class AbstractSMBJoinPro
// The candidate map-join was derived from the pluggable sort merge join big
// table matcher.
return checkConvertBucketMapJoin(
- pGraphContext,
context,
- joinCtx,
+ joinOp.getConf().getAliasToOpInfo(),
keyExprMap,
joinAlias,
Arrays.asList(srcs));
@@ -517,19 +509,23 @@ abstract public class AbstractSMBJoinPro
// Convert the join operator to a bucket map-join join operator
protected MapJoinOperator convertJoinToBucketMapJoin(
JoinOperator joinOp,
- SortBucketJoinProcCtx joinContext,
- ParseContext parseContext) throws SemanticException {
+ SortBucketJoinProcCtx joinContext) throws SemanticException {
MapJoinOperator mapJoinOp = new MapJoinProcessor().convertMapJoin(
- parseContext.getConf(),
- parseContext.getOpParseCtx(),
+ pGraphContext.getConf(),
+ pGraphContext.getOpParseCtx(),
joinOp,
- pGraphContext.getJoinContext().get(joinOp),
+ joinOp.getConf().isLeftInputJoin(),
+ joinOp.getConf().getBaseSrc(),
+ joinOp.getConf().getMapAliases(),
joinContext.getBigTablePosition(),
false,
false);
// Remove the join operator from the query join context
- parseContext.getMapJoinContext().put(mapJoinOp, parseContext.getJoinContext().get(joinOp));
- parseContext.getJoinContext().remove(joinOp);
+ // Data structures coming from QBJoinTree
+ mapJoinOp.getConf().setQBJoinTreeProps(joinOp.getConf());
+ //
+ pGraphContext.getMapJoinOps().add(mapJoinOp);
+ pGraphContext.getJoinOps().remove(joinOp);
convertMapJoinToBucketMapJoin(mapJoinOp, joinContext);
return mapJoinOp;
}
@@ -537,11 +533,10 @@ abstract public class AbstractSMBJoinPro
// Convert the join operator to a sort-merge join operator
protected void convertJoinToSMBJoin(
JoinOperator joinOp,
- SortBucketJoinProcCtx smbJoinContext,
- ParseContext parseContext) throws SemanticException {
- MapJoinOperator mapJoinOp = convertJoinToBucketMapJoin(joinOp, smbJoinContext, parseContext);
+ SortBucketJoinProcCtx smbJoinContext) throws SemanticException {
+ MapJoinOperator mapJoinOp = convertJoinToBucketMapJoin(joinOp, smbJoinContext);
SMBMapJoinOperator smbMapJoinOp =
- convertBucketMapJoinToSMBJoin(mapJoinOp, smbJoinContext, parseContext);
+ convertBucketMapJoinToSMBJoin(mapJoinOp, smbJoinContext);
smbMapJoinOp.setConvertedAutomaticallySMBJoin(true);
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapjoinProc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapjoinProc.java?rev=1653088&r1=1653087&r2=1653088&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapjoinProc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapjoinProc.java Mon Jan 19 19:42:21 2015
@@ -45,7 +45,7 @@ public class BucketMapjoinProc extends A
// can the mapjoin present be converted to a bucketed mapjoin
boolean convert = canConvertMapJoinToBucketMapJoin(
- mapJoinOperator, pGraphContext, context);
+ mapJoinOperator, context);
HiveConf conf = context.getConf();
// Throw an error if the user asked for bucketed mapjoin to be enforced and
@@ -67,13 +67,13 @@ public class BucketMapjoinProc extends A
* and do the version if possible.
*/
public static void checkAndConvertBucketMapJoin(ParseContext pGraphContext,
- MapJoinOperator mapJoinOp, QBJoinTree joinCtx, String baseBigAlias,
+ MapJoinOperator mapJoinOp, String baseBigAlias,
List<String> joinAliases) throws SemanticException {
BucketJoinProcCtx ctx = new BucketJoinProcCtx(pGraphContext.getConf());
BucketMapjoinProc proc = new BucketMapjoinProc(pGraphContext);
Map<Byte, List<ExprNodeDesc>> keysMap = mapJoinOp.getConf().getKeys();
- if (proc.checkConvertBucketMapJoin(pGraphContext, ctx,
- joinCtx, keysMap, baseBigAlias, joinAliases)) {
+ if (proc.checkConvertBucketMapJoin(ctx, mapJoinOp.getConf().getAliasToOpInfo(),
+ keysMap, baseBigAlias, joinAliases)) {
proc.convertMapJoinToBucketMapJoin(mapJoinOp, ctx);
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java?rev=1653088&r1=1653087&r2=1653088&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java Mon Jan 19 19:42:21 2015
@@ -47,8 +47,8 @@ import org.apache.hadoop.hive.ql.lib.Nod
import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
+import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
@@ -231,13 +231,16 @@ public class ConvertJoinMapJoin implemen
ParseContext parseContext = context.parseContext;
MapJoinDesc mapJoinDesc = null;
if (adjustParentsChildren) {
- mapJoinDesc = MapJoinProcessor.getMapJoinDesc(context.conf, parseContext.getOpParseCtx(),
- joinOp, parseContext.getJoinContext().get(joinOp), mapJoinConversionPos, true);
+ mapJoinDesc = MapJoinProcessor.getMapJoinDesc(context.conf, parseContext.getOpParseCtx(),
+ joinOp, joinOp.getConf().isLeftInputJoin(), joinOp.getConf().getBaseSrc(), joinOp.getConf().getMapAliases(),
+ mapJoinConversionPos, true);
} else {
JoinDesc joinDesc = joinOp.getConf();
// retain the original join desc in the map join.
mapJoinDesc =
- new MapJoinDesc(MapJoinProcessor.getKeys(parseContext.getJoinContext().get(joinOp), joinOp).getSecond(),
+ new MapJoinDesc(
+ MapJoinProcessor.getKeys(joinOp.getConf().isLeftInputJoin(),
+ joinOp.getConf().getBaseSrc(), joinOp).getSecond(),
null, joinDesc.getExprs(), null, null,
joinDesc.getOutputColumnNames(), mapJoinConversionPos, joinDesc.getConds(),
joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null);
@@ -606,7 +609,8 @@ public class ConvertJoinMapJoin implemen
ParseContext parseContext = context.parseContext;
MapJoinOperator mapJoinOp =
MapJoinProcessor.convertJoinOpMapJoinOp(context.conf, parseContext.getOpParseCtx(), joinOp,
- parseContext.getJoinContext().get(joinOp), bigTablePosition, true);
+ joinOp.getConf().isLeftInputJoin(), joinOp.getConf().getBaseSrc(), joinOp.getConf().getMapAliases(),
+ bigTablePosition, true);
Operator<? extends OperatorDesc> parentBigTableOp =
mapJoinOp.getParentOperators().get(bigTablePosition);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1653088&r1=1653087&r2=1653088&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Mon Jan 19 19:42:21 2015
@@ -18,7 +18,20 @@
package org.apache.hadoop.hive.ql.optimizer;
-import com.google.common.collect.Interner;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
@@ -67,7 +80,6 @@ import org.apache.hadoop.hive.ql.parse.B
import org.apache.hadoop.hive.ql.parse.OpParseContext;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
-import org.apache.hadoop.hive.ql.parse.QBJoinTree;
import org.apache.hadoop.hive.ql.parse.QBParseInfo;
import org.apache.hadoop.hive.ql.parse.RowResolver;
import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -102,19 +114,7 @@ import org.apache.hadoop.hive.ql.stats.S
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.mapred.InputFormat;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
+import com.google.common.collect.Interner;
/**
* General utility common functions for the Processor to convert operator into
@@ -1066,17 +1066,23 @@ public final class GenMapRedUtils {
if (needsTagging(cplan.getReduceWork())) {
Operator<? extends OperatorDesc> reducerOp = cplan.getReduceWork().getReducer();
- QBJoinTree joinTree = null;
+ String id = null;
if (reducerOp instanceof JoinOperator) {
- joinTree = parseCtx.getJoinContext().get(reducerOp);
+ if (parseCtx.getJoinOps().contains(reducerOp)) {
+ id = ((JoinOperator)reducerOp).getConf().getId();
+ }
} else if (reducerOp instanceof MapJoinOperator) {
- joinTree = parseCtx.getMapJoinContext().get(reducerOp);
+ if (parseCtx.getMapJoinOps().contains(reducerOp)) {
+ id = ((MapJoinOperator)reducerOp).getConf().getId();
+ }
} else if (reducerOp instanceof SMBMapJoinOperator) {
- joinTree = parseCtx.getSmbMapJoinContext().get(reducerOp);
+ if (parseCtx.getSmbMapJoinOps().contains(reducerOp)) {
+ id = ((SMBMapJoinOperator)reducerOp).getConf().getId();
+ }
}
- if (joinTree != null && joinTree.getId() != null) {
- streamDesc = joinTree.getId() + ":$INTNAME";
+ if (id != null) {
+ streamDesc = id + ":$INTNAME";
} else {
streamDesc = "$INTNAME";
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java?rev=1653088&r1=1653087&r2=1653088&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java Mon Jan 19 19:42:21 2015
@@ -26,7 +26,6 @@ import org.apache.hadoop.hive.ql.exec.Op
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.QBJoinTree;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -95,9 +94,9 @@ public class JoinReorder implements Tran
private Set<String> getBigTables(ParseContext joinCtx) {
Set<String> bigTables = new HashSet<String>();
- for (QBJoinTree qbJoin : joinCtx.getJoinContext().values()) {
- if (qbJoin.getStreamAliases() != null) {
- bigTables.addAll(qbJoin.getStreamAliases());
+ for (JoinOperator joinOp : joinCtx.getJoinOps()) {
+ if (joinOp.getConf().getStreamAliases() != null) {
+ bigTables.addAll(joinOp.getConf().getStreamAliases());
}
}
@@ -155,7 +154,7 @@ public class JoinReorder implements Tran
public ParseContext transform(ParseContext pactx) throws SemanticException {
Set<String> bigTables = getBigTables(pactx);
- for (JoinOperator joinOp : pactx.getJoinContext().keySet()) {
+ for (JoinOperator joinOp : pactx.getJoinOps()) {
reorder(joinOp, bigTables);
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=1653088&r1=1653087&r2=1653088&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Mon Jan 19 19:42:21 2015
@@ -61,7 +61,6 @@ import org.apache.hadoop.hive.ql.lib.Rul
import org.apache.hadoop.hive.ql.parse.GenMapRedWalker;
import org.apache.hadoop.hive.ql.parse.OpParseContext;
import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.QBJoinTree;
import org.apache.hadoop.hive.ql.parse.RowResolver;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -96,18 +95,13 @@ public class MapJoinProcessor implements
// needs to be passed. Use the string defined below for that.
private static final String MAPJOINKEY_FIELDPREFIX = "mapjoinkey";
- private ParseContext pGraphContext;
-
- /**
- * empty constructor.
- */
public MapJoinProcessor() {
- pGraphContext = null;
}
@SuppressWarnings("nls")
- private Operator<? extends OperatorDesc>
- putOpInsertMap(Operator<? extends OperatorDesc> op, RowResolver rr) {
+ private static Operator<? extends OperatorDesc> putOpInsertMap (
+ ParseContext pGraphContext, Operator<? extends OperatorDesc> op,
+ RowResolver rr) {
OpParseContext ctx = new OpParseContext(rr);
pGraphContext.getOpParseCtx().put(op, ctx);
return op;
@@ -232,10 +226,10 @@ public class MapJoinProcessor implements
throws SemanticException {
LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap =
newWork.getMapWork().getOpParseCtxMap();
- QBJoinTree newJoinTree = newWork.getMapWork().getJoinTree();
// generate the map join operator; already checked the map join
MapJoinOperator newMapJoinOp = new MapJoinProcessor().convertMapJoin(conf, opParseCtxMap, op,
- newJoinTree, mapJoinPos, true, false);
+ newWork.getMapWork().isLeftInputJoin(), newWork.getMapWork().getBaseSrc(), newWork.getMapWork().getMapAliases(),
+ mapJoinPos, true, false);
genLocalWorkForMapJoin(newWork, newMapJoinOp, mapJoinPos);
}
@@ -247,7 +241,9 @@ public class MapJoinProcessor implements
MapJoinProcessor.genMapJoinLocalWork(newWork, newMapJoinOp, mapJoinPos);
// clean up the mapred work
newWork.getMapWork().setOpParseCtxMap(null);
- newWork.getMapWork().setJoinTree(null);
+ newWork.getMapWork().setLeftInputJoin(false);
+ newWork.getMapWork().setBaseSrc(null);
+ newWork.getMapWork().setMapAliases(null);
} catch (Exception e) {
e.printStackTrace();
@@ -307,9 +303,8 @@ public class MapJoinProcessor implements
*/
public MapJoinOperator convertMapJoin(HiveConf conf,
LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
- JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin,
- boolean validateMapJoinTree)
- throws SemanticException {
+ JoinOperator op, boolean leftInputJoin, String[] baseSrc, List<String> mapAliases,
+ int mapJoinPos, boolean noCheckOuterJoin, boolean validateMapJoinTree) throws SemanticException {
// outer join cannot be performed on a table which is being cached
JoinDesc desc = op.getConf();
@@ -324,8 +319,6 @@ public class MapJoinProcessor implements
// Walk over all the sources (which are guaranteed to be reduce sink
// operators).
// The join outputs a concatenation of all the inputs.
- QBJoinTree leftSrc = joinTree.getJoinSrc();
-
List<Operator<? extends OperatorDesc>> parentOps = op.getParentOperators();
List<Operator<? extends OperatorDesc>> newParentOps =
new ArrayList<Operator<? extends OperatorDesc>>();
@@ -333,7 +326,7 @@ public class MapJoinProcessor implements
new ArrayList<Operator<? extends OperatorDesc>>();
// found a source which is not to be stored in memory
- if (leftSrc != null) {
+ if (leftInputJoin) {
// assert mapJoinPos == 0;
Operator<? extends OperatorDesc> parentOp = parentOps.get(0);
assert parentOp.getParentOperators().size() == 1;
@@ -345,7 +338,7 @@ public class MapJoinProcessor implements
byte pos = 0;
// Remove parent reduce-sink operators
- for (String src : joinTree.getBaseSrc()) {
+ for (String src : baseSrc) {
if (src != null) {
Operator<? extends OperatorDesc> parentOp = parentOps.get(pos);
assert parentOp.getParentOperators().size() == 1;
@@ -360,7 +353,7 @@ public class MapJoinProcessor implements
// create the map-join operator
MapJoinOperator mapJoinOp = convertJoinOpMapJoinOp(conf, opParseCtxMap,
- op, joinTree, mapJoinPos, noCheckOuterJoin);
+ op, leftInputJoin, baseSrc, mapAliases, mapJoinPos, noCheckOuterJoin);
// remove old parents
@@ -384,11 +377,12 @@ public class MapJoinProcessor implements
public static MapJoinOperator convertJoinOpMapJoinOp(HiveConf hconf,
LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
- JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin)
- throws SemanticException {
+ JoinOperator op, boolean leftInputJoin, String[] baseSrc, List<String> mapAliases,
+ int mapJoinPos, boolean noCheckOuterJoin) throws SemanticException {
MapJoinDesc mapJoinDescriptor =
- getMapJoinDesc(hconf, opParseCtxMap, op, joinTree, mapJoinPos, noCheckOuterJoin);
+ getMapJoinDesc(hconf, opParseCtxMap, op, leftInputJoin, baseSrc, mapAliases,
+ mapJoinPos, noCheckOuterJoin);
// reduce sink row resolver used to generate map join op
RowResolver outputRS = opParseCtxMap.get(op).getRowResolver();
@@ -441,7 +435,7 @@ public class MapJoinProcessor implements
*/
public static MapJoinOperator convertSMBJoinToMapJoin(HiveConf hconf,
Map<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
- SMBMapJoinOperator smbJoinOp, QBJoinTree joinTree, int bigTablePos, boolean noCheckOuterJoin)
+ SMBMapJoinOperator smbJoinOp, int bigTablePos, boolean noCheckOuterJoin)
throws SemanticException {
// Create a new map join operator
SMBJoinDesc smbJoinDesc = smbJoinOp.getConf();
@@ -488,7 +482,7 @@ public class MapJoinProcessor implements
}
public MapJoinOperator generateMapJoinOperator(ParseContext pctx, JoinOperator op,
- QBJoinTree joinTree, int mapJoinPos) throws SemanticException {
+ int mapJoinPos) throws SemanticException {
HiveConf hiveConf = pctx.getConf();
boolean noCheckOuterJoin = HiveConf.getBoolVar(hiveConf,
HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN)
@@ -497,7 +491,8 @@ public class MapJoinProcessor implements
LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap = pctx
.getOpParseCtx();
MapJoinOperator mapJoinOp = convertMapJoin(pctx.getConf(), opParseCtxMap, op,
- joinTree, mapJoinPos, noCheckOuterJoin, true);
+ op.getConf().isLeftInputJoin(), op.getConf().getBaseSrc(), op.getConf().getMapAliases(),
+ mapJoinPos, noCheckOuterJoin, true);
// create a dummy select to select all columns
genSelectPlan(pctx, mapJoinOp);
return mapJoinOp;
@@ -626,7 +621,7 @@ public class MapJoinProcessor implements
SelectDesc select = new SelectDesc(exprs, outputs, false);
- SelectOperator sel = (SelectOperator) putOpInsertMap(OperatorFactory.getAndMakeChild(select,
+ SelectOperator sel = (SelectOperator) putOpInsertMap(pctx, OperatorFactory.getAndMakeChild(select,
new RowSchema(inputRR.getColumnInfos()), input), inputRR);
sel.setColumnExprMap(colExprMap);
@@ -643,24 +638,22 @@ public class MapJoinProcessor implements
*
* @param op
* join operator
- * @param joinTree
- * qb join tree
* @return -1 if it cannot be converted to a map-side join, position of the map join node
* otherwise
*/
- private int mapSideJoin(JoinOperator op, QBJoinTree joinTree) throws SemanticException {
+ private int mapSideJoin(JoinOperator op) throws SemanticException {
int mapJoinPos = -1;
- if (joinTree.isMapSideJoin()) {
+ if (op.getConf().isMapSideJoin()) {
int pos = 0;
// In a map-side join, exactly one table is not present in memory.
// The client provides the list of tables which can be cached in memory
// via a hint.
- if (joinTree.getJoinSrc() != null) {
+ if (op.getConf().isLeftInputJoin()) {
mapJoinPos = pos;
}
- for (String src : joinTree.getBaseSrc()) {
+ for (String src : op.getConf().getBaseSrc()) {
if (src != null) {
- if (!joinTree.getMapAliases().contains(src)) {
+ if (!op.getConf().getMapAliases().contains(src)) {
if (mapJoinPos >= 0) {
return -1;
}
@@ -675,7 +668,7 @@ public class MapJoinProcessor implements
// leaving some table from the list of tables to be cached
if (mapJoinPos == -1) {
throw new SemanticException(ErrorMsg.INVALID_MAPJOIN_HINT.getMsg(
- Arrays.toString(joinTree.getBaseSrc())));
+ Arrays.toString(op.getConf().getBaseSrc())));
}
}
@@ -691,36 +684,34 @@ public class MapJoinProcessor implements
*/
@Override
public ParseContext transform(ParseContext pactx) throws SemanticException {
- pGraphContext = pactx;
List<MapJoinOperator> listMapJoinOps = new ArrayList<MapJoinOperator>();
// traverse all the joins and convert them if necessary
- if (pGraphContext.getJoinContext() != null) {
- Map<JoinOperator, QBJoinTree> joinMap = new HashMap<JoinOperator, QBJoinTree>();
- Map<MapJoinOperator, QBJoinTree> mapJoinMap = pGraphContext.getMapJoinContext();
+ if (pactx.getJoinOps() != null) {
+ Set<JoinOperator> joinMap = new HashSet<JoinOperator>();
+ Set<MapJoinOperator> mapJoinMap = pactx.getMapJoinOps();
if (mapJoinMap == null) {
- mapJoinMap = new HashMap<MapJoinOperator, QBJoinTree>();
- pGraphContext.setMapJoinContext(mapJoinMap);
+ mapJoinMap = new HashSet<MapJoinOperator>();
+ pactx.setMapJoinOps(mapJoinMap);
}
- Set<Map.Entry<JoinOperator, QBJoinTree>> joinCtx = pGraphContext.getJoinContext().entrySet();
- Iterator<Map.Entry<JoinOperator, QBJoinTree>> joinCtxIter = joinCtx.iterator();
+ Iterator<JoinOperator> joinCtxIter = pactx.getJoinOps().iterator();
while (joinCtxIter.hasNext()) {
- Map.Entry<JoinOperator, QBJoinTree> joinEntry = joinCtxIter.next();
- JoinOperator joinOp = joinEntry.getKey();
- QBJoinTree qbJoin = joinEntry.getValue();
- int mapJoinPos = mapSideJoin(joinOp, qbJoin);
+ JoinOperator joinOp = joinCtxIter.next();
+ int mapJoinPos = mapSideJoin(joinOp);
if (mapJoinPos >= 0) {
- MapJoinOperator mapJoinOp = generateMapJoinOperator(pactx, joinOp, qbJoin, mapJoinPos);
+ MapJoinOperator mapJoinOp = generateMapJoinOperator(pactx, joinOp, mapJoinPos);
listMapJoinOps.add(mapJoinOp);
- mapJoinMap.put(mapJoinOp, qbJoin);
+ mapJoinOp.getConf().setQBJoinTreeProps(joinOp.getConf());
+ mapJoinMap.add(mapJoinOp);
} else {
- joinMap.put(joinOp, qbJoin);
+ joinOp.getConf().setQBJoinTreeProps(joinOp.getConf());
+ joinMap.add(joinOp);
}
}
// store the new joinContext
- pGraphContext.setJoinContext(joinMap);
+ pactx.setJoinOps(joinMap);
}
// Go over the list and find if a reducer is not needed
@@ -746,15 +737,15 @@ public class MapJoinProcessor implements
// The dispatcher fires the processor corresponding to the closest matching
// rule and passes the context along
Dispatcher disp = new DefaultRuleDispatcher(getDefault(), opRules, new MapJoinWalkerCtx(
- listMapJoinOpsNoRed, pGraphContext));
+ listMapJoinOpsNoRed, pactx));
GraphWalker ogw = new GenMapRedWalker(disp);
ArrayList<Node> topNodes = new ArrayList<Node>();
topNodes.addAll(listMapJoinOps);
ogw.startWalking(topNodes, null);
- pGraphContext.setListMapJoinOpsNoReducer(listMapJoinOpsNoRed);
- return pGraphContext;
+ pactx.setListMapJoinOpsNoReducer(listMapJoinOpsNoRed);
+ return pactx;
}
/**
@@ -800,7 +791,7 @@ public class MapJoinProcessor implements
}
Operator<? extends OperatorDesc> ch = parent.getChildOperators().get(0);
if (ch instanceof MapJoinOperator) {
- if (!nonSubqueryMapJoin(ctx.getpGraphContext(), (MapJoinOperator) ch, mapJoin)) {
+ if (!nonSubqueryMapJoin((MapJoinOperator) ch, mapJoin)) {
if (ch.getParentOperators().indexOf(parent) == ((MapJoinOperator) ch).getConf()
.getPosBigTable()) {
// not come from the local branch
@@ -820,11 +811,8 @@ public class MapJoinProcessor implements
}
}
- private boolean nonSubqueryMapJoin(ParseContext pGraphContext, MapJoinOperator mapJoin,
- MapJoinOperator parentMapJoin) {
- QBJoinTree joinTree = pGraphContext.getMapJoinContext().get(mapJoin);
- QBJoinTree parentJoinTree = pGraphContext.getMapJoinContext().get(parentMapJoin);
- if (joinTree.getJoinSrc() != null && joinTree.getJoinSrc().equals(parentJoinTree)) {
+ private boolean nonSubqueryMapJoin(MapJoinOperator mapJoin, MapJoinOperator parentMapJoin) {
+ if (mapJoin.getParentOperators().contains(parentMapJoin)) {
return true;
}
return false;
@@ -1028,15 +1016,15 @@ public class MapJoinProcessor implements
}
- public static ObjectPair<List<ReduceSinkOperator>, Map<Byte, List<ExprNodeDesc>>> getKeys(QBJoinTree joinTree, JoinOperator op) {
+ public static ObjectPair<List<ReduceSinkOperator>, Map<Byte, List<ExprNodeDesc>>> getKeys(
+ boolean leftInputJoin, String[] baseSrc, JoinOperator op) {
// Walk over all the sources (which are guaranteed to be reduce sink
// operators).
// The join outputs a concatenation of all the inputs.
- QBJoinTree leftSrc = joinTree.getJoinSrc();
List<ReduceSinkOperator> oldReduceSinkParentOps =
new ArrayList<ReduceSinkOperator>(op.getNumParent());
- if (leftSrc != null) {
+ if (leftInputJoin) {
// assert mapJoinPos == 0;
Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(0);
assert parentOp.getParentOperators().size() == 1;
@@ -1044,7 +1032,7 @@ public class MapJoinProcessor implements
}
byte pos = 0;
- for (String src : joinTree.getBaseSrc()) {
+ for (String src : baseSrc) {
if (src != null) {
Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(pos);
assert parentOp.getParentOperators().size() == 1;
@@ -1062,12 +1050,14 @@ public class MapJoinProcessor implements
keyExprMap.put(pos, keyCols);
}
- return new ObjectPair<List<ReduceSinkOperator>, Map<Byte,List<ExprNodeDesc>>>(oldReduceSinkParentOps, keyExprMap);
+ return new ObjectPair<List<ReduceSinkOperator>, Map<Byte,List<ExprNodeDesc>>>(
+ oldReduceSinkParentOps, keyExprMap);
}
public static MapJoinDesc getMapJoinDesc(HiveConf hconf,
LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
- JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin) throws SemanticException {
+ JoinOperator op, boolean leftInputJoin, String[] baseSrc, List<String> mapAliases,
+ int mapJoinPos, boolean noCheckOuterJoin) throws SemanticException {
JoinDesc desc = op.getConf();
JoinCondDesc[] condns = desc.getConds();
Byte[] tagOrder = desc.getTagOrder();
@@ -1084,7 +1074,8 @@ public class MapJoinProcessor implements
Map<Byte, List<ExprNodeDesc>> valueExprs = op.getConf().getExprs();
Map<Byte, List<ExprNodeDesc>> newValueExprs = new HashMap<Byte, List<ExprNodeDesc>>();
- ObjectPair<List<ReduceSinkOperator>, Map<Byte,List<ExprNodeDesc>>> pair = getKeys(joinTree, op);
+ ObjectPair<List<ReduceSinkOperator>, Map<Byte,List<ExprNodeDesc>>> pair =
+ getKeys(leftInputJoin, baseSrc, op);
List<ReduceSinkOperator> oldReduceSinkParentOps = pair.getFirst();
for (Map.Entry<Byte, List<ExprNodeDesc>> entry : valueExprs.entrySet()) {
byte tag = entry.getKey();
@@ -1174,8 +1165,8 @@ public class MapJoinProcessor implements
// create dumpfile prefix needed to create descriptor
String dumpFilePrefix = "";
- if (joinTree.getMapAliases() != null) {
- for (String mapAlias : joinTree.getMapAliases()) {
+ if (mapAliases != null) {
+ for (String mapAlias : mapAliases) {
dumpFilePrefix = dumpFilePrefix + mapAlias;
}
dumpFilePrefix = dumpFilePrefix + "-" + PlanUtils.getCountForMapJoinDumpFilePrefix();
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java?rev=1653088&r1=1653087&r2=1653088&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java Mon Jan 19 19:42:21 2015
@@ -30,6 +30,7 @@ import java.util.Stack;
import org.apache.hadoop.hive.ql.exec.FilterOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.SelectOperator;
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
@@ -42,7 +43,6 @@ import org.apache.hadoop.hive.ql.lib.Nod
import org.apache.hadoop.hive.ql.lib.Rule;
import org.apache.hadoop.hive.ql.lib.RuleRegExp;
import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.QBJoinTree;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -55,15 +55,12 @@ import org.apache.hadoop.hive.ql.plan.Op
*/
public class NonBlockingOpDeDupProc implements Transform {
- private ParseContext pctx;
-
@Override
public ParseContext transform(ParseContext pctx) throws SemanticException {
- this.pctx = pctx;
String SEL = SelectOperator.getOperatorName();
String FIL = FilterOperator.getOperatorName();
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
- opRules.put(new RuleRegExp("R1", SEL + "%" + SEL + "%"), new SelectDedup());
+ opRules.put(new RuleRegExp("R1", SEL + "%" + SEL + "%"), new SelectDedup(pctx));
opRules.put(new RuleRegExp("R2", FIL + "%" + FIL + "%"), new FilterDedup());
Dispatcher disp = new DefaultRuleDispatcher(null, opRules, null);
@@ -76,6 +73,13 @@ public class NonBlockingOpDeDupProc impl
}
private class SelectDedup implements NodeProcessor {
+
+ /** Parse context whose join/map-join operators fixContextReferences re-points. */
+ private final ParseContext pctx;
+
+ /**
+ * @param pctx parse context of the plan being transformed; consulted when a child
+ * SELECT is merged into its parent so join alias maps can be updated
+ */
+ public SelectDedup(ParseContext pctx) {
+ this.pctx = pctx;
+ }
+
@Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
Object... nodeOutputs) throws SemanticException {
@@ -178,28 +182,37 @@ public class NonBlockingOpDeDupProc impl
}
return true;
}
- }
- /**
- * Change existing references in the context to point from child to parent operator.
- * @param cSEL child operator (to be removed, and merged into parent)
- * @param pSEL parent operator
- */
- private void fixContextReferences(SelectOperator cSEL, SelectOperator pSEL) {
- Collection<QBJoinTree> qbJoinTrees = new ArrayList<QBJoinTree>();
- qbJoinTrees.addAll(pctx.getJoinContext().values());
- qbJoinTrees.addAll(pctx.getMapJoinContext().values());
- for (QBJoinTree qbJoinTree : qbJoinTrees) {
- Map<String, Operator<? extends OperatorDesc>> aliasToOpInfo = qbJoinTree.getAliasToOpInfo();
- for (Map.Entry<String, Operator<? extends OperatorDesc>> entry : aliasToOpInfo.entrySet()) {
- if (entry.getValue() == cSEL) {
- aliasToOpInfo.put(entry.getKey(), pSEL);
+ /**
+ * Change existing references in the context to point from child to parent operator.
+ * Every alias-to-operator map held by a join or map-join descriptor in the parse
+ * context that maps an alias to cSEL is updated to map that alias to pSEL instead.
+ * Missing operator sets and descriptors without an alias map are skipped.
+ * @param cSEL child operator (to be removed, and merged into parent)
+ * @param pSEL parent operator
+ */
+ private void fixContextReferences(SelectOperator cSEL, SelectOperator pSEL) {
+ Collection<Map<String, Operator<? extends OperatorDesc>>> mapsAliasToOpInfo =
+ new ArrayList<Map<String, Operator<? extends OperatorDesc>>>();
+ // The join / map-join operator sets may be null (other transforms null-check them),
+ // so guard before iterating rather than risk a NullPointerException.
+ if (pctx.getJoinOps() != null) {
+ for (JoinOperator joinOp : pctx.getJoinOps()) {
+ if (joinOp.getConf().getAliasToOpInfo() != null) {
+ mapsAliasToOpInfo.add(joinOp.getConf().getAliasToOpInfo());
+ }
+ }
+ }
+ if (pctx.getMapJoinOps() != null) {
+ for (MapJoinOperator mapJoinOp : pctx.getMapJoinOps()) {
+ if (mapJoinOp.getConf().getAliasToOpInfo() != null) {
+ mapsAliasToOpInfo.add(mapJoinOp.getConf().getAliasToOpInfo());
+ }
+ }
+ }
+ for (Map<String, Operator<? extends OperatorDesc>> aliasToOpInfo : mapsAliasToOpInfo) {
+ for (Map.Entry<String, Operator<? extends OperatorDesc>> entry : aliasToOpInfo.entrySet()) {
+ if (entry.getValue() == cSEL) {
+ // Only the value of an existing key is replaced; for HashMap-style maps this is
+ // not a structural modification, so the entry-set iteration stays valid
+ // (assumes aliasToOpInfo is such a map -- confirm if its type changes).
+ aliasToOpInfo.put(entry.getKey(), pSEL);
+ }
}
}
}
}
private class FilterDedup implements NodeProcessor {
+
@Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
Object... nodeOutputs) throws SemanticException {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java?rev=1653088&r1=1653087&r2=1653088&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java Mon Jan 19 19:42:21 2015
@@ -79,11 +79,13 @@ import org.apache.hadoop.hive.serde2.typ
public class SkewJoinOptimizer implements Transform {
private static final Log LOG = LogFactory.getLog(SkewJoinOptimizer.class.getName());
- private static ParseContext parseContext;
public static class SkewJoinProc implements NodeProcessor {
- public SkewJoinProc() {
+ /** Parse context of the plan being optimized; cloned join and scan operators are registered in it. */
+ private final ParseContext parseContext;
+
+ /**
+ * @param parseContext parse context of the plan being optimized; held per instance
+ * instead of in a shared static field
+ */
+ public SkewJoinProc(ParseContext parseContext) {
super();
+ this.parseContext = parseContext;
}
@Override
@@ -165,23 +167,14 @@ public class SkewJoinOptimizer implement
return null;
}
- // have to create a QBJoinTree for the cloned join operator
- QBJoinTree originJoinTree = parseContext.getJoinContext().get(joinOp);
- QBJoinTree newJoinTree;
- try {
- newJoinTree = originJoinTree.clone();
- } catch (CloneNotSupportedException e) {
- LOG.debug("QBJoinTree could not be cloned: ", e);
- return null;
- }
-
JoinOperator joinOpClone;
if (processSelect) {
joinOpClone = (JoinOperator)(currOpClone.getParentOperators().get(0));
} else {
joinOpClone = (JoinOperator)currOpClone;
}
- parseContext.getJoinContext().put(joinOpClone, newJoinTree);
+ joinOpClone.getConf().cloneQBJoinTreeProps(joinOp.getConf());
+ parseContext.getJoinOps().add(joinOpClone);
List<TableScanOperator> tableScanCloneOpsForJoin =
new ArrayList<TableScanOperator>();
@@ -211,7 +204,7 @@ public class SkewJoinOptimizer implement
}
parseContext.getTopOps().put(newAlias, tso);
- setUpAlias(originJoinTree, newJoinTree, tabAlias, newAlias, tso);
+ setUpAlias(joinOp, joinOpClone, tabAlias, newAlias, tso);
}
// Now do a union of the select operators: selectOp and selectOpClone
@@ -627,19 +620,19 @@ public class SkewJoinOptimizer implement
/**
* Set alias in the cloned join tree
*/
- private static void setUpAlias(QBJoinTree origin, QBJoinTree cloned, String origAlias,
+ private static void setUpAlias(JoinOperator origin, JoinOperator cloned, String origAlias,
String newAlias, Operator<? extends OperatorDesc> topOp) {
- cloned.getAliasToOpInfo().remove(origAlias);
- cloned.getAliasToOpInfo().put(newAlias, topOp);
- if (origin.getLeftAlias().equals(origAlias)) {
- cloned.setLeftAlias(null);
- cloned.setLeftAlias(newAlias);
- }
- replaceAlias(origin.getLeftAliases(), cloned.getLeftAliases(), origAlias, newAlias);
- replaceAlias(origin.getRightAliases(), cloned.getRightAliases(), origAlias, newAlias);
- replaceAlias(origin.getBaseSrc(), cloned.getBaseSrc(), origAlias, newAlias);
- replaceAlias(origin.getMapAliases(), cloned.getMapAliases(), origAlias, newAlias);
- replaceAlias(origin.getStreamAliases(), cloned.getStreamAliases(), origAlias, newAlias);
+ // Re-key the cloned descriptor's alias->operator map: newAlias now maps to topOp.
+ cloned.getConf().getAliasToOpInfo().remove(origAlias);
+ cloned.getConf().getAliasToOpInfo().put(newAlias, topOp);
+ // Compare from the argument side so a join whose left alias is unset (null) is left
+ // unchanged instead of throwing a NullPointerException.
+ if (origAlias != null && origAlias.equals(origin.getConf().getLeftAlias())) {
+ cloned.getConf().setLeftAlias(newAlias);
+ }
+ // Rewrite the remaining alias-bearing properties of the clone, using the original
+ // join's arrays/lists as the reference for where origAlias appeared.
+ replaceAlias(origin.getConf().getLeftAliases(), cloned.getConf().getLeftAliases(), origAlias, newAlias);
+ replaceAlias(origin.getConf().getRightAliases(), cloned.getConf().getRightAliases(), origAlias, newAlias);
+ replaceAlias(origin.getConf().getBaseSrc(), cloned.getConf().getBaseSrc(), origAlias, newAlias);
+ replaceAlias(origin.getConf().getMapAliases(), cloned.getConf().getMapAliases(), origAlias, newAlias);
+ replaceAlias(origin.getConf().getStreamAliases(), cloned.getConf().getStreamAliases(), origAlias, newAlias);
}
private static void replaceAlias(String[] origin, String[] cloned,
@@ -675,7 +668,7 @@ public class SkewJoinOptimizer implement
public ParseContext transform(ParseContext pctx) throws SemanticException {
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
- opRules.put(new RuleRegExp("R1", "TS%.*RS%JOIN%"), getSkewJoinProc());
+ opRules.put(new RuleRegExp("R1", "TS%.*RS%JOIN%"), getSkewJoinProc(pctx));
SkewJoinOptProcCtx skewJoinOptProcCtx = new SkewJoinOptProcCtx(pctx);
// The dispatcher fires the processor corresponding to the closest matching
// rule and passes the context along
@@ -690,8 +683,8 @@ public class SkewJoinOptimizer implement
return pctx;
}
- private NodeProcessor getSkewJoinProc() {
- return new SkewJoinProc();
+ /**
+ * Creates the processor fired for rule R1 (TS%.*RS%JOIN%).
+ * @param pctx parse context the processor operates on
+ * @return a new SkewJoinProc bound to {@code pctx}
+ */
+ private NodeProcessor getSkewJoinProc(ParseContext pctx) {
+ return new SkewJoinProc(pctx);
}
/**
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapjoinProc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapjoinProc.java?rev=1653088&r1=1653087&r2=1653088&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapjoinProc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapjoinProc.java Mon Jan 19 19:42:21 2015
@@ -60,7 +60,7 @@ public class SortedMergeBucketMapjoinPro
}
if (convert) {
- convertBucketMapJoinToSMBJoin(mapJoinOp, smbJoinContext, pGraphContext);
+ convertBucketMapJoinToSMBJoin(mapJoinOp, smbJoinContext);
}
return null;
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeJoinProc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeJoinProc.java?rev=1653088&r1=1653087&r2=1653088&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeJoinProc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeJoinProc.java Mon Jan 19 19:42:21 2015
@@ -44,10 +44,10 @@ public class SortedMergeJoinProc extends
SortBucketJoinProcCtx smbJoinContext = (SortBucketJoinProcCtx) procCtx;
boolean convert =
canConvertJoinToSMBJoin(
- joinOp, smbJoinContext, pGraphContext);
+ joinOp, smbJoinContext);
if (convert) {
- convertJoinToSMBJoin(joinOp, smbJoinContext, pGraphContext);
+ convertJoinToSMBJoin(joinOp, smbJoinContext);
}
return null;
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkMapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkMapJoinProcessor.java?rev=1653088&r1=1653087&r2=1653088&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkMapJoinProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkMapJoinProcessor.java Mon Jan 19 19:42:21 2015
@@ -21,18 +21,18 @@ package org.apache.hadoop.hive.ql.optimi
import java.util.LinkedHashMap;
import java.util.List;
-import com.google.common.base.Preconditions;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.parse.OpParseContext;
-import org.apache.hadoop.hive.ql.parse.QBJoinTree;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import com.google.common.base.Preconditions;
+
public class SparkMapJoinProcessor extends MapJoinProcessor {
/**
@@ -50,8 +50,8 @@ public class SparkMapJoinProcessor exten
@Override
public MapJoinOperator convertMapJoin(HiveConf conf,
LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
- JoinOperator op, QBJoinTree joinTree, int bigTablePos,
- boolean noCheckOuterJoin,
+ JoinOperator op, boolean leftSrc, String[] baseSrc, List<String> mapAliases,
+ int bigTablePos, boolean noCheckOuterJoin,
boolean validateMapJoinTree) throws SemanticException {
// outer join cannot be performed on a table which is being cached
@@ -65,7 +65,8 @@ public class SparkMapJoinProcessor exten
// create the map-join operator
MapJoinOperator mapJoinOp = convertJoinOpMapJoinOp(conf, opParseCtxMap,
- op, joinTree, bigTablePos, noCheckOuterJoin);
+ op, op.getConf().isLeftInputJoin(), op.getConf().getBaseSrc(),
+ op.getConf().getMapAliases(), bigTablePos, noCheckOuterJoin);
// 1. remove RS as parent for the big table branch
// 2. remove old join op from child set of all the RSs
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java?rev=1653088&r1=1653087&r2=1653088&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java Mon Jan 19 19:42:21 2015
@@ -107,7 +107,7 @@ public class CorrelationOptimizer implem
// that has both intermediate tables and query input tables as input tables,
// we should be able to guess if this JoinOperator will be converted to a MapJoin
// based on hive.auto.convert.join.noconditionaltask.size.
- for (JoinOperator joinOp: pCtx.getJoinContext().keySet()) {
+ for (JoinOperator joinOp: pCtx.getJoinOps()) {
boolean isAbleToGuess = true;
boolean mayConvert = false;
// Get total size and individual alias's size
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java?rev=1653088&r1=1653087&r2=1653088&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java Mon Jan 19 19:42:21 2015
@@ -34,7 +34,6 @@ import org.apache.hadoop.hive.ql.lib.Dis
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.TaskGraphWalker.TaskGraphWalkerContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.MapWork;
/**
@@ -53,8 +52,7 @@ public abstract class AbstractJoinTaskDi
throws SemanticException;
protected void replaceTaskWithConditionalTask(
- Task<? extends Serializable> currTask, ConditionalTask cndTsk,
- PhysicalContext physicalContext) {
+ Task<? extends Serializable> currTask, ConditionalTask cndTsk) {
// add this task into task tree
// set all parent tasks
List<Task<? extends Serializable>> parentTasks = currTask.getParentTasks();
@@ -88,8 +86,7 @@ public abstract class AbstractJoinTaskDi
// Replace the task with the new task. Copy the children and parents of the old
// task to the new task.
protected void replaceTask(
- Task<? extends Serializable> currTask, Task<? extends Serializable> newTask,
- PhysicalContext physicalContext) {
+ Task<? extends Serializable> currTask, Task<? extends Serializable> newTask) {
// add this task into task tree
// set all parent tasks
List<Task<? extends Serializable>> parentTasks = currTask.getParentTasks();
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java?rev=1653088&r1=1653087&r2=1653088&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java Mon Jan 19 19:42:21 2015
@@ -48,7 +48,6 @@ import org.apache.hadoop.hive.ql.lib.Dis
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
import org.apache.hadoop.hive.ql.optimizer.MapJoinProcessor;
import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.QBJoinTree;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin;
import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin.ConditionalResolverCommonJoinCtx;
@@ -400,7 +399,6 @@ public class CommonJoinTaskDispatcher ex
// get parseCtx for this Join Operator
ParseContext parseCtx = physicalContext.getParseContext();
- QBJoinTree joinTree = parseCtx.getJoinContext().get(joinOp);
// start to generate multiple map join tasks
JoinDesc joinDesc = joinOp.getConf();
@@ -458,7 +456,9 @@ public class CommonJoinTaskDispatcher ex
}
currWork.setOpParseCtxMap(parseCtx.getOpParseCtx());
- currWork.setJoinTree(joinTree);
+ currWork.setLeftInputJoin(joinOp.getConf().isLeftInputJoin());
+ currWork.setBaseSrc(joinOp.getConf().getBaseSrc());
+ currWork.setMapAliases(joinOp.getConf().getMapAliases());
if (bigTablePosition >= 0) {
// create map join task and set big table as bigTablePosition
@@ -466,7 +466,7 @@ public class CommonJoinTaskDispatcher ex
newTask.setTaskTag(Task.MAPJOIN_ONLY_NOBACKUP);
newTask.setFetchSource(currTask.isFetchSource());
- replaceTask(currTask, newTask, physicalContext);
+ replaceTask(currTask, newTask);
// Can this task be merged with the child task. This can happen if a big table is being
// joined with multiple small tables on different keys
@@ -522,7 +522,9 @@ public class CommonJoinTaskDispatcher ex
listTasks.add(currTask);
// clear JoinTree and OP Parse Context
currWork.setOpParseCtxMap(null);
- currWork.setJoinTree(null);
+ currWork.setLeftInputJoin(false);
+ currWork.setBaseSrc(null);
+ currWork.setMapAliases(null);
// create conditional task and insert conditional task into task tree
ConditionalWork cndWork = new ConditionalWork(listWorks);
@@ -541,7 +543,7 @@ public class CommonJoinTaskDispatcher ex
cndTsk.setResolverCtx(resolverCtx);
// replace the current task with the new generated conditional task
- replaceTaskWithConditionalTask(currTask, cndTsk, physicalContext);
+ replaceTaskWithConditionalTask(currTask, cndTsk);
return cndTsk;
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java?rev=1653088&r1=1653087&r2=1653088&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java Mon Jan 19 19:42:21 2015
@@ -46,7 +46,6 @@ import org.apache.hadoop.hive.ql.optimiz
import org.apache.hadoop.hive.ql.optimizer.MapJoinProcessor;
import org.apache.hadoop.hive.ql.parse.OpParseContext;
import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.QBJoinTree;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin;
import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin.ConditionalResolverCommonJoinCtx;
@@ -168,8 +167,7 @@ public class SortMergeJoinTaskDispatcher
// create map join task and set big table as bigTablePosition
private MapRedTask convertSMBTaskToMapJoinTask(MapredWork origWork,
int bigTablePosition,
- SMBMapJoinOperator smbJoinOp,
- QBJoinTree joinTree)
+ SMBMapJoinOperator smbJoinOp)
throws UnsupportedEncodingException, SemanticException {
// deep copy a new mapred work
MapredWork newWork = Utilities.clonePlan(origWork);
@@ -178,7 +176,7 @@ public class SortMergeJoinTaskDispatcher
.getParseContext().getConf());
// generate the map join operator; already checked the map join
MapJoinOperator newMapJoinOp =
- getMapJoinOperator(newTask, newWork, smbJoinOp, joinTree, bigTablePosition);
+ getMapJoinOperator(newTask, newWork, smbJoinOp, bigTablePosition);
// The reducer needs to be restored - Consider a query like:
// select count(*) FROM bucket_big a JOIN bucket_small b ON a.key = b.key;
@@ -246,7 +244,6 @@ public class SortMergeJoinTaskDispatcher
// get parseCtx for this Join Operator
ParseContext parseCtx = physicalContext.getParseContext();
- QBJoinTree joinTree = parseCtx.getSmbMapJoinContext().get(originalSMBJoinOp);
// Convert the work containing to sort-merge join into a work, as if it had a regular join.
// Note that the operator tree is not changed - is still contains the SMB join, but the
@@ -257,9 +254,13 @@ public class SortMergeJoinTaskDispatcher
SMBMapJoinOperator newSMBJoinOp = getSMBMapJoinOp(currJoinWork);
currWork.getMapWork().setOpParseCtxMap(parseCtx.getOpParseCtx());
- currWork.getMapWork().setJoinTree(joinTree);
+ currWork.getMapWork().setLeftInputJoin(originalSMBJoinOp.getConf().isLeftInputJoin());
+ currWork.getMapWork().setBaseSrc(originalSMBJoinOp.getConf().getBaseSrc());
+ currWork.getMapWork().setMapAliases(originalSMBJoinOp.getConf().getMapAliases());
currJoinWork.getMapWork().setOpParseCtxMap(parseCtx.getOpParseCtx());
- currJoinWork.getMapWork().setJoinTree(joinTree);
+ currJoinWork.getMapWork().setLeftInputJoin(originalSMBJoinOp.getConf().isLeftInputJoin());
+ currJoinWork.getMapWork().setBaseSrc(originalSMBJoinOp.getConf().getBaseSrc());
+ currJoinWork.getMapWork().setMapAliases(originalSMBJoinOp.getConf().getMapAliases());
// create conditional work list and task list
List<Serializable> listWorks = new ArrayList<Serializable>();
@@ -296,7 +297,7 @@ public class SortMergeJoinTaskDispatcher
// create map join task for the given big table position
MapRedTask newTask = convertSMBTaskToMapJoinTask(
- currJoinWork, bigTablePosition, newSMBJoinOp, joinTree);
+ currJoinWork, bigTablePosition, newSMBJoinOp);
MapWork mapWork = newTask.getWork().getMapWork();
Operator<?> parentOp = originalSMBJoinOp.getParentOperators().get(bigTablePosition);
@@ -334,7 +335,9 @@ public class SortMergeJoinTaskDispatcher
listTasks.add(currTask);
// clear JoinTree and OP Parse Context
currWork.getMapWork().setOpParseCtxMap(null);
- currWork.getMapWork().setJoinTree(null);
+ currWork.getMapWork().setLeftInputJoin(false);
+ currWork.getMapWork().setBaseSrc(null);
+ currWork.getMapWork().setMapAliases(null);
// create conditional task and insert conditional task into task tree
ConditionalWork cndWork = new ConditionalWork(listWorks);
@@ -353,7 +356,7 @@ public class SortMergeJoinTaskDispatcher
cndTsk.setResolverCtx(resolverCtx);
// replace the current task with the new generated conditional task
- replaceTaskWithConditionalTask(currTask, cndTsk, physicalContext);
+ replaceTaskWithConditionalTask(currTask, cndTsk);
return cndTsk;
}
@@ -426,7 +429,6 @@ public class SortMergeJoinTaskDispatcher
private MapJoinOperator getMapJoinOperator(MapRedTask task,
MapredWork work,
SMBMapJoinOperator oldSMBJoinOp,
- QBJoinTree joinTree,
int mapJoinPos) throws SemanticException {
SMBMapJoinOperator newSMBJoinOp = getSMBMapJoinOp(task.getWork());
@@ -437,7 +439,6 @@ public class SortMergeJoinTaskDispatcher
// generate the map join operator
return MapJoinProcessor.convertSMBJoinToMapJoin(physicalContext.getConf(),
- opParseContextMap, newSMBJoinOp,
- joinTree, mapJoinPos, true);
+ opParseContextMap, newSMBJoinOp, mapJoinPos, true);
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java?rev=1653088&r1=1653087&r2=1653088&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java Mon Jan 19 19:42:21 2015
@@ -40,7 +40,6 @@ import org.apache.hadoop.hive.ql.lib.Nod
import org.apache.hadoop.hive.ql.optimizer.BucketMapjoinProc;
import org.apache.hadoop.hive.ql.optimizer.MapJoinProcessor;
import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.QBJoinTree;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.spark.OptimizeSparkProcContext;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
@@ -130,7 +129,6 @@ public class SparkMapJoinOptimizer imple
private int convertJoinBucketMapJoin(JoinOperator joinOp, MapJoinOperator mapJoinOp,
OptimizeSparkProcContext context, int bigTablePosition) throws SemanticException {
ParseContext parseContext = context.getParseContext();
- QBJoinTree joinTree = parseContext.getJoinContext().get(joinOp);
List<String> joinAliases = new ArrayList<String>();
String baseBigAlias = null;
Map<Integer, Set<String>> posToAliasMap = joinOp.getPosToAliasMap();
@@ -146,7 +144,10 @@ public class SparkMapJoinOptimizer imple
}
mapJoinOp.setPosToAliasMap(posToAliasMap);
BucketMapjoinProc.checkAndConvertBucketMapJoin(
- parseContext, mapJoinOp, joinTree, baseBigAlias, joinAliases);
+ parseContext,
+ mapJoinOp,
+ baseBigAlias,
+ joinAliases);
MapJoinDesc joinDesc = mapJoinOp.getConf();
return joinDesc.isBucketMapJoin()
? joinDesc.getBigTableBucketNumMapping().size() : -1;
@@ -374,7 +375,8 @@ public class SparkMapJoinOptimizer imple
ParseContext parseContext = context.getParseContext();
MapJoinOperator mapJoinOp =
MapJoinProcessor.convertJoinOpMapJoinOp(context.getConf(), parseContext.getOpParseCtx(), joinOp,
- parseContext.getJoinContext().get(joinOp), bigTablePosition, true);
+ joinOp.getConf().isLeftInputJoin(), joinOp.getConf().getBaseSrc(), joinOp.getConf().getMapAliases(),
+ bigTablePosition, true);
Operator<? extends OperatorDesc> parentBigTableOp =
mapJoinOp.getParentOperators().get(bigTablePosition);
@@ -393,6 +395,9 @@ public class SparkMapJoinOptimizer imple
}
}
+ // Data structures
+ mapJoinOp.getConf().setQBJoinTreeProps(joinOp.getConf());
+
return mapJoinOp;
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSMBJoinHintOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSMBJoinHintOptimizer.java?rev=1653088&r1=1653087&r2=1653088&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSMBJoinHintOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSMBJoinHintOptimizer.java Mon Jan 19 19:42:21 2015
@@ -70,7 +70,7 @@ public class SparkSMBJoinHintOptimizer e
if (convert) {
removeSmallTableReduceSink(mapJoinOp);
- convertBucketMapJoinToSMBJoin(mapJoinOp, smbJoinContext, pGraphContext);
+ convertBucketMapJoinToSMBJoin(mapJoinOp, smbJoinContext);
}
return null;
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java?rev=1653088&r1=1653087&r2=1653088&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java Mon Jan 19 19:42:21 2015
@@ -38,7 +38,6 @@ import org.apache.hadoop.hive.ql.optimiz
import org.apache.hadoop.hive.ql.optimizer.physical.SkewJoinProcFactory;
import org.apache.hadoop.hive.ql.optimizer.physical.SparkMapJoinResolver;
import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.QBJoinTree;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.spark.GenSparkUtils;
import org.apache.hadoop.hive.ql.plan.BaseWork;
@@ -138,16 +137,22 @@ public class SparkSkewJoinProcFactory {
String streamDesc = taskTmpDir.toUri().toString();
if (GenMapRedUtils.needsTagging((ReduceWork) childWork)) {
Operator<? extends OperatorDesc> childReducer = ((ReduceWork) childWork).getReducer();
- QBJoinTree joinTree = null;
+ String id = null;
if (childReducer instanceof JoinOperator) {
- joinTree = parseContext.getJoinContext().get(childReducer);
+ if (parseContext.getJoinOps().contains(childReducer)) {
+ id = ((JoinOperator)childReducer).getConf().getId();
+ }
} else if (childReducer instanceof MapJoinOperator) {
- joinTree = parseContext.getMapJoinContext().get(childReducer);
+ if (parseContext.getMapJoinOps().contains(childReducer)) {
+ id = ((MapJoinOperator)childReducer).getConf().getId();
+ }
} else if (childReducer instanceof SMBMapJoinOperator) {
- joinTree = parseContext.getSmbMapJoinContext().get(childReducer);
+ if (parseContext.getSmbMapJoinOps().contains(childReducer)) {
+ id = ((SMBMapJoinOperator)childReducer).getConf().getId();
+ }
}
- if (joinTree != null && joinTree.getId() != null) {
- streamDesc = joinTree.getId() + ":$INTNAME";
+ if (id != null) {
+ streamDesc = id + ":$INTNAME";
} else {
streamDesc = "$INTNAME";
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinOptimizer.java?rev=1653088&r1=1653087&r2=1653088&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinOptimizer.java Mon Jan 19 19:42:21 2015
@@ -65,7 +65,7 @@ public class SparkSortMergeJoinOptimizer
joinOp, smbJoinContext, pGraphContext, stack);
if (convert) {
- return convertJoinToSMBJoinAndReturn(joinOp, smbJoinContext, pGraphContext);
+ return convertJoinToSMBJoinAndReturn(joinOp, smbJoinContext);
}
return null;
}
@@ -76,7 +76,7 @@ public class SparkSortMergeJoinOptimizer
if (!supportBucketMapJoin(stack)) {
return false;
}
- return canConvertJoinToSMBJoin(joinOperator, smbJoinContext, pGraphContext);
+ return canConvertJoinToSMBJoin(joinOperator, smbJoinContext);
}
//Preliminary checks. In the MR version of the code, these used to be done via another walk,
@@ -102,11 +102,10 @@ public class SparkSortMergeJoinOptimizer
protected SMBMapJoinOperator convertJoinToSMBJoinAndReturn(
JoinOperator joinOp,
- SortBucketJoinProcCtx smbJoinContext,
- ParseContext parseContext) throws SemanticException {
- MapJoinOperator mapJoinOp = convertJoinToBucketMapJoin(joinOp, smbJoinContext, parseContext);
+ SortBucketJoinProcCtx smbJoinContext) throws SemanticException {
+ MapJoinOperator mapJoinOp = convertJoinToBucketMapJoin(joinOp, smbJoinContext);
SMBMapJoinOperator smbMapJoinOp =
- convertBucketMapJoinToSMBJoin(mapJoinOp, smbJoinContext, parseContext);
+ convertBucketMapJoinToSMBJoin(mapJoinOp, smbJoinContext);
smbMapJoinOp.setConvertedAutomaticallySMBJoin(true);
return smbMapJoinOp;
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java?rev=1653088&r1=1653087&r2=1653088&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java Mon Jan 19 19:42:21 2015
@@ -71,9 +71,9 @@ public class ParseContext {
private Map<TableScanOperator, Map<String, ExprNodeDesc>> opToPartToSkewedPruner;
private HashMap<String, Operator<? extends OperatorDesc>> topOps;
private LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtx;
- private Map<JoinOperator, QBJoinTree> joinContext;
- private Map<MapJoinOperator, QBJoinTree> mapJoinContext;
- private Map<SMBMapJoinOperator, QBJoinTree> smbMapJoinContext;
+ private Set<JoinOperator> joinOps;
+ private Set<MapJoinOperator> mapJoinOps;
+ private Set<SMBMapJoinOperator> smbMapJoinOps;
private List<ReduceSinkOperator> reduceSinkOperatorsAddedByEnforceBucketingSorting;
private HashMap<String, SplitSample> nameToSplitSample;
private List<LoadTableDesc> loadTableWork;
@@ -124,7 +124,7 @@ public class ParseContext {
* @param opParseCtx
* operator parse context - contains a mapping from operator to
* operator parse state (row resolver etc.)
- * @param joinContext
+ * @param joinOps
* context needed join processing (map join specifically)
* @param loadTableWork
* list of destination tables being loaded
@@ -153,8 +153,8 @@ public class ParseContext {
HashMap<TableScanOperator, PrunedPartitionList> opToPartList,
HashMap<String, Operator<? extends OperatorDesc>> topOps,
LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtx,
- Map<JoinOperator, QBJoinTree> joinContext,
- Map<SMBMapJoinOperator, QBJoinTree> smbMapJoinContext,
+ Set<JoinOperator> joinOps,
+ Set<SMBMapJoinOperator> smbMapJoinOps,
List<LoadTableDesc> loadTableWork, List<LoadFileDesc> loadFileWork,
Context ctx, HashMap<String, String> idToTableNameMap, int destTableId,
UnionProcContext uCtx, List<AbstractMapJoinOperator<? extends MapJoinDesc>> listMapJoinOpsNoReducer,
@@ -173,8 +173,8 @@ public class ParseContext {
this.ast = ast;
this.opToPartPruner = opToPartPruner;
this.opToPartList = opToPartList;
- this.joinContext = joinContext;
- this.smbMapJoinContext = smbMapJoinContext;
+ this.joinOps = joinOps;
+ this.smbMapJoinOps = smbMapJoinOps;
this.loadFileWork = loadFileWork;
this.loadTableWork = loadTableWork;
this.opParseCtx = opParseCtx;
@@ -403,18 +403,18 @@ public class ParseContext {
}
/**
- * @return the joinContext
+ * @return the joinOps
*/
- public Map<JoinOperator, QBJoinTree> getJoinContext() {
- return joinContext;
+ public Set<JoinOperator> getJoinOps() {
+ return joinOps;
}
/**
- * @param joinContext
- * the joinContext to set
+ * @param joinOps
+ * the joinOps to set
*/
- public void setJoinContext(Map<JoinOperator, QBJoinTree> joinContext) {
- this.joinContext = joinContext;
+ public void setJoinOps(Set<JoinOperator> joinOps) {
+ this.joinOps = joinOps;
}
/**
@@ -497,20 +497,20 @@ public class ParseContext {
return lInfo;
}
- public Map<MapJoinOperator, QBJoinTree> getMapJoinContext() {
- return mapJoinContext;
+ public Set<MapJoinOperator> getMapJoinOps() {
+ return mapJoinOps;
}
- public void setMapJoinContext(Map<MapJoinOperator, QBJoinTree> mapJoinContext) {
- this.mapJoinContext = mapJoinContext;
+ public void setMapJoinOps(Set<MapJoinOperator> mapJoinOps) {
+ this.mapJoinOps = mapJoinOps;
}
- public Map<SMBMapJoinOperator, QBJoinTree> getSmbMapJoinContext() {
- return smbMapJoinContext;
+ public Set<SMBMapJoinOperator> getSmbMapJoinOps() {
+ return smbMapJoinOps;
}
- public void setSmbMapJoinContext(Map<SMBMapJoinOperator, QBJoinTree> smbMapJoinContext) {
- this.smbMapJoinContext = smbMapJoinContext;
+ public void setSmbMapJoinOps(Set<SMBMapJoinOperator> smbMapJoinOps) {
+ this.smbMapJoinOps = smbMapJoinOps;
}
public GlobalLimitCtx getGlobalLimitCtx() {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1653088&r1=1653087&r2=1653088&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Mon Jan 19 19:42:21 2015
@@ -384,8 +384,6 @@ public class SemanticAnalyzer extends Ba
opParseCtx = pctx.getOpParseCtx();
loadTableWork = pctx.getLoadTableWork();
loadFileWork = pctx.getLoadFileWork();
- joinContext = pctx.getJoinContext();
- smbMapJoinContext = pctx.getSmbMapJoinContext();
ctx = pctx.getContext();
destTableId = pctx.getDestTableId();
idToTableNameMap = pctx.getIdToTableNameMap();
@@ -400,8 +398,10 @@ public class SemanticAnalyzer extends Ba
public ParseContext getParseContext() {
return new ParseContext(conf, qb, ast, opToPartPruner, opToPartList,
- topOps, opParseCtx, joinContext, smbMapJoinContext, loadTableWork,
- loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
+ topOps, opParseCtx,
+ new HashSet<JoinOperator>(joinContext.keySet()),
+ new HashSet<SMBMapJoinOperator>(smbMapJoinContext.keySet()),
+ loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions,
opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks,
opToPartToSkewedPruner, viewAliasToInput,
@@ -7521,6 +7521,7 @@ public class SemanticAnalyzer extends Ba
JoinOperator joinOp = (JoinOperator) genJoinOperatorChildren(joinTree,
joinSrcOp, srcOps, omitOpts, joinKeys);
+ joinOp.getConf().setQBJoinTreeProps(joinTree);
joinContext.put(joinOp, joinTree);
Operator op = joinOp;
@@ -10163,12 +10164,13 @@ public class SemanticAnalyzer extends Ba
// 4. Generate Parse Context for Optimizer & Physical compiler
ParseContext pCtx = new ParseContext(conf, qb, plannerCtx.child,
- opToPartPruner, opToPartList, topOps, opParseCtx, joinContext,
- smbMapJoinContext, loadTableWork, loadFileWork, ctx, idToTableNameMap,
- destTableId, uCtx, listMapJoinOpsNoReducer, groupOpToInputTables,
- prunedPartitions, opToSamplePruner, globalLimitCtx, nameToSplitSample,
- inputs, rootTasks, opToPartToSkewedPruner, viewAliasToInput,
- reduceSinkOperatorsAddedByEnforceBucketingSorting, queryProperties);
+ opToPartPruner, opToPartList, topOps, opParseCtx,
+ new HashSet<JoinOperator>(joinContext.keySet()),
+ new HashSet<SMBMapJoinOperator>(smbMapJoinContext.keySet()),
+ loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
+ listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions, opToSamplePruner,
+ globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner,
+ viewAliasToInput, reduceSinkOperatorsAddedByEnforceBucketingSorting, queryProperties);
// 5. Take care of view creation
if (createVwDesc != null) {