Posted to commits@hive.apache.org by sz...@apache.org on 2015/01/22 06:05:10 UTC
svn commit: r1653769 [6/14] - in /hive/branches/spark: ./
beeline/src/java/org/apache/hive/beeline/
cli/src/java/org/apache/hadoop/hive/cli/
common/src/java/org/apache/hadoop/hive/common/
common/src/java/org/apache/hadoop/hive/conf/ data/scripts/ dev-s...
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java Thu Jan 22 05:05:05 2015
@@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.Warehouse;
@@ -147,7 +148,7 @@ public class HiveMetaStoreChecker {
for (Path dbPath : dbPaths) {
FileSystem fs = dbPath.getFileSystem(conf);
- FileStatus[] statuses = fs.listStatus(dbPath);
+ FileStatus[] statuses = fs.listStatus(dbPath, FileUtils.HIDDEN_FILES_PATH_FILTER);
for (FileStatus status : statuses) {
if (status.isDir() && !tableNames.contains(status.getPath().getName())) {
@@ -362,7 +363,7 @@ public class HiveMetaStoreChecker {
private void getAllLeafDirs(Path basePath, Set<Path> allDirs, FileSystem fs)
throws IOException {
- FileStatus[] statuses = fs.listStatus(basePath);
+ FileStatus[] statuses = fs.listStatus(basePath, FileUtils.HIDDEN_FILES_PATH_FILTER);
boolean directoryFound=false;
for (FileStatus status : statuses) {
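The pattern in both hunks above, repeated for the globStatus() calls in Partition.java and Table.java below, is to pass a PathFilter so that directory listings skip temporary and hidden files. A minimal self-contained sketch of an equivalent filter, assuming FileUtils.HIDDEN_FILES_PATH_FILTER follows Hive's convention that names beginning with '.' or '_' are hidden:

    import java.io.IOException;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    public class HiddenFileFilterSketch {
      // Assumed equivalent of FileUtils.HIDDEN_FILES_PATH_FILTER: a path is
      // hidden when its final component starts with '_' or '.'.
      public static final PathFilter HIDDEN_FILES_PATH_FILTER = new PathFilter() {
        @Override
        public boolean accept(Path p) {
          String name = p.getName();
          return !name.startsWith("_") && !name.startsWith(".");
        }
      };

      // Usage mirroring the patched call sites above.
      public static FileStatus[] listVisible(FileSystem fs, Path dir) throws IOException {
        return fs.listStatus(dir, HIDDEN_FILES_PATH_FILTER);
      }
    }
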
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java Thu Jan 22 05:05:05 2015
@@ -32,6 +32,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.ProtectMode;
import org.apache.hadoop.hive.metastore.Warehouse;
@@ -336,7 +337,7 @@ public class Partition implements Serial
* partition String pathPattern = this.partPath.toString() + "/*"; try {
* FileSystem fs = FileSystem.get(this.table.getDataLocation(),
* Hive.get().getConf()); FileStatus srcs[] = fs.globStatus(new
- * Path(pathPattern)); numBuckets = srcs.length; } catch (Exception e) {
+ * Path(pathPattern), FileUtils.HIDDEN_FILES_PATH_FILTER); numBuckets = srcs.length; } catch (Exception e) {
* throw new RuntimeException("Cannot get bucket count for table " +
* this.table.getName(), e); } } return numBuckets;
*/
@@ -372,7 +373,7 @@ public class Partition implements Serial
pathPattern = pathPattern + "/*";
}
LOG.info("Path pattern = " + pathPattern);
- FileStatus srcs[] = fs.globStatus(new Path(pathPattern));
+ FileStatus srcs[] = fs.globStatus(new Path(pathPattern), FileUtils.HIDDEN_FILES_PATH_FILTER);
Arrays.sort(srcs);
for (FileStatus src : srcs) {
LOG.info("Got file: " + src.getPath());
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java Thu Jan 22 05:05:05 2015
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.ProtectMode;
@@ -636,7 +637,7 @@ public class Table implements Serializab
protected void replaceFiles(Path srcf, boolean isSrcLocal)
throws HiveException {
Path tableDest = getPath();
- Hive.replaceFiles(srcf, tableDest, tableDest, Hive.get().getConf(),
+ Hive.replaceFiles(tableDest, srcf, tableDest, tableDest, Hive.get().getConf(),
isSrcLocal);
}
@@ -951,7 +952,7 @@ public class Table implements Serializab
pathPattern = pathPattern + "/*";
}
LOG.info("Path pattern = " + pathPattern);
- FileStatus srcs[] = fs.globStatus(new Path(pathPattern));
+ FileStatus srcs[] = fs.globStatus(new Path(pathPattern), FileUtils.HIDDEN_FILES_PATH_FILTER);
Arrays.sort(srcs);
for (FileStatus src : srcs) {
LOG.info("Got file: " + src.getPath());
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java Thu Jan 22 05:05:05 2015
@@ -35,6 +35,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
@@ -47,7 +48,6 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
import org.apache.hadoop.hive.ql.parse.QB;
-import org.apache.hadoop.hive.ql.parse.QBJoinTree;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.TableAccessAnalyzer;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -81,7 +81,7 @@ abstract public class AbstractBucketJoin
List<String> fileNames = new ArrayList<String>();
try {
FileSystem fs = location.getFileSystem(pGraphContext.getConf());
- FileStatus[] files = fs.listStatus(new Path(location.toString()));
+ FileStatus[] files = fs.listStatus(new Path(location.toString()), FileUtils.HIDDEN_FILES_PATH_FILTER);
if (files != null) {
for (FileStatus file : files) {
fileNames.add(file.getPath().toString());
@@ -132,23 +132,21 @@ abstract public class AbstractBucketJoin
protected boolean canConvertMapJoinToBucketMapJoin(
MapJoinOperator mapJoinOp,
- ParseContext pGraphContext,
BucketJoinProcCtx context) throws SemanticException {
- QBJoinTree joinCtx = pGraphContext.getMapJoinContext().get(mapJoinOp);
- if (joinCtx == null) {
+ if (!this.pGraphContext.getMapJoinOps().contains(mapJoinOp)) {
return false;
}
List<String> joinAliases = new ArrayList<String>();
- String[] srcs = joinCtx.getBaseSrc();
- String[] left = joinCtx.getLeftAliases();
- List<String> mapAlias = joinCtx.getMapAliases();
+ String[] srcs = mapJoinOp.getConf().getBaseSrc();
+ String[] left = mapJoinOp.getConf().getLeftAliases();
+ List<String> mapAlias = mapJoinOp.getConf().getMapAliases();
String baseBigAlias = null;
for (String s : left) {
if (s != null) {
- String subQueryAlias = QB.getAppendedAliasFromId(joinCtx.getId(), s);
+ String subQueryAlias = QB.getAppendedAliasFromId(mapJoinOp.getConf().getId(), s);
if (!joinAliases.contains(subQueryAlias)) {
joinAliases.add(subQueryAlias);
if (!mapAlias.contains(s)) {
@@ -160,7 +158,7 @@ abstract public class AbstractBucketJoin
for (String s : srcs) {
if (s != null) {
- String subQueryAlias = QB.getAppendedAliasFromId(joinCtx.getId(), s);
+ String subQueryAlias = QB.getAppendedAliasFromId(mapJoinOp.getConf().getId(), s);
if (!joinAliases.contains(subQueryAlias)) {
joinAliases.add(subQueryAlias);
if (!mapAlias.contains(s)) {
@@ -173,9 +171,8 @@ abstract public class AbstractBucketJoin
Map<Byte, List<ExprNodeDesc>> keysMap = mapJoinOp.getConf().getKeys();
return checkConvertBucketMapJoin(
- pGraphContext,
context,
- joinCtx,
+ mapJoinOp.getConf().getAliasToOpInfo(),
keysMap,
baseBigAlias,
joinAliases);
@@ -190,9 +187,8 @@ abstract public class AbstractBucketJoin
* d. The number of buckets in the big table can be divided by no of buckets in small tables.
*/
protected boolean checkConvertBucketMapJoin(
- ParseContext pGraphContext,
BucketJoinProcCtx context,
- QBJoinTree joinCtx,
+ Map<String, Operator<? extends OperatorDesc>> aliasToOpInfo,
Map<Byte, List<ExprNodeDesc>> keysMap,
String baseBigAlias,
List<String> joinAliases) throws SemanticException {
@@ -203,7 +199,6 @@ abstract public class AbstractBucketJoin
new LinkedHashMap<String, List<List<String>>>();
HashMap<String, Operator<? extends OperatorDesc>> topOps = pGraphContext.getTopOps();
- Map<TableScanOperator, Table> topToTable = pGraphContext.getTopToTable();
HashMap<String, String> aliasToNewAliasMap = new HashMap<String, String>();
@@ -218,7 +213,7 @@ abstract public class AbstractBucketJoin
boolean bigTablePartitioned = true;
for (int index = 0; index < joinAliases.size(); index++) {
String alias = joinAliases.get(index);
- Operator<? extends OperatorDesc> topOp = joinCtx.getAliasToOpInfo().get(alias);
+ Operator<? extends OperatorDesc> topOp = aliasToOpInfo.get(alias);
// The alias may not be present in case of a sub-query
if (topOp == null) {
return false;
@@ -270,7 +265,7 @@ abstract public class AbstractBucketJoin
joinKeyOrder = new Integer[keys.size()];
}
- Table tbl = topToTable.get(tso);
+ Table tbl = tso.getConf().getTableMetadata();
if (tbl.isPartitioned()) {
PrunedPartitionList prunedParts = pGraphContext.getPrunedPartitions(alias, tso);
List<Partition> partitions = prunedParts.getNotDeniedPartns();
@@ -459,7 +454,7 @@ abstract public class AbstractBucketJoin
}
// convert partition to partition spec string
- private static Map<String, List<String>> convert(Map<Partition, List<String>> mapping) {
+ private Map<String, List<String>> convert(Map<Partition, List<String>> mapping) {
Map<String, List<String>> converted = new HashMap<String, List<String>>();
for (Map.Entry<Partition, List<String>> entry : mapping.entrySet()) {
converted.put(entry.getKey().getName(), entry.getValue());
@@ -488,7 +483,7 @@ abstract public class AbstractBucketJoin
}
// called for each partition of big table and populates mapping for each file in the partition
- private static void fillMappingBigTableBucketFileNameToSmallTableBucketFileNames(
+ private void fillMappingBigTableBucketFileNameToSmallTableBucketFileNames(
List<Integer> smallTblBucketNums,
List<List<String>> smallTblFilesList,
Map<String, List<String>> bigTableBucketFileNameToSmallTableBucketFileNames,
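The bulk of this file's changes follow one refactoring: join-tree metadata that used to live in ParseContext side tables keyed by operator (getMapJoinContext(), getTopToTable()) is now carried on the operator's own descriptor (getConf().getBaseSrc(), getConf().getAliasToOpInfo(), getConf().getTableMetadata()), with the ParseContext keeping only membership sets. A self-contained sketch of the before/after shape, using hypothetical Op/OpDesc/Context types in place of the Hive classes:

    import java.util.HashSet;
    import java.util.Set;

    class OpDesc {                       // stands in for JoinDesc / MapJoinDesc
      String id;                         // formerly QBJoinTree.getId()
      String[] baseSrc;                  // formerly QBJoinTree.getBaseSrc()
    }

    class Op {                           // stands in for JoinOperator / MapJoinOperator
      final OpDesc conf = new OpDesc();
      OpDesc getConf() { return conf; }
    }

    class Context {                      // stands in for ParseContext
      // Before: Map<Op, QBJoinTree> joinContext. After: a plain membership set,
      // since the per-operator metadata moved onto OpDesc.
      final Set<Op> joinOps = new HashSet<Op>();
    }

    public class DescriptorMetadataSketch {
      static boolean canConvert(Context ctx, Op joinOp) {
        if (!ctx.joinOps.contains(joinOp)) {       // was: joinContext.get(joinOp) == null
          return false;
        }
        String[] srcs = joinOp.getConf().baseSrc;  // metadata read off the descriptor
        return srcs != null && srcs.length > 0;
      }
    }
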
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java Thu Jan 22 05:05:05 2015
@@ -46,7 +46,6 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
import org.apache.hadoop.hive.ql.parse.QB;
-import org.apache.hadoop.hive.ql.parse.QBJoinTree;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.TableAccessAnalyzer;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -97,17 +96,17 @@ abstract public class AbstractSMBJoinPro
return false;
}
- boolean tableEligibleForBucketedSortMergeJoin = true;
- QBJoinTree joinCxt = this.pGraphContext.getMapJoinContext()
- .get(mapJoinOp);
- if (joinCxt == null) {
+ if (!this.pGraphContext.getMapJoinOps().contains(mapJoinOp)) {
return false;
}
- String[] srcs = joinCxt.getBaseSrc();
+
+ String[] srcs = mapJoinOp.getConf().getBaseSrc();
for (int srcPos = 0; srcPos < srcs.length; srcPos++) {
- srcs[srcPos] = QB.getAppendedAliasFromId(joinCxt.getId(), srcs[srcPos]);
+ srcs[srcPos] = QB.getAppendedAliasFromId(mapJoinOp.getConf().getId(), srcs[srcPos]);
}
+ boolean tableEligibleForBucketedSortMergeJoin = true;
+
// All the tables/partitions columns should be sorted in the same order
// For example, if tables A and B are being joined on columns c1, c2 and c3
// which are the sorted and bucketed columns. The join would work, as long
@@ -117,9 +116,8 @@ abstract public class AbstractSMBJoinPro
for (int pos = 0; pos < srcs.length; pos++) {
tableEligibleForBucketedSortMergeJoin = tableEligibleForBucketedSortMergeJoin
&& isEligibleForBucketSortMergeJoin(smbJoinContext,
- pGraphContext,
mapJoinOp.getConf().getKeys().get((byte) pos),
- joinCxt,
+ mapJoinOp.getConf().getAliasToOpInfo(),
srcs,
pos,
sortColumnsFirstTable);
@@ -141,8 +139,7 @@ abstract public class AbstractSMBJoinPro
// Convert the bucket map-join operator to a sort-merge map join operator
protected SMBMapJoinOperator convertBucketMapJoinToSMBJoin(MapJoinOperator mapJoinOp,
- SortBucketJoinProcCtx smbJoinContext,
- ParseContext parseContext) {
+ SortBucketJoinProcCtx smbJoinContext) {
String[] srcs = smbJoinContext.getSrcs();
SMBMapJoinOperator smbJop = new SMBMapJoinOperator(mapJoinOp);
@@ -219,10 +216,13 @@ abstract public class AbstractSMBJoinPro
child.getParentOperators().remove(index);
child.getParentOperators().add(index, smbJop);
}
- parseContext.getSmbMapJoinContext().put(smbJop,
- parseContext.getMapJoinContext().get(mapJoinOp));
- parseContext.getMapJoinContext().remove(mapJoinOp);
- parseContext.getOpParseCtx().put(smbJop, parseContext.getOpParseCtx().get(mapJoinOp));
+
+ // Data structures coming from QBJoinTree
+ smbJop.getConf().setQBJoinTreeProps(mapJoinOp.getConf());
+ //
+ pGraphContext.getSmbMapJoinOps().add(smbJop);
+ pGraphContext.getMapJoinOps().remove(mapJoinOp);
+ pGraphContext.getOpParseCtx().put(smbJop, pGraphContext.getOpParseCtx().get(mapJoinOp));
return smbJop;
}
@@ -242,15 +242,12 @@ abstract public class AbstractSMBJoinPro
*/
private boolean isEligibleForBucketSortMergeJoin(
SortBucketJoinProcCtx smbJoinContext,
- ParseContext pctx,
List<ExprNodeDesc> keys,
- QBJoinTree joinTree,
+ Map<String, Operator<? extends OperatorDesc>> aliasToOpInfo,
String[] aliases,
int pos,
List<Order> sortColumnsFirstTable) throws SemanticException {
String alias = aliases[pos];
- Map<TableScanOperator, Table> topToTable = this.pGraphContext
- .getTopToTable();
/*
* Consider a query like:
@@ -266,7 +263,7 @@ abstract public class AbstractSMBJoinPro
* table. If the object being map-joined is a base table, then aliasToOpInfo
* contains the TableScanOperator, and TableAccessAnalyzer is a no-op.
*/
- Operator<? extends OperatorDesc> topOp = joinTree.getAliasToOpInfo().get(alias);
+ Operator<? extends OperatorDesc> topOp = aliasToOpInfo.get(alias);
if (topOp == null) {
return false;
}
@@ -313,7 +310,7 @@ abstract public class AbstractSMBJoinPro
return false;
}
- Table tbl = topToTable.get(tso);
+ Table tbl = tso.getConf().getTableMetadata();
if (tbl.isPartitioned()) {
PrunedPartitionList prunedParts = pGraphContext.getPrunedPartitions(alias, tso);
List<Partition> partitions = prunedParts.getNotDeniedPartns();
@@ -386,15 +383,13 @@ abstract public class AbstractSMBJoinPro
// It is already verified that the join can be converted to a bucket map join
protected boolean checkConvertJoinToSMBJoin(
JoinOperator joinOperator,
- SortBucketJoinProcCtx smbJoinContext,
- ParseContext pGraphContext) throws SemanticException {
-
- QBJoinTree joinCtx = pGraphContext.getJoinContext().get(joinOperator);
+ SortBucketJoinProcCtx smbJoinContext) throws SemanticException {
- if (joinCtx == null) {
+ if (!this.pGraphContext.getJoinOps().contains(joinOperator)) {
return false;
}
- String[] srcs = joinCtx.getBaseSrc();
+
+ String[] srcs = joinOperator.getConf().getBaseSrc();
// All the tables/partitions columns should be sorted in the same order
// For example, if tables A and B are being joined on columns c1, c2 and c3
@@ -404,9 +399,8 @@ abstract public class AbstractSMBJoinPro
for (int pos = 0; pos < srcs.length; pos++) {
if (!isEligibleForBucketSortMergeJoin(smbJoinContext,
- pGraphContext,
smbJoinContext.getKeyExprMap().get((byte) pos),
- joinCtx,
+ joinOperator.getConf().getAliasToOpInfo(),
srcs,
pos,
sortColumnsFirstTable)) {
@@ -421,12 +415,10 @@ abstract public class AbstractSMBJoinPro
// Can the join operator be converted to a sort-merge join operator ?
protected boolean canConvertJoinToSMBJoin(
JoinOperator joinOperator,
- SortBucketJoinProcCtx smbJoinContext,
- ParseContext pGraphContext) throws SemanticException {
+ SortBucketJoinProcCtx smbJoinContext) throws SemanticException {
boolean canConvert =
canConvertJoinToBucketMapJoin(
joinOperator,
- pGraphContext,
smbJoinContext
);
@@ -434,13 +426,12 @@ abstract public class AbstractSMBJoinPro
return false;
}
- return checkConvertJoinToSMBJoin(joinOperator, smbJoinContext, pGraphContext);
+ return checkConvertJoinToSMBJoin(joinOperator, smbJoinContext);
}
// Can the join operator be converted to a bucket map-merge join operator ?
protected boolean canConvertJoinToBucketMapJoin(
JoinOperator joinOp,
- ParseContext pGraphContext,
SortBucketJoinProcCtx context) throws SemanticException {
// This has already been inspected and rejected
@@ -448,8 +439,7 @@ abstract public class AbstractSMBJoinPro
return false;
}
- QBJoinTree joinCtx = pGraphContext.getJoinContext().get(joinOp);
- if (joinCtx == null) {
+ if (!this.pGraphContext.getJoinOps().contains(joinOp)) {
return false;
}
@@ -482,8 +472,9 @@ abstract public class AbstractSMBJoinPro
context.setBigTablePosition(bigTablePosition);
String joinAlias =
bigTablePosition == 0 ?
- joinCtx.getLeftAlias() : joinCtx.getRightAliases()[bigTablePosition - 1];
- joinAlias = QB.getAppendedAliasFromId(joinCtx.getId(), joinAlias);
+ joinOp.getConf().getLeftAlias() :
+ joinOp.getConf().getRightAliases()[bigTablePosition - 1];
+ joinAlias = QB.getAppendedAliasFromId(joinOp.getConf().getId(), joinAlias);
Map<Byte, List<ExprNodeDesc>> keyExprMap = new HashMap<Byte, List<ExprNodeDesc>>();
List<Operator<? extends OperatorDesc>> parentOps = joinOp.getParentOperators();
@@ -497,10 +488,10 @@ abstract public class AbstractSMBJoinPro
context.setKeyExprMap(keyExprMap);
// Make a deep copy of the aliases so that they are not changed in the context
- String[] joinSrcs = joinCtx.getBaseSrc();
+ String[] joinSrcs = joinOp.getConf().getBaseSrc();
String[] srcs = new String[joinSrcs.length];
for (int srcPos = 0; srcPos < joinSrcs.length; srcPos++) {
- joinSrcs[srcPos] = QB.getAppendedAliasFromId(joinCtx.getId(), joinSrcs[srcPos]);
+ joinSrcs[srcPos] = QB.getAppendedAliasFromId(joinOp.getConf().getId(), joinSrcs[srcPos]);
srcs[srcPos] = new String(joinSrcs[srcPos]);
}
@@ -508,9 +499,8 @@ abstract public class AbstractSMBJoinPro
// The candidate map-join was derived from the pluggable sort merge join big
// table matcher.
return checkConvertBucketMapJoin(
- pGraphContext,
context,
- joinCtx,
+ joinOp.getConf().getAliasToOpInfo(),
keyExprMap,
joinAlias,
Arrays.asList(srcs));
@@ -519,19 +509,23 @@ abstract public class AbstractSMBJoinPro
// Convert the join operator to a bucket map-join join operator
protected MapJoinOperator convertJoinToBucketMapJoin(
JoinOperator joinOp,
- SortBucketJoinProcCtx joinContext,
- ParseContext parseContext) throws SemanticException {
+ SortBucketJoinProcCtx joinContext) throws SemanticException {
MapJoinOperator mapJoinOp = new MapJoinProcessor().convertMapJoin(
- parseContext.getConf(),
- parseContext.getOpParseCtx(),
+ pGraphContext.getConf(),
+ pGraphContext.getOpParseCtx(),
joinOp,
- pGraphContext.getJoinContext().get(joinOp),
+ joinOp.getConf().isLeftInputJoin(),
+ joinOp.getConf().getBaseSrc(),
+ joinOp.getConf().getMapAliases(),
joinContext.getBigTablePosition(),
false,
false);
// Remove the join operator from the query join context
- parseContext.getMapJoinContext().put(mapJoinOp, parseContext.getJoinContext().get(joinOp));
- parseContext.getJoinContext().remove(joinOp);
+ // Data structures coming from QBJoinTree
+ mapJoinOp.getConf().setQBJoinTreeProps(joinOp.getConf());
+ //
+ pGraphContext.getMapJoinOps().add(mapJoinOp);
+ pGraphContext.getJoinOps().remove(joinOp);
convertMapJoinToBucketMapJoin(mapJoinOp, joinContext);
return mapJoinOp;
}
@@ -539,11 +533,10 @@ abstract public class AbstractSMBJoinPro
// Convert the join operator to a sort-merge join operator
protected void convertJoinToSMBJoin(
JoinOperator joinOp,
- SortBucketJoinProcCtx smbJoinContext,
- ParseContext parseContext) throws SemanticException {
- MapJoinOperator mapJoinOp = convertJoinToBucketMapJoin(joinOp, smbJoinContext, parseContext);
+ SortBucketJoinProcCtx smbJoinContext) throws SemanticException {
+ MapJoinOperator mapJoinOp = convertJoinToBucketMapJoin(joinOp, smbJoinContext);
SMBMapJoinOperator smbMapJoinOp =
- convertBucketMapJoinToSMBJoin(mapJoinOp, smbJoinContext, parseContext);
+ convertBucketMapJoinToSMBJoin(mapJoinOp, smbJoinContext);
smbMapJoinOp.setConvertedAutomaticallySMBJoin(true);
}
}
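When a join is converted here (join to bucket map join to SMB join), the replacement operator must keep the join-tree properties of the operator it replaces; setQBJoinTreeProps(...) copies them across, and the ParseContext sets are updated to track the new operator. A sketch of that bookkeeping, again with hypothetical stand-in types:

    import java.util.HashSet;
    import java.util.Set;

    public class ConversionBookkeepingSketch {
      static class Desc {
        String id;
        String[] baseSrc;
        // Stand-in for JoinDesc.setQBJoinTreeProps(...): copy the
        // join-tree-derived fields from the descriptor being replaced.
        void setQBJoinTreeProps(Desc other) {
          this.id = other.id;
          this.baseSrc = other.baseSrc;
        }
      }

      static class Op {
        final Desc conf = new Desc();
        Desc getConf() { return conf; }
      }

      // Mirrors convertJoinToBucketMapJoin above: the new operator inherits the
      // old one's join-tree properties, and membership moves between the sets.
      static Op convert(Set<Op> joinOps, Set<Op> mapJoinOps, Op joinOp) {
        Op mapJoinOp = new Op();
        mapJoinOp.getConf().setQBJoinTreeProps(joinOp.getConf());
        mapJoinOps.add(mapJoinOp);
        joinOps.remove(joinOp);
        return mapJoinOp;
      }

      public static void main(String[] args) {
        Set<Op> joinOps = new HashSet<Op>(), mapJoinOps = new HashSet<Op>();
        Op j = new Op(); j.getConf().id = "qb1"; joinOps.add(j);
        Op m = convert(joinOps, mapJoinOps, j);
        System.out.println(m.getConf().id + " " + joinOps.size() + " " + mapJoinOps.size());
        // prints: qb1 0 1
      }
    }
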
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AvgPartitionSizeBasedBigTableSelectorForAutoSMJ.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AvgPartitionSizeBasedBigTableSelectorForAutoSMJ.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AvgPartitionSizeBasedBigTableSelectorForAutoSMJ.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AvgPartitionSizeBasedBigTableSelectorForAutoSMJ.java Thu Jan 22 05:05:05 2015
@@ -71,7 +71,7 @@ public class AvgPartitionSizeBasedBigTab
int numPartitions = 1; // in case the sizes match, preference is
// given to the table with fewer partitions
- Table table = parseCtx.getTopToTable().get(topOp);
+ Table table = topOp.getConf().getTableMetadata();
long averageSize = 0;
if (!table.isPartitioned()) {
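This one-line change recurs across the remaining optimizer files below (BucketingSortingReduceSinkOptimizer, DynamicPartitionPruningOptimization, GenMRTableScan1, GlobalLimitOptimizer, GroupByOptimizer): the Table for a scan is read from the TableScanDesc instead of the ParseContext's topToTable map. A sketch of the accessor shape, with hypothetical stand-in types:

    public class TableMetadataSketch {
      static class Table { final String name; Table(String n) { name = n; } }

      static class TableScanDesc {
        private final Table tableMetadata;    // populated when the scan is created
        TableScanDesc(Table t) { tableMetadata = t; }
        Table getTableMetadata() { return tableMetadata; }
      }

      static class TableScanOperator {
        private final TableScanDesc conf;
        TableScanOperator(TableScanDesc c) { conf = c; }
        TableScanDesc getConf() { return conf; }
      }

      public static void main(String[] args) {
        TableScanOperator ts =
            new TableScanOperator(new TableScanDesc(new Table("src")));
        // was: parseCtx.getTopToTable().get(ts)
        System.out.println(ts.getConf().getTableMetadata().name);  // src
      }
    }
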
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapjoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapjoinProc.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapjoinProc.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapjoinProc.java Thu Jan 22 05:05:05 2015
@@ -45,7 +45,7 @@ public class BucketMapjoinProc extends A
// can the mapjoin present be converted to a bucketed mapjoin
boolean convert = canConvertMapJoinToBucketMapJoin(
- mapJoinOperator, pGraphContext, context);
+ mapJoinOperator, context);
HiveConf conf = context.getConf();
// Throw an error if the user asked for bucketed mapjoin to be enforced and
@@ -67,13 +67,13 @@ public class BucketMapjoinProc extends A
* and do the version if possible.
*/
public static void checkAndConvertBucketMapJoin(ParseContext pGraphContext,
- MapJoinOperator mapJoinOp, QBJoinTree joinCtx, String baseBigAlias,
+ MapJoinOperator mapJoinOp, String baseBigAlias,
List<String> joinAliases) throws SemanticException {
BucketJoinProcCtx ctx = new BucketJoinProcCtx(pGraphContext.getConf());
BucketMapjoinProc proc = new BucketMapjoinProc(pGraphContext);
Map<Byte, List<ExprNodeDesc>> keysMap = mapJoinOp.getConf().getKeys();
- if (proc.checkConvertBucketMapJoin(pGraphContext, ctx,
- joinCtx, keysMap, baseBigAlias, joinAliases)) {
+ if (proc.checkConvertBucketMapJoin(ctx, mapJoinOp.getConf().getAliasToOpInfo(),
+ keysMap, baseBigAlias, joinAliases)) {
proc.convertMapJoinToBucketMapJoin(mapJoinOp, ctx);
}
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java Thu Jan 22 05:05:05 2015
@@ -390,7 +390,7 @@ public class BucketingSortingReduceSinkO
}
}
- Table destTable = pGraphContext.getFsopToTable().get(fsOp);
+ Table destTable = fsOp.getConf().getTable();
if (destTable == null) {
return null;
}
@@ -465,7 +465,7 @@ public class BucketingSortingReduceSinkO
if (op instanceof TableScanOperator) {
assert !useBucketSortPositions;
TableScanOperator ts = (TableScanOperator) op;
- Table srcTable = pGraphContext.getTopToTable().get(ts);
+ Table srcTable = ts.getConf().getTableMetadata();
// Find the positions of the bucketed columns in the table corresponding
// to the select list.
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java Thu Jan 22 05:05:05 2015
@@ -141,6 +141,17 @@ public final class ColumnPrunerProcFacto
colLists = Utilities.mergeUniqElems(colLists, param.getCols());
}
}
+ int groupingSetPosition = conf.getGroupingSetPosition();
+ if (groupingSetPosition >= 0) {
+ List<String> cols = cppCtx.genColLists(op);
+ String groupingColumn = conf.getOutputColumnNames().get(groupingSetPosition);
+ if (!cols.contains(groupingColumn)) {
+ conf.getOutputColumnNames().remove(groupingSetPosition);
+ if (op.getSchema() != null) {
+ op.getSchema().getSignature().remove(groupingSetPosition);
+ }
+ }
+ }
cppCtx.getPrunedColLists().put(op, colLists);
return null;
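The new block handles grouping sets: group-by emits a synthetic grouping-id output column at groupingSetPosition, and when no downstream operator references it, it can be pruned along with the matching schema entry. A sketch of the rule on plain lists:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class GroupingSetPruneSketch {
      // If the synthetic grouping-id column (at groupingSetPosition) is not in
      // the set of columns needed downstream, drop it from the output names;
      // the patch above also removes the matching row-schema signature entry.
      static void prune(List<String> outputColumnNames, List<String> neededCols,
                        int groupingSetPosition) {
        if (groupingSetPosition >= 0) {
          String groupingColumn = outputColumnNames.get(groupingSetPosition);
          if (!neededCols.contains(groupingColumn)) {
            outputColumnNames.remove(groupingSetPosition);
          }
        }
      }

      public static void main(String[] args) {
        List<String> out = new ArrayList<String>(Arrays.asList("_col0", "_col1", "_col2"));
        prune(out, Arrays.asList("_col0", "_col1"), 2);  // grouping id unused downstream
        System.out.println(out);                         // [_col0, _col1]
      }
    }
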
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java Thu Jan 22 05:05:05 2015
@@ -863,7 +863,7 @@ public final class ConstantPropagateProc
if (op.getChildOperators().size() == 1
&& op.getChildOperators().get(0) instanceof JoinOperator) {
JoinOperator joinOp = (JoinOperator) op.getChildOperators().get(0);
- if (skipFolding(joinOp.getConf(), rsDesc.getTag())) {
+ if (skipFolding(joinOp.getConf())) {
LOG.debug("Skip folding in outer join " + op);
cppCtx.getOpToConstantExprs().put(op, new HashMap<ColumnInfo, ExprNodeDesc>());
return null;
@@ -889,22 +889,16 @@ public final class ConstantPropagateProc
rsDesc.setKeyCols(newKeyEpxrs);
// partition columns
- if (!rsDesc.getPartitionCols().isEmpty()) {
- ArrayList<ExprNodeDesc> newPartExprs = new ArrayList<ExprNodeDesc>();
- for (ExprNodeDesc desc : rsDesc.getPartitionCols()) {
- ExprNodeDesc expr = foldExpr(desc, constants, cppCtx, op, 0, false);
- if (expr instanceof ExprNodeConstantDesc || expr instanceof ExprNodeNullDesc) {
- continue;
- }
- newPartExprs.add(expr);
+ ArrayList<ExprNodeDesc> newPartExprs = new ArrayList<ExprNodeDesc>();
+ for (ExprNodeDesc desc : rsDesc.getPartitionCols()) {
+ ExprNodeDesc expr = foldExpr(desc, constants, cppCtx, op, 0, false);
+ if (expr != desc && desc instanceof ExprNodeColumnDesc
+ && expr instanceof ExprNodeConstantDesc) {
+ ((ExprNodeConstantDesc) expr).setFoldedFromCol(((ExprNodeColumnDesc) desc).getColumn());
}
- if (newPartExprs.isEmpty()) {
- // If all partition columns are removed because of constant, insert an extra column to avoid
- // random partitioning.
- newPartExprs.add(new ExprNodeConstantDesc(""));
- }
- rsDesc.setPartitionCols(newPartExprs);
+ newPartExprs.add(expr);
}
+ rsDesc.setPartitionCols(newPartExprs);
// value columns
ArrayList<ExprNodeDesc> newValExprs = new ArrayList<ExprNodeDesc>();
@@ -916,28 +910,19 @@ public final class ConstantPropagateProc
return null;
}
- private boolean skipFolding(JoinDesc joinDesc, int tag) {
- JoinCondDesc[] conds = joinDesc.getConds();
- int i;
- for (i = conds.length - 1; i >= 0; i--) {
- if (conds[i].getType() == JoinDesc.INNER_JOIN) {
- if (tag == i + 1)
- return false;
- } else if (conds[i].getType() == JoinDesc.FULL_OUTER_JOIN) {
- return true;
- } else if (conds[i].getType() == JoinDesc.RIGHT_OUTER_JOIN) {
- if (tag == i + 1)
- return false;
- return true;
- } else if (conds[i].getType() == JoinDesc.LEFT_OUTER_JOIN) {
- if (tag == i + 1)
- return true;
+ /**
+ * Skip folding constants if there is outer join in join tree.
+ * @param joinDesc
+ * @return true if to skip.
+ */
+ private boolean skipFolding(JoinDesc joinDesc) {
+ for (JoinCondDesc cond : joinDesc.getConds()) {
+ if (cond.getType() == JoinDesc.INNER_JOIN || cond.getType() == JoinDesc.UNIQUE_JOIN) {
+ continue;
}
+ return true;
}
- if (tag == 0) {
- return false;
- }
- return true;
+ return false;
}
}
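Two behavioral points in this hunk: constant partition-key columns are no longer dropped from the ReduceSink partition columns (the folded constant instead remembers its source column via setFoldedFromCol), and skipFolding() is simplified from per-tag reasoning to a blanket rule: any condition that is not an inner or unique join disables folding, since a key folded to a constant could change which NULL-padded rows an outer join emits. A sketch of the simplified rule:

    public class SkipFoldingSketch {
      enum JoinType { INNER, UNIQUE, LEFT_OUTER, RIGHT_OUTER, FULL_OUTER, LEFT_SEMI }

      // Mirrors the rewritten skipFolding(): folding is skipped as soon as any
      // condition in the join tree is not an inner or unique join.
      static boolean skipFolding(JoinType[] conds) {
        for (JoinType t : conds) {
          if (t == JoinType.INNER || t == JoinType.UNIQUE) {
            continue;
          }
          return true;
        }
        return false;
      }

      public static void main(String[] args) {
        System.out.println(skipFolding(new JoinType[] { JoinType.INNER, JoinType.INNER }));
        // false: all inner, folding allowed
        System.out.println(skipFolding(new JoinType[] { JoinType.INNER, JoinType.LEFT_OUTER }));
        // true: outer join present, folding skipped
      }
    }
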
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java Thu Jan 22 05:05:05 2015
@@ -47,8 +47,8 @@ import org.apache.hadoop.hive.ql.lib.Nod
import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
+import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
@@ -231,13 +231,16 @@ public class ConvertJoinMapJoin implemen
ParseContext parseContext = context.parseContext;
MapJoinDesc mapJoinDesc = null;
if (adjustParentsChildren) {
- mapJoinDesc = MapJoinProcessor.getMapJoinDesc(context.conf, parseContext.getOpParseCtx(),
- joinOp, parseContext.getJoinContext().get(joinOp), mapJoinConversionPos, true);
+ mapJoinDesc = MapJoinProcessor.getMapJoinDesc(context.conf, parseContext.getOpParseCtx(),
+ joinOp, joinOp.getConf().isLeftInputJoin(), joinOp.getConf().getBaseSrc(), joinOp.getConf().getMapAliases(),
+ mapJoinConversionPos, true);
} else {
JoinDesc joinDesc = joinOp.getConf();
// retain the original join desc in the map join.
mapJoinDesc =
- new MapJoinDesc(MapJoinProcessor.getKeys(parseContext.getJoinContext().get(joinOp), joinOp).getSecond(),
+ new MapJoinDesc(
+ MapJoinProcessor.getKeys(joinOp.getConf().isLeftInputJoin(),
+ joinOp.getConf().getBaseSrc(), joinOp).getSecond(),
null, joinDesc.getExprs(), null, null,
joinDesc.getOutputColumnNames(), mapJoinConversionPos, joinDesc.getConds(),
joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null);
@@ -504,7 +507,38 @@ public class ConvertJoinMapJoin implemen
}
public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext context,
- int buckets) {
+ int buckets) throws SemanticException {
+ /*
+ * HIVE-9038: Join tests fail in tez when we have more than 1 join on the same key and there is
+ * an outer join down the join tree that requires filterTag. We disable this conversion to map
+ * join here now. We need to emulate the behavior of HashTableSinkOperator as in MR or create a
+ * new operation to be able to support this. This seems like a corner case enough to special
+ * case this for now.
+ */
+ if (joinOp.getConf().getConds().length > 1) {
+ boolean hasOuter = false;
+ for (JoinCondDesc joinCondDesc : joinOp.getConf().getConds()) {
+ switch (joinCondDesc.getType()) {
+ case JoinDesc.INNER_JOIN:
+ case JoinDesc.LEFT_SEMI_JOIN:
+ case JoinDesc.UNIQUE_JOIN:
+ hasOuter = false;
+ break;
+
+ case JoinDesc.FULL_OUTER_JOIN:
+ case JoinDesc.LEFT_OUTER_JOIN:
+ case JoinDesc.RIGHT_OUTER_JOIN:
+ hasOuter = true;
+ break;
+
+ default:
+ throw new SemanticException("Unknown join type " + joinCondDesc.getType());
+ }
+ }
+ if (hasOuter) {
+ return -1;
+ }
+ }
Set<Integer> bigTableCandidateSet =
MapJoinProcessor.getBigTableCandidates(joinOp.getConf().getConds());
@@ -606,7 +640,8 @@ public class ConvertJoinMapJoin implemen
ParseContext parseContext = context.parseContext;
MapJoinOperator mapJoinOp =
MapJoinProcessor.convertJoinOpMapJoinOp(context.conf, parseContext.getOpParseCtx(), joinOp,
- parseContext.getJoinContext().get(joinOp), bigTablePosition, true);
+ joinOp.getConf().isLeftInputJoin(), joinOp.getConf().getBaseSrc(), joinOp.getConf().getMapAliases(),
+ bigTablePosition, true);
Operator<? extends OperatorDesc> parentBigTableOp =
mapJoinOp.getParentOperators().get(bigTablePosition);
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java Thu Jan 22 05:05:05 2015
@@ -186,7 +186,7 @@ public class DynamicPartitionPruningOpti
String column = extractColName(ctx.parent);
if (ts != null && column != null) {
- Table table = parseContext.getTopToTable().get(ts);
+ Table table = ts.getConf().getTableMetadata();
if (table != null && table.isPartitionKey(column)) {
String alias = ts.getConf().getAlias();
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java Thu Jan 22 05:05:05 2015
@@ -70,7 +70,7 @@ public class GenMRTableScan1 implements
TableScanOperator op = (TableScanOperator) nd;
GenMRProcContext ctx = (GenMRProcContext) opProcCtx;
ParseContext parseCtx = ctx.getParseCtx();
- Class<? extends InputFormat> inputFormat = parseCtx.getTopToTable().get(op)
+ Class<? extends InputFormat> inputFormat = op.getConf().getTableMetadata()
.getInputFormatClass();
Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Thu Jan 22 05:05:05 2015
@@ -18,7 +18,20 @@
package org.apache.hadoop.hive.ql.optimizer;
-import com.google.common.collect.Interner;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
@@ -67,7 +80,6 @@ import org.apache.hadoop.hive.ql.parse.B
import org.apache.hadoop.hive.ql.parse.OpParseContext;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
-import org.apache.hadoop.hive.ql.parse.QBJoinTree;
import org.apache.hadoop.hive.ql.parse.QBParseInfo;
import org.apache.hadoop.hive.ql.parse.RowResolver;
import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -102,19 +114,7 @@ import org.apache.hadoop.hive.ql.stats.S
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.mapred.InputFormat;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
+import com.google.common.collect.Interner;
/**
* General utility common functions for the Processor to convert operator into
@@ -521,12 +521,11 @@ public final class GenMapRedUtils {
// The table does not have any partitions
if (aliasPartnDesc == null) {
- aliasPartnDesc = new PartitionDesc(Utilities.getTableDesc(parseCtx
- .getTopToTable().get(topOp)), null);
-
+ aliasPartnDesc = new PartitionDesc(Utilities.getTableDesc(((TableScanOperator) topOp)
+ .getConf().getTableMetadata()), null);
}
- Map<String, String> props = parseCtx.getTopToProps().get(topOp);
+ Map<String, String> props = topOp.getConf().getOpProps();
if (props != null) {
Properties target = aliasPartnDesc.getProperties();
if (target == null) {
@@ -955,7 +954,7 @@ public final class GenMapRedUtils {
public static TableScanOperator createTemporaryTableScanOperator(RowSchema rowSchema) {
TableScanOperator tableScanOp =
- (TableScanOperator) OperatorFactory.get(new TableScanDesc(), rowSchema);
+ (TableScanOperator) OperatorFactory.get(new TableScanDesc(null), rowSchema);
// Set needed columns for this dummy TableScanOperator
List<Integer> neededColumnIds = new ArrayList<Integer>();
List<String> neededColumnNames = new ArrayList<String>();
@@ -1067,17 +1066,23 @@ public final class GenMapRedUtils {
if (needsTagging(cplan.getReduceWork())) {
Operator<? extends OperatorDesc> reducerOp = cplan.getReduceWork().getReducer();
- QBJoinTree joinTree = null;
+ String id = null;
if (reducerOp instanceof JoinOperator) {
- joinTree = parseCtx.getJoinContext().get(reducerOp);
+ if (parseCtx.getJoinOps().contains(reducerOp)) {
+ id = ((JoinOperator)reducerOp).getConf().getId();
+ }
} else if (reducerOp instanceof MapJoinOperator) {
- joinTree = parseCtx.getMapJoinContext().get(reducerOp);
+ if (parseCtx.getMapJoinOps().contains(reducerOp)) {
+ id = ((MapJoinOperator)reducerOp).getConf().getId();
+ }
} else if (reducerOp instanceof SMBMapJoinOperator) {
- joinTree = parseCtx.getSmbMapJoinContext().get(reducerOp);
+ if (parseCtx.getSmbMapJoinOps().contains(reducerOp)) {
+ id = ((SMBMapJoinOperator)reducerOp).getConf().getId();
+ }
}
- if (joinTree != null && joinTree.getId() != null) {
- streamDesc = joinTree.getId() + ":$INTNAME";
+ if (id != null) {
+ streamDesc = id + ":$INTNAME";
} else {
streamDesc = "$INTNAME";
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java Thu Jan 22 05:05:05 2015
@@ -62,10 +62,8 @@ public class GlobalLimitOptimizer implem
GlobalLimitCtx globalLimitCtx = pctx.getGlobalLimitCtx();
Map<TableScanOperator, ExprNodeDesc> opToPartPruner = pctx.getOpToPartPruner();
Map<String, SplitSample> nameToSplitSample = pctx.getNameToSplitSample();
- Map<TableScanOperator, Table> topToTable = pctx.getTopToTable();
QB qb = pctx.getQB();
- HiveConf conf = pctx.getConf();
QBParseInfo qbParseInfo = qb.getParseInfo();
// determine the query qualifies reduce input size for LIMIT
@@ -93,7 +91,7 @@ public class GlobalLimitOptimizer implem
// query qualify for the optimization
if (tempGlobalLimit != null && tempGlobalLimit != 0) {
TableScanOperator ts = (TableScanOperator) topOps.values().toArray()[0];
- Table tab = topToTable.get(ts);
+ Table tab = ts.getConf().getTableMetadata();
if (!tab.isPartitioned()) {
if (qbParseInfo.getDestToWhereExpr().isEmpty()) {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java Thu Jan 22 05:05:05 2015
@@ -304,7 +304,7 @@ public class GroupByOptimizer implements
// Create a mapping from the group by columns to the table columns
Map<String, String> tableColsMapping = new HashMap<String, String>();
Set<String> constantCols = new HashSet<String>();
- Table table = pGraphContext.getTopToTable().get(currOp);
+ Table table = tableScanOp.getConf().getTableMetadata();
for (FieldSchema col : table.getAllCols()) {
tableColsMapping.put(col.getName(), col.getName());
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java Thu Jan 22 05:05:05 2015
@@ -29,6 +29,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Index;
import org.apache.hadoop.hive.ql.Driver;
@@ -123,22 +124,23 @@ public final class IndexUtils {
Partition part) throws HiveException {
LOG.info("checking index staleness...");
try {
- FileSystem partFs = part.getDataLocation().getFileSystem(hive.getConf());
- FileStatus partFss = partFs.getFileStatus(part.getDataLocation());
- String ts = index.getParameters().get(part.getSpec().toString());
- if (ts == null) {
+ String indexTs = index.getParameters().get(part.getSpec().toString());
+ if (indexTs == null) {
return false;
}
- long indexTs = Long.parseLong(ts);
- LOG.info(partFss.getModificationTime());
- LOG.info(ts);
- if (partFss.getModificationTime() > indexTs) {
- LOG.info("index is stale on the partitions that matched " + part.getSpec());
- return false;
+
+ FileSystem partFs = part.getDataLocation().getFileSystem(hive.getConf());
+ FileStatus[] parts = partFs.listStatus(part.getDataLocation(), FileUtils.HIDDEN_FILES_PATH_FILTER);
+ for (FileStatus status : parts) {
+ if (status.getModificationTime() > Long.parseLong(indexTs)) {
+ LOG.info("Index is stale on partition '" + part.getName()
+ + "'. Modified time (" + status.getModificationTime() + ") for '" + status.getPath()
+ + "' is higher than index creation time (" + indexTs + ").");
+ return false;
+ }
}
} catch (IOException e) {
- LOG.info("failed to grab timestamp info");
- throw new HiveException(e);
+ throw new HiveException("Failed to grab timestamp information from partition '" + part.getName() + "': " + e.getMessage(), e);
}
return true;
}
@@ -156,22 +158,23 @@ public final class IndexUtils {
for (Index index : indexes) {
LOG.info("checking index staleness...");
try {
- FileSystem srcFs = src.getPath().getFileSystem(hive.getConf());
- FileStatus srcFss= srcFs.getFileStatus(src.getPath());
- String ts = index.getParameters().get("base_timestamp");
- if (ts == null) {
+ String indexTs = index.getParameters().get("base_timestamp");
+ if (indexTs == null) {
return false;
}
- long indexTs = Long.parseLong(ts);
- LOG.info(srcFss.getModificationTime());
- LOG.info(ts);
- if (srcFss.getModificationTime() > indexTs) {
- LOG.info("index is stale ");
- return false;
+
+ FileSystem srcFs = src.getPath().getFileSystem(hive.getConf());
+ FileStatus[] srcs = srcFs.listStatus(src.getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
+ for (FileStatus status : srcs) {
+ if (status.getModificationTime() > Long.parseLong(indexTs)) {
+ LOG.info("Index is stale on table '" + src.getTableName()
+ + "'. Modified time (" + status.getModificationTime() + ") for '" + status.getPath()
+ + "' is higher than index creation time (" + indexTs + ").");
+ return false;
+ }
}
} catch (IOException e) {
- LOG.info("failed to grab timestamp info");
- throw new HiveException(e);
+ throw new HiveException("Failed to grab timestamp information from table '" + src.getTableName() + "': " + e.getMessage(), e);
}
}
return true;
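Besides the clearer error handling, the staleness check itself changed in both methods: instead of comparing the stored timestamp against the modification time of the partition or table directory itself, it now compares against every non-hidden file inside it, since rewriting a file does not reliably bump the parent directory's mtime. A sketch of the reworked check:

    import java.io.IOException;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    public class IndexStalenessSketch {
      // Returns true when every non-hidden data file is no newer than the
      // timestamp recorded when the index was built; storedTs == null means the
      // index has no recorded build time and is treated as stale.
      static boolean isUpToDate(FileSystem fs, Path dataDir, PathFilter hiddenFilter,
                                String storedTs) throws IOException {
        if (storedTs == null) {
          return false;
        }
        long indexTs = Long.parseLong(storedTs);
        for (FileStatus status : fs.listStatus(dataDir, hiddenFilter)) {
          if (status.getModificationTime() > indexTs) {
            return false;  // a data file was modified after the index was built
          }
        }
        return true;
      }
    }
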
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java Thu Jan 22 05:05:05 2015
@@ -26,7 +26,6 @@ import org.apache.hadoop.hive.ql.exec.Op
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.QBJoinTree;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -95,9 +94,9 @@ public class JoinReorder implements Tran
private Set<String> getBigTables(ParseContext joinCtx) {
Set<String> bigTables = new HashSet<String>();
- for (QBJoinTree qbJoin : joinCtx.getJoinContext().values()) {
- if (qbJoin.getStreamAliases() != null) {
- bigTables.addAll(qbJoin.getStreamAliases());
+ for (JoinOperator joinOp : joinCtx.getJoinOps()) {
+ if (joinOp.getConf().getStreamAliases() != null) {
+ bigTables.addAll(joinOp.getConf().getStreamAliases());
}
}
@@ -155,7 +154,7 @@ public class JoinReorder implements Tran
public ParseContext transform(ParseContext pactx) throws SemanticException {
Set<String> bigTables = getBigTables(pactx);
- for (JoinOperator joinOp : pactx.getJoinContext().keySet()) {
+ for (JoinOperator joinOp : pactx.getJoinOps()) {
reorder(joinOp, bigTables);
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Thu Jan 22 05:05:05 2015
@@ -61,7 +61,6 @@ import org.apache.hadoop.hive.ql.lib.Rul
import org.apache.hadoop.hive.ql.parse.GenMapRedWalker;
import org.apache.hadoop.hive.ql.parse.OpParseContext;
import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.QBJoinTree;
import org.apache.hadoop.hive.ql.parse.RowResolver;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -96,18 +95,13 @@ public class MapJoinProcessor implements
// needs to be passed. Use the string defined below for that.
private static final String MAPJOINKEY_FIELDPREFIX = "mapjoinkey";
- private ParseContext pGraphContext;
-
- /**
- * empty constructor.
- */
public MapJoinProcessor() {
- pGraphContext = null;
}
@SuppressWarnings("nls")
- private Operator<? extends OperatorDesc>
- putOpInsertMap(Operator<? extends OperatorDesc> op, RowResolver rr) {
+ private static Operator<? extends OperatorDesc> putOpInsertMap (
+ ParseContext pGraphContext, Operator<? extends OperatorDesc> op,
+ RowResolver rr) {
OpParseContext ctx = new OpParseContext(rr);
pGraphContext.getOpParseCtx().put(op, ctx);
return op;
@@ -232,10 +226,10 @@ public class MapJoinProcessor implements
throws SemanticException {
LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap =
newWork.getMapWork().getOpParseCtxMap();
- QBJoinTree newJoinTree = newWork.getMapWork().getJoinTree();
// generate the map join operator; already checked the map join
MapJoinOperator newMapJoinOp = new MapJoinProcessor().convertMapJoin(conf, opParseCtxMap, op,
- newJoinTree, mapJoinPos, true, false);
+ newWork.getMapWork().isLeftInputJoin(), newWork.getMapWork().getBaseSrc(), newWork.getMapWork().getMapAliases(),
+ mapJoinPos, true, false);
genLocalWorkForMapJoin(newWork, newMapJoinOp, mapJoinPos);
}
@@ -247,7 +241,9 @@ public class MapJoinProcessor implements
MapJoinProcessor.genMapJoinLocalWork(newWork, newMapJoinOp, mapJoinPos);
// clean up the mapred work
newWork.getMapWork().setOpParseCtxMap(null);
- newWork.getMapWork().setJoinTree(null);
+ newWork.getMapWork().setLeftInputJoin(false);
+ newWork.getMapWork().setBaseSrc(null);
+ newWork.getMapWork().setMapAliases(null);
} catch (Exception e) {
e.printStackTrace();
@@ -307,9 +303,8 @@ public class MapJoinProcessor implements
*/
public MapJoinOperator convertMapJoin(HiveConf conf,
LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
- JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin,
- boolean validateMapJoinTree)
- throws SemanticException {
+ JoinOperator op, boolean leftInputJoin, String[] baseSrc, List<String> mapAliases,
+ int mapJoinPos, boolean noCheckOuterJoin, boolean validateMapJoinTree) throws SemanticException {
// outer join cannot be performed on a table which is being cached
JoinDesc desc = op.getConf();
@@ -324,8 +319,6 @@ public class MapJoinProcessor implements
// Walk over all the sources (which are guaranteed to be reduce sink
// operators).
// The join outputs a concatenation of all the inputs.
- QBJoinTree leftSrc = joinTree.getJoinSrc();
-
List<Operator<? extends OperatorDesc>> parentOps = op.getParentOperators();
List<Operator<? extends OperatorDesc>> newParentOps =
new ArrayList<Operator<? extends OperatorDesc>>();
@@ -333,7 +326,7 @@ public class MapJoinProcessor implements
new ArrayList<Operator<? extends OperatorDesc>>();
// found a source which is not to be stored in memory
- if (leftSrc != null) {
+ if (leftInputJoin) {
// assert mapJoinPos == 0;
Operator<? extends OperatorDesc> parentOp = parentOps.get(0);
assert parentOp.getParentOperators().size() == 1;
@@ -345,7 +338,7 @@ public class MapJoinProcessor implements
byte pos = 0;
// Remove parent reduce-sink operators
- for (String src : joinTree.getBaseSrc()) {
+ for (String src : baseSrc) {
if (src != null) {
Operator<? extends OperatorDesc> parentOp = parentOps.get(pos);
assert parentOp.getParentOperators().size() == 1;
@@ -360,7 +353,7 @@ public class MapJoinProcessor implements
// create the map-join operator
MapJoinOperator mapJoinOp = convertJoinOpMapJoinOp(conf, opParseCtxMap,
- op, joinTree, mapJoinPos, noCheckOuterJoin);
+ op, leftInputJoin, baseSrc, mapAliases, mapJoinPos, noCheckOuterJoin);
// remove old parents
@@ -384,11 +377,12 @@ public class MapJoinProcessor implements
public static MapJoinOperator convertJoinOpMapJoinOp(HiveConf hconf,
LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
- JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin)
- throws SemanticException {
+ JoinOperator op, boolean leftInputJoin, String[] baseSrc, List<String> mapAliases,
+ int mapJoinPos, boolean noCheckOuterJoin) throws SemanticException {
MapJoinDesc mapJoinDescriptor =
- getMapJoinDesc(hconf, opParseCtxMap, op, joinTree, mapJoinPos, noCheckOuterJoin);
+ getMapJoinDesc(hconf, opParseCtxMap, op, leftInputJoin, baseSrc, mapAliases,
+ mapJoinPos, noCheckOuterJoin);
// reduce sink row resolver used to generate map join op
RowResolver outputRS = opParseCtxMap.get(op).getRowResolver();
@@ -441,7 +435,7 @@ public class MapJoinProcessor implements
*/
public static MapJoinOperator convertSMBJoinToMapJoin(HiveConf hconf,
Map<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
- SMBMapJoinOperator smbJoinOp, QBJoinTree joinTree, int bigTablePos, boolean noCheckOuterJoin)
+ SMBMapJoinOperator smbJoinOp, int bigTablePos, boolean noCheckOuterJoin)
throws SemanticException {
// Create a new map join operator
SMBJoinDesc smbJoinDesc = smbJoinOp.getConf();
@@ -488,7 +482,7 @@ public class MapJoinProcessor implements
}
public MapJoinOperator generateMapJoinOperator(ParseContext pctx, JoinOperator op,
- QBJoinTree joinTree, int mapJoinPos) throws SemanticException {
+ int mapJoinPos) throws SemanticException {
HiveConf hiveConf = pctx.getConf();
boolean noCheckOuterJoin = HiveConf.getBoolVar(hiveConf,
HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN)
@@ -497,7 +491,8 @@ public class MapJoinProcessor implements
LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap = pctx
.getOpParseCtx();
MapJoinOperator mapJoinOp = convertMapJoin(pctx.getConf(), opParseCtxMap, op,
- joinTree, mapJoinPos, noCheckOuterJoin, true);
+ op.getConf().isLeftInputJoin(), op.getConf().getBaseSrc(), op.getConf().getMapAliases(),
+ mapJoinPos, noCheckOuterJoin, true);
// create a dummy select to select all columns
genSelectPlan(pctx, mapJoinOp);
return mapJoinOp;
@@ -626,7 +621,7 @@ public class MapJoinProcessor implements
SelectDesc select = new SelectDesc(exprs, outputs, false);
- SelectOperator sel = (SelectOperator) putOpInsertMap(OperatorFactory.getAndMakeChild(select,
+ SelectOperator sel = (SelectOperator) putOpInsertMap(pctx, OperatorFactory.getAndMakeChild(select,
new RowSchema(inputRR.getColumnInfos()), input), inputRR);
sel.setColumnExprMap(colExprMap);
@@ -643,24 +638,22 @@ public class MapJoinProcessor implements
*
* @param op
* join operator
- * @param joinTree
- * qb join tree
* @return -1 if it cannot be converted to a map-side join, position of the map join node
* otherwise
*/
- private int mapSideJoin(JoinOperator op, QBJoinTree joinTree) throws SemanticException {
+ private int mapSideJoin(JoinOperator op) throws SemanticException {
int mapJoinPos = -1;
- if (joinTree.isMapSideJoin()) {
+ if (op.getConf().isMapSideJoin()) {
int pos = 0;
// In a map-side join, exactly one table is not present in memory.
// The client provides the list of tables which can be cached in memory
// via a hint.
- if (joinTree.getJoinSrc() != null) {
+ if (op.getConf().isLeftInputJoin()) {
mapJoinPos = pos;
}
- for (String src : joinTree.getBaseSrc()) {
+ for (String src : op.getConf().getBaseSrc()) {
if (src != null) {
- if (!joinTree.getMapAliases().contains(src)) {
+ if (!op.getConf().getMapAliases().contains(src)) {
if (mapJoinPos >= 0) {
return -1;
}
@@ -675,7 +668,7 @@ public class MapJoinProcessor implements
// leaving some table from the list of tables to be cached
if (mapJoinPos == -1) {
throw new SemanticException(ErrorMsg.INVALID_MAPJOIN_HINT.getMsg(
- Arrays.toString(joinTree.getBaseSrc())));
+ Arrays.toString(op.getConf().getBaseSrc())));
}
}
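
The hint rule the rewritten mapSideJoin enforces is unchanged: among all join sources, exactly one may be missing from the MAPJOIN hint's alias list, and that source becomes the streamed big table; zero candidates raises INVALID_MAPJOIN_HINT and more than one makes the join unconvertible. A self-contained toy of that selection rule (class and method names here are hypothetical; only the String[] / List<String> shapes mirror getBaseSrc() and getMapAliases(), and the pre-seeding of position 0 for a left-input join is omitted):

    import java.util.Arrays;
    import java.util.List;

    public class MapJoinPosDemo {
      // Returns the position of the single non-cached source, or -1.
      static int mapJoinPos(String[] baseSrc, List<String> mapAliases) {
        int pos = -1;
        for (int i = 0; i < baseSrc.length; i++) {
          if (baseSrc[i] != null && !mapAliases.contains(baseSrc[i])) {
            if (pos >= 0) {
              return -1;               // a second big table: not convertible
            }
            pos = i;
          }
        }
        return pos;
      }

      public static void main(String[] args) {
        String[] srcs = {"a", "b", "c"};
        System.out.println(mapJoinPos(srcs, Arrays.asList("a", "c"))); // 1
        System.out.println(mapJoinPos(srcs, Arrays.asList("a")));      // -1
      }
    }
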
@@ -691,36 +684,34 @@ public class MapJoinProcessor implements
*/
@Override
public ParseContext transform(ParseContext pactx) throws SemanticException {
- pGraphContext = pactx;
List<MapJoinOperator> listMapJoinOps = new ArrayList<MapJoinOperator>();
// traverse all the joins and convert them if necessary
- if (pGraphContext.getJoinContext() != null) {
- Map<JoinOperator, QBJoinTree> joinMap = new HashMap<JoinOperator, QBJoinTree>();
- Map<MapJoinOperator, QBJoinTree> mapJoinMap = pGraphContext.getMapJoinContext();
+ if (pactx.getJoinOps() != null) {
+ Set<JoinOperator> joinMap = new HashSet<JoinOperator>();
+ Set<MapJoinOperator> mapJoinMap = pactx.getMapJoinOps();
if (mapJoinMap == null) {
- mapJoinMap = new HashMap<MapJoinOperator, QBJoinTree>();
- pGraphContext.setMapJoinContext(mapJoinMap);
+ mapJoinMap = new HashSet<MapJoinOperator>();
+ pactx.setMapJoinOps(mapJoinMap);
}
- Set<Map.Entry<JoinOperator, QBJoinTree>> joinCtx = pGraphContext.getJoinContext().entrySet();
- Iterator<Map.Entry<JoinOperator, QBJoinTree>> joinCtxIter = joinCtx.iterator();
+ Iterator<JoinOperator> joinCtxIter = pactx.getJoinOps().iterator();
while (joinCtxIter.hasNext()) {
- Map.Entry<JoinOperator, QBJoinTree> joinEntry = joinCtxIter.next();
- JoinOperator joinOp = joinEntry.getKey();
- QBJoinTree qbJoin = joinEntry.getValue();
- int mapJoinPos = mapSideJoin(joinOp, qbJoin);
+ JoinOperator joinOp = joinCtxIter.next();
+ int mapJoinPos = mapSideJoin(joinOp);
if (mapJoinPos >= 0) {
- MapJoinOperator mapJoinOp = generateMapJoinOperator(pactx, joinOp, qbJoin, mapJoinPos);
+ MapJoinOperator mapJoinOp = generateMapJoinOperator(pactx, joinOp, mapJoinPos);
listMapJoinOps.add(mapJoinOp);
- mapJoinMap.put(mapJoinOp, qbJoin);
+ mapJoinOp.getConf().setQBJoinTreeProps(joinOp.getConf());
+ mapJoinMap.add(mapJoinOp);
} else {
- joinMap.put(joinOp, qbJoin);
+ joinOp.getConf().setQBJoinTreeProps(joinOp.getConf());
+ joinMap.add(joinOp);
}
}
// store the new joinContext
- pGraphContext.setJoinContext(joinMap);
+ pactx.setJoinOps(joinMap);
}
// Go over the list and find if a reducer is not needed
@@ -746,15 +737,15 @@ public class MapJoinProcessor implements
// The dispatcher fires the processor corresponding to the closest matching
// rule and passes the context along
Dispatcher disp = new DefaultRuleDispatcher(getDefault(), opRules, new MapJoinWalkerCtx(
- listMapJoinOpsNoRed, pGraphContext));
+ listMapJoinOpsNoRed, pactx));
GraphWalker ogw = new GenMapRedWalker(disp);
ArrayList<Node> topNodes = new ArrayList<Node>();
topNodes.addAll(listMapJoinOps);
ogw.startWalking(topNodes, null);
- pGraphContext.setListMapJoinOpsNoReducer(listMapJoinOpsNoRed);
- return pGraphContext;
+ pactx.setListMapJoinOpsNoReducer(listMapJoinOpsNoRed);
+ return pactx;
}
/**
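
Net effect on transform(): the Map<JoinOperator, QBJoinTree> bookkeeping collapses into plain operator sets, with setQBJoinTreeProps carrying the tree-derived fields onto the map-join descriptor. A condensed sketch of the reworked loop, not verbatim; listMapJoinOps, joinMap and mapJoinMap are as declared in the hunk:

    for (JoinOperator joinOp : pactx.getJoinOps()) {
      int mapJoinPos = mapSideJoin(joinOp);          // hint check, now descriptor-driven
      if (mapJoinPos >= 0) {
        MapJoinOperator mapJoinOp = generateMapJoinOperator(pactx, joinOp, mapJoinPos);
        listMapJoinOps.add(mapJoinOp);
        mapJoinOp.getConf().setQBJoinTreeProps(joinOp.getConf()); // copy join-tree props
        mapJoinMap.add(mapJoinOp);
      } else {
        joinMap.add(joinOp);
      }
    }
    pactx.setJoinOps(joinMap);                       // store the surviving common joins
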
@@ -800,7 +791,7 @@ public class MapJoinProcessor implements
}
Operator<? extends OperatorDesc> ch = parent.getChildOperators().get(0);
if (ch instanceof MapJoinOperator) {
- if (!nonSubqueryMapJoin(ctx.getpGraphContext(), (MapJoinOperator) ch, mapJoin)) {
+ if (!nonSubqueryMapJoin((MapJoinOperator) ch, mapJoin)) {
if (ch.getParentOperators().indexOf(parent) == ((MapJoinOperator) ch).getConf()
.getPosBigTable()) {
// not come from the local branch
@@ -820,11 +811,8 @@ public class MapJoinProcessor implements
}
}
- private boolean nonSubqueryMapJoin(ParseContext pGraphContext, MapJoinOperator mapJoin,
- MapJoinOperator parentMapJoin) {
- QBJoinTree joinTree = pGraphContext.getMapJoinContext().get(mapJoin);
- QBJoinTree parentJoinTree = pGraphContext.getMapJoinContext().get(parentMapJoin);
- if (joinTree.getJoinSrc() != null && joinTree.getJoinSrc().equals(parentJoinTree)) {
+ private boolean nonSubqueryMapJoin(MapJoinOperator mapJoin, MapJoinOperator parentMapJoin) {
+ if (mapJoin.getParentOperators().contains(parentMapJoin)) {
return true;
}
return false;
@@ -1028,15 +1016,15 @@ public class MapJoinProcessor implements
}
- public static ObjectPair<List<ReduceSinkOperator>, Map<Byte, List<ExprNodeDesc>>> getKeys(QBJoinTree joinTree, JoinOperator op) {
+ public static ObjectPair<List<ReduceSinkOperator>, Map<Byte, List<ExprNodeDesc>>> getKeys(
+ boolean leftInputJoin, String[] baseSrc, JoinOperator op) {
// Walk over all the sources (which are guaranteed to be reduce sink
// operators).
// The join outputs a concatenation of all the inputs.
- QBJoinTree leftSrc = joinTree.getJoinSrc();
List<ReduceSinkOperator> oldReduceSinkParentOps =
new ArrayList<ReduceSinkOperator>(op.getNumParent());
- if (leftSrc != null) {
+ if (leftInputJoin) {
// assert mapJoinPos == 0;
Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(0);
assert parentOp.getParentOperators().size() == 1;
@@ -1044,7 +1032,7 @@ public class MapJoinProcessor implements
}
byte pos = 0;
- for (String src : joinTree.getBaseSrc()) {
+ for (String src : baseSrc) {
if (src != null) {
Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(pos);
assert parentOp.getParentOperators().size() == 1;
@@ -1062,12 +1050,14 @@ public class MapJoinProcessor implements
keyExprMap.put(pos, keyCols);
}
- return new ObjectPair<List<ReduceSinkOperator>, Map<Byte,List<ExprNodeDesc>>>(oldReduceSinkParentOps, keyExprMap);
+ return new ObjectPair<List<ReduceSinkOperator>, Map<Byte,List<ExprNodeDesc>>>(
+ oldReduceSinkParentOps, keyExprMap);
}
public static MapJoinDesc getMapJoinDesc(HiveConf hconf,
LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
- JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin) throws SemanticException {
+ JoinOperator op, boolean leftInputJoin, String[] baseSrc, List<String> mapAliases,
+ int mapJoinPos, boolean noCheckOuterJoin) throws SemanticException {
JoinDesc desc = op.getConf();
JoinCondDesc[] condns = desc.getConds();
Byte[] tagOrder = desc.getTagOrder();
@@ -1084,7 +1074,8 @@ public class MapJoinProcessor implements
Map<Byte, List<ExprNodeDesc>> valueExprs = op.getConf().getExprs();
Map<Byte, List<ExprNodeDesc>> newValueExprs = new HashMap<Byte, List<ExprNodeDesc>>();
- ObjectPair<List<ReduceSinkOperator>, Map<Byte,List<ExprNodeDesc>>> pair = getKeys(joinTree, op);
+ ObjectPair<List<ReduceSinkOperator>, Map<Byte,List<ExprNodeDesc>>> pair =
+ getKeys(leftInputJoin, baseSrc, op);
List<ReduceSinkOperator> oldReduceSinkParentOps = pair.getFirst();
for (Map.Entry<Byte, List<ExprNodeDesc>> entry : valueExprs.entrySet()) {
byte tag = entry.getKey();
@@ -1174,8 +1165,8 @@ public class MapJoinProcessor implements
// create dumpfile prefix needed to create descriptor
String dumpFilePrefix = "";
- if (joinTree.getMapAliases() != null) {
- for (String mapAlias : joinTree.getMapAliases()) {
+ if (mapAliases != null) {
+ for (String mapAlias : mapAliases) {
dumpFilePrefix = dumpFilePrefix + mapAlias;
}
dumpFilePrefix = dumpFilePrefix + "-" + PlanUtils.getCountForMapJoinDumpFilePrefix();
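
With mapAliases now a parameter, the dump-file prefix is still just the cached aliases concatenated and suffixed with a counter. A self-contained illustration of the resulting shape (the counter value 42 is made up; the real suffix comes from PlanUtils.getCountForMapJoinDumpFilePrefix()):

    import java.util.Arrays;
    import java.util.List;

    public class DumpFilePrefixDemo {
      public static void main(String[] args) {
        List<String> mapAliases = Arrays.asList("a", "b");
        StringBuilder prefix = new StringBuilder();
        for (String alias : mapAliases) {
          prefix.append(alias);              // aliases are simply concatenated
        }
        prefix.append('-').append(42);       // hypothetical counter value
        System.out.println(prefix);          // prints: ab-42
      }
    }
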
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java Thu Jan 22 05:05:05 2015
@@ -30,6 +30,7 @@ import java.util.Stack;
import org.apache.hadoop.hive.ql.exec.FilterOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.SelectOperator;
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
@@ -42,7 +43,6 @@ import org.apache.hadoop.hive.ql.lib.Nod
import org.apache.hadoop.hive.ql.lib.Rule;
import org.apache.hadoop.hive.ql.lib.RuleRegExp;
import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.QBJoinTree;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -55,15 +55,12 @@ import org.apache.hadoop.hive.ql.plan.Op
*/
public class NonBlockingOpDeDupProc implements Transform {
- private ParseContext pctx;
-
@Override
public ParseContext transform(ParseContext pctx) throws SemanticException {
- this.pctx = pctx;
String SEL = SelectOperator.getOperatorName();
String FIL = FilterOperator.getOperatorName();
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
- opRules.put(new RuleRegExp("R1", SEL + "%" + SEL + "%"), new SelectDedup());
+ opRules.put(new RuleRegExp("R1", SEL + "%" + SEL + "%"), new SelectDedup(pctx));
opRules.put(new RuleRegExp("R2", FIL + "%" + FIL + "%"), new FilterDedup());
Dispatcher disp = new DefaultRuleDispatcher(null, opRules, null);
@@ -76,6 +73,13 @@ public class NonBlockingOpDeDupProc impl
}
private class SelectDedup implements NodeProcessor {
+
+ private ParseContext pctx;
+
+ public SelectDedup(ParseContext pctx) {
+ this.pctx = pctx;
+ }
+
@Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
Object... nodeOutputs) throws SemanticException {
@@ -178,28 +182,37 @@ public class NonBlockingOpDeDupProc impl
}
return true;
}
- }
- /**
- * Change existing references in the context to point from child to parent operator.
- * @param cSEL child operator (to be removed, and merged into parent)
- * @param pSEL parent operator
- */
- private void fixContextReferences(SelectOperator cSEL, SelectOperator pSEL) {
- Collection<QBJoinTree> qbJoinTrees = new ArrayList<QBJoinTree>();
- qbJoinTrees.addAll(pctx.getJoinContext().values());
- qbJoinTrees.addAll(pctx.getMapJoinContext().values());
- for (QBJoinTree qbJoinTree : qbJoinTrees) {
- Map<String, Operator<? extends OperatorDesc>> aliasToOpInfo = qbJoinTree.getAliasToOpInfo();
- for (Map.Entry<String, Operator<? extends OperatorDesc>> entry : aliasToOpInfo.entrySet()) {
- if (entry.getValue() == cSEL) {
- aliasToOpInfo.put(entry.getKey(), pSEL);
+ /**
+ * Change existing references in the context to point from child to parent operator.
+ * @param cSEL child operator (to be removed, and merged into parent)
+ * @param pSEL parent operator
+ */
+ private void fixContextReferences(SelectOperator cSEL, SelectOperator pSEL) {
+ Collection<Map<String, Operator<? extends OperatorDesc>>> mapsAliasToOpInfo =
+ new ArrayList<Map<String, Operator<? extends OperatorDesc>>>();
+ for (JoinOperator joinOp : pctx.getJoinOps()) {
+ if (joinOp.getConf().getAliasToOpInfo() != null) {
+ mapsAliasToOpInfo.add(joinOp.getConf().getAliasToOpInfo());
+ }
+ }
+ for (MapJoinOperator mapJoinOp : pctx.getMapJoinOps()) {
+ if (mapJoinOp.getConf().getAliasToOpInfo() != null) {
+ mapsAliasToOpInfo.add(mapJoinOp.getConf().getAliasToOpInfo());
+ }
+ }
+ for (Map<String, Operator<? extends OperatorDesc>> aliasToOpInfo : mapsAliasToOpInfo) {
+ for (Map.Entry<String, Operator<? extends OperatorDesc>> entry : aliasToOpInfo.entrySet()) {
+ if (entry.getValue() == cSEL) {
+ aliasToOpInfo.put(entry.getKey(), pSEL);
+ }
}
}
}
}
private class FilterDedup implements NodeProcessor {
+
@Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
Object... nodeOutputs) throws SemanticException {
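
Both dedup processors follow the same clean-up seen elsewhere in this commit: the ParseContext stops living in a field on the Transform and is handed to the processor that needs it at construction time. A minimal, purely hypothetical model of that injection pattern (none of these types are Hive classes):

    /** Toy model: the node processor takes its context at construction
     *  time instead of reading a mutable field on the enclosing Transform. */
    public class InjectionDemo {
      static class ParseCtx {
        final String name;
        ParseCtx(String name) { this.name = name; }
      }

      static class SelectDedupLike {
        private final ParseCtx pctx;   // was: outer-class field set in transform()
        SelectDedupLike(ParseCtx pctx) { this.pctx = pctx; }
        void process() { System.out.println("processing with " + pctx.name); }
      }

      public static void main(String[] args) {
        new SelectDedupLike(new ParseCtx("query-1")).process();
      }
    }
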
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java Thu Jan 22 05:05:05 2015
@@ -143,8 +143,15 @@ public class SimpleFetchOptimizer implem
}
private boolean checkThreshold(FetchData data, int limit, ParseContext pctx) throws Exception {
- if (limit > 0 && data.hasOnlyPruningFilter()) {
- return true;
+ if (limit > 0) {
+ if (data.hasOnlyPruningFilter()) {
+ /* partitioned table + query has only pruning filters */
+ return true;
+ } else if (!data.isPartitioned() && !data.isFiltered()) {
+ /* unpartitioned table + no filters */
+ return true;
+ }
+ /* fall through */
}
long threshold = HiveConf.getLongVar(pctx.getConf(),
HiveConf.ConfVars.HIVEFETCHTASKCONVERSIONTHRESHOLD);
@@ -176,7 +183,7 @@ public class SimpleFetchOptimizer implem
if (!aggressive && qb.hasTableSample(alias)) {
return null;
}
- Table table = pctx.getTopToTable().get(ts);
+ Table table = ts.getConf().getTableMetadata();
if (table == null) {
return null;
}
@@ -228,6 +235,10 @@ public class SimpleFetchOptimizer implem
if (op.getChildOperators() == null || op.getChildOperators().size() != 1) {
return null;
}
+
+ if (op instanceof FilterOperator) {
+ fetch.setFiltered(true);
+ }
}
if (op instanceof FileSinkOperator) {
@@ -279,6 +290,11 @@ public class SimpleFetchOptimizer implem
|| operator instanceof ScriptOperator) {
return false;
}
+
+ if (operator instanceof FilterOperator) {
+ fetch.setFiltered(true);
+ }
+
if (!traversed.add(operator)) {
return true;
}
@@ -314,6 +330,7 @@ public class SimpleFetchOptimizer implem
// this is always non-null when conversion is completed
private Operator<?> fileSink;
+ private boolean filtered;
private FetchData(TableScanOperator scanOp, ReadEntity parent, Table table, SplitSample splitSample) {
this.scanOp = scanOp;
@@ -337,10 +354,23 @@ public class SimpleFetchOptimizer implem
/*
* all filters were executed during partition pruning
*/
- public boolean hasOnlyPruningFilter() {
+ public final boolean hasOnlyPruningFilter() {
return this.onlyPruningFilter;
}
+ public final boolean isPartitioned() {
+ return this.table.isPartitioned();
+ }
+
+ /* there are filter operators in the pipeline */
+ public final boolean isFiltered() {
+ return this.filtered;
+ }
+
+ public final void setFiltered(boolean filtered) {
+ this.filtered = filtered;
+ }
+
private FetchWork convertToWork() throws HiveException {
inputs.clear();
if (!table.isPartitioned()) {
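
The reworked checkThreshold gives LIMIT queries two fast paths before the byte-size threshold is consulted: a partitioned table whose filters were fully absorbed by partition pruning, or an unpartitioned table with no filters at all; everything else falls through. A self-contained sketch of just that decision (the booleans mirror hasOnlyPruningFilter(), isPartitioned() and isFiltered(); the threshold branch is elided):

    /** Hypothetical condensation of the LIMIT fast path shown above. */
    static boolean limitFastPath(int limit, boolean onlyPruningFilter,
        boolean partitioned, boolean filtered) {
      if (limit > 0) {
        if (onlyPruningFilter) {
          return true;               // partitioned: all filters consumed by pruning
        }
        if (!partitioned && !filtered) {
          return true;               // unpartitioned and nothing filters rows
        }
        // fall through to the size threshold
      }
      return false;                  // caller then compares against the threshold
    }
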
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java Thu Jan 22 05:05:05 2015
@@ -79,11 +79,13 @@ import org.apache.hadoop.hive.serde2.typ
public class SkewJoinOptimizer implements Transform {
private static final Log LOG = LogFactory.getLog(SkewJoinOptimizer.class.getName());
- private static ParseContext parseContext;
public static class SkewJoinProc implements NodeProcessor {
- public SkewJoinProc() {
+ private ParseContext parseContext;
+
+ public SkewJoinProc(ParseContext parseContext) {
super();
+ this.parseContext = parseContext;
}
@Override
@@ -165,23 +167,14 @@ public class SkewJoinOptimizer implement
return null;
}
- // have to create a QBJoinTree for the cloned join operator
- QBJoinTree originJoinTree = parseContext.getJoinContext().get(joinOp);
- QBJoinTree newJoinTree;
- try {
- newJoinTree = originJoinTree.clone();
- } catch (CloneNotSupportedException e) {
- LOG.debug("QBJoinTree could not be cloned: ", e);
- return null;
- }
-
JoinOperator joinOpClone;
if (processSelect) {
joinOpClone = (JoinOperator)(currOpClone.getParentOperators().get(0));
} else {
joinOpClone = (JoinOperator)currOpClone;
}
- parseContext.getJoinContext().put(joinOpClone, newJoinTree);
+ joinOpClone.getConf().cloneQBJoinTreeProps(joinOp.getConf());
+ parseContext.getJoinOps().add(joinOpClone);
List<TableScanOperator> tableScanCloneOpsForJoin =
new ArrayList<TableScanOperator>();
@@ -202,9 +195,7 @@ public class SkewJoinOptimizer implement
for (Entry<String, Operator<? extends OperatorDesc>> topOp : topOps.entrySet()) {
TableScanOperator tso = (TableScanOperator) topOp.getValue();
- Table origTable = parseContext.getTopToTable().get(ctx.getCloneTSOpMap().get(tso));
String tabAlias = tso.getConf().getAlias();
- parseContext.getTopToTable().put(tso, origTable);
int initCnt = 1;
String newAlias = "subquery" + initCnt + ":" + tabAlias;
while (origTopOps.containsKey(newAlias)) {
@@ -213,7 +204,7 @@ public class SkewJoinOptimizer implement
}
parseContext.getTopOps().put(newAlias, tso);
- setUpAlias(originJoinTree, newJoinTree, tabAlias, newAlias, tso);
+ setUpAlias(joinOp, joinOpClone, tabAlias, newAlias, tso);
}
// Now do a union of the select operators: selectOp and selectOpClone
@@ -410,7 +401,7 @@ public class SkewJoinOptimizer implement
if (op instanceof TableScanOperator) {
TableScanOperator tsOp = (TableScanOperator)op;
if (tableScanOpsForJoin.contains(tsOp)) {
- return parseContext.getTopToTable().get(tsOp);
+ return tsOp.getConf().getTableMetadata();
}
}
if ((op.getParentOperators() == null) || (op.getParentOperators().isEmpty()) ||
@@ -629,19 +620,19 @@ public class SkewJoinOptimizer implement
/**
* Set alias in the cloned join tree
*/
- private static void setUpAlias(QBJoinTree origin, QBJoinTree cloned, String origAlias,
+ private static void setUpAlias(JoinOperator origin, JoinOperator cloned, String origAlias,
String newAlias, Operator<? extends OperatorDesc> topOp) {
- cloned.getAliasToOpInfo().remove(origAlias);
- cloned.getAliasToOpInfo().put(newAlias, topOp);
- if (origin.getLeftAlias().equals(origAlias)) {
- cloned.setLeftAlias(null);
- cloned.setLeftAlias(newAlias);
- }
- replaceAlias(origin.getLeftAliases(), cloned.getLeftAliases(), origAlias, newAlias);
- replaceAlias(origin.getRightAliases(), cloned.getRightAliases(), origAlias, newAlias);
- replaceAlias(origin.getBaseSrc(), cloned.getBaseSrc(), origAlias, newAlias);
- replaceAlias(origin.getMapAliases(), cloned.getMapAliases(), origAlias, newAlias);
- replaceAlias(origin.getStreamAliases(), cloned.getStreamAliases(), origAlias, newAlias);
+ cloned.getConf().getAliasToOpInfo().remove(origAlias);
+ cloned.getConf().getAliasToOpInfo().put(newAlias, topOp);
+ if (origin.getConf().getLeftAlias().equals(origAlias)) {
+ cloned.getConf().setLeftAlias(null);
+ cloned.getConf().setLeftAlias(newAlias);
+ }
+ replaceAlias(origin.getConf().getLeftAliases(), cloned.getConf().getLeftAliases(), origAlias, newAlias);
+ replaceAlias(origin.getConf().getRightAliases(), cloned.getConf().getRightAliases(), origAlias, newAlias);
+ replaceAlias(origin.getConf().getBaseSrc(), cloned.getConf().getBaseSrc(), origAlias, newAlias);
+ replaceAlias(origin.getConf().getMapAliases(), cloned.getConf().getMapAliases(), origAlias, newAlias);
+ replaceAlias(origin.getConf().getStreamAliases(), cloned.getConf().getStreamAliases(), origAlias, newAlias);
}
private static void replaceAlias(String[] origin, String[] cloned,
@@ -677,7 +668,7 @@ public class SkewJoinOptimizer implement
public ParseContext transform(ParseContext pctx) throws SemanticException {
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
- opRules.put(new RuleRegExp("R1", "TS%.*RS%JOIN%"), getSkewJoinProc());
+ opRules.put(new RuleRegExp("R1", "TS%.*RS%JOIN%"), getSkewJoinProc(pctx));
SkewJoinOptProcCtx skewJoinOptProcCtx = new SkewJoinOptProcCtx(pctx);
// The dispatcher fires the processor corresponding to the closest matching
// rule and passes the context along
@@ -692,8 +683,8 @@ public class SkewJoinOptimizer implement
return pctx;
}
- private NodeProcessor getSkewJoinProc() {
- return new SkewJoinProc();
+ private NodeProcessor getSkewJoinProc(ParseContext parseContext) {
+ return new SkewJoinProc(parseContext);
}
/**
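
The skew-join clone no longer needs a cloneable QBJoinTree at all: cloneQBJoinTreeProps copies the join-tree properties between the two descriptors directly, which also retires the old CloneNotSupportedException handling, and setUpAlias now rewires aliases on the clone's JoinDesc. Condensed from the hunks above (joinOp, joinOpClone, tabAlias, newAlias and tso as in that code):

    // Sketch of the new clone wiring; not verbatim.
    joinOpClone.getConf().cloneQBJoinTreeProps(joinOp.getConf()); // replaces QBJoinTree.clone()
    parseContext.getJoinOps().add(joinOpClone);
    // ... per table scan, after registering the "subqueryN:alias" name:
    setUpAlias(joinOp, joinOpClone, tabAlias, newAlias, tso);     // mutates the clone's JoinDesc
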
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java Thu Jan 22 05:05:05 2015
@@ -143,7 +143,7 @@ public class SortedDynPartitionOptimizer
return null;
}
- Table destTable = parseCtx.getFsopToTable().get(fsOp);
+ Table destTable = fsOp.getConf().getTable();
if (destTable == null) {
LOG.debug("Bailing out of sort dynamic partition optimization as destination table is null");
return null;
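
Same pattern once more: the destination table rides on the FileSinkOperator's own descriptor, so the ParseContext side-map (getFsopToTable) goes away. A toy, fully hypothetical model of the lookup move (all types here are stand-ins, not Hive classes):

    public class DescriptorLookupDemo {
      static class Table {
        final String name;
        Table(String name) { this.name = name; }
      }
      static class FileSinkDesc {
        private final Table table;
        FileSinkDesc(Table table) { this.table = table; }
        Table getTable() { return table; }  // was: parseCtx.getFsopToTable().get(fsOp)
      }
      static class FileSinkOp {
        private final FileSinkDesc conf;
        FileSinkOp(FileSinkDesc conf) { this.conf = conf; }
        FileSinkDesc getConf() { return conf; }
      }

      public static void main(String[] args) {
        FileSinkOp fsOp = new FileSinkOp(new FileSinkDesc(new Table("dst")));
        System.out.println(fsOp.getConf().getTable().name);  // prints: dst
      }
    }
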
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapjoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapjoinProc.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapjoinProc.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapjoinProc.java Thu Jan 22 05:05:05 2015
@@ -60,7 +60,7 @@ public class SortedMergeBucketMapjoinPro
}
if (convert) {
- convertBucketMapJoinToSMBJoin(mapJoinOp, smbJoinContext, pGraphContext);
+ convertBucketMapJoinToSMBJoin(mapJoinOp, smbJoinContext);
}
return null;
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeJoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeJoinProc.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeJoinProc.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeJoinProc.java Thu Jan 22 05:05:05 2015
@@ -44,10 +44,10 @@ public class SortedMergeJoinProc extends
SortBucketJoinProcCtx smbJoinContext = (SortBucketJoinProcCtx) procCtx;
boolean convert =
canConvertJoinToSMBJoin(
- joinOp, smbJoinContext, pGraphContext);
+ joinOp, smbJoinContext);
if (convert) {
- convertJoinToSMBJoin(joinOp, smbJoinContext, pGraphContext);
+ convertJoinToSMBJoin(joinOp, smbJoinContext);
}
return null;
}
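
Finally, the two SMB processors shed a ParseContext argument that their helpers no longer need, so the call sites shrink accordingly. A condensed sketch mirroring the hunk above (joinOp and smbJoinContext as declared there):

    if (canConvertJoinToSMBJoin(joinOp, smbJoinContext)) {
      convertJoinToSMBJoin(joinOp, smbJoinContext);  // pGraphContext no longer threaded through
    }
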