Posted to commits@hive.apache.org by na...@apache.org on 2012/07/17 09:42:49 UTC
svn commit: r1362391 [1/3] - in /hive/trunk/ql/src:
java/org/apache/hadoop/hive/ql/optimizer/
java/org/apache/hadoop/hive/ql/parse/ test/queries/clientpositive/
test/results/clientpositive/
Author: namit
Date: Tue Jul 17 07:42:48 2012
New Revision: 1362391
URL: http://svn.apache.org/viewvc?rev=1362391&view=rev
Log:
HIVE-3210 Support bucketed mapjoin on a partitioned table which has two or more partitions
(Navis via namit)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java
hive/trunk/ql/src/test/queries/clientpositive/bucketmapjoin2.q
hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin2.q.out
hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin_negative2.q.out
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java?rev=1362391&r1=1362390&r2=1362391&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java Tue Jul 17 07:42:48 2012
@@ -19,8 +19,9 @@ package org.apache.hadoop.hive.ql.optimi
import java.io.IOException;
import java.io.Serializable;
+import java.net.URI;
import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
@@ -141,6 +142,7 @@ public class BucketMapJoinOptimizer impl
}
@Override
+ @SuppressWarnings("unchecked")
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
Object... nodeOutputs) throws SemanticException {
@@ -179,11 +181,11 @@ public class BucketMapJoinOptimizer impl
}
MapJoinDesc mjDecs = mapJoinOp.getConf();
- LinkedHashMap<String, Integer> aliasToBucketNumberMapping = new LinkedHashMap<String, Integer>();
- LinkedHashMap<String, List<String>> aliasToBucketFileNamesMapping = new LinkedHashMap<String, List<String>>();
- // right now this code does not work with "a join b on a.key = b.key and
- // a.ds = b.ds", where ds is a partition column. It only works with joins
- // with only one partition presents in each join source tables.
+ LinkedHashMap<String, List<Integer>> aliasToPartitionBucketNumberMapping =
+ new LinkedHashMap<String, List<Integer>>();
+ LinkedHashMap<String, List<List<String>>> aliasToPartitionBucketFileNamesMapping =
+ new LinkedHashMap<String, List<List<String>>>();
+
Map<String, Operator<? extends Serializable>> topOps = this.pGraphContext.getTopOps();
Map<TableScanOperator, Table> topToTable = this.pGraphContext.getTopToTable();
@@ -200,7 +202,7 @@ public class BucketMapJoinOptimizer impl
}
Table tbl = topToTable.get(tso);
if(tbl.isPartitioned()) {
- PrunedPartitionList prunedParts = null;
+ PrunedPartitionList prunedParts;
try {
prunedParts = pGraphContext.getOpToPartList().get(tso);
if (prunedParts == null) {
@@ -214,109 +216,50 @@ public class BucketMapJoinOptimizer impl
LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
throw new SemanticException(e.getMessage(), e);
}
- int partNumber = prunedParts.getConfirmedPartns().size()
- + prunedParts.getUnknownPartns().size();
-
- if (partNumber > 1) {
- // only allow one partition for small tables
- if(alias != baseBigAlias) {
- return null;
- }
- // here is the big table,and we get more than one partitions.
- // construct a mapping of (Partition->bucket file names) and
- // (Partition -> bucket number)
- Iterator<Partition> iter = prunedParts.getConfirmedPartns()
- .iterator();
- while (iter.hasNext()) {
- Partition p = iter.next();
+ List<Partition> partitions = prunedParts.getNotDeniedPartns();
+ // construct a mapping of (Partition->bucket file names) and (Partition -> bucket number)
+ if (partitions.size() >= 1) {
+ List<Integer> buckets = new ArrayList<Integer>();
+ List<List<String>> files = new ArrayList<List<String>>();
+ for (Partition p : partitions) {
if (!checkBucketColumns(p.getBucketCols(), mjDecs, index)) {
return null;
}
- List<String> fileNames = getOnePartitionBucketFileNames(p);
- bigTblPartsToBucketFileNames.put(p, fileNames);
- bigTblPartsToBucketNumber.put(p, p.getBucketCount());
- }
- iter = prunedParts.getUnknownPartns().iterator();
- while (iter.hasNext()) {
- Partition p = iter.next();
- if (!checkBucketColumns(p.getBucketCols(), mjDecs, index)) {
- return null;
+ List<String> fileNames = getOnePartitionBucketFileNames(p.getDataLocation());
+ if (alias.equals(baseBigAlias)) {
+ bigTblPartsToBucketFileNames.put(p, fileNames);
+ bigTblPartsToBucketNumber.put(p, p.getBucketCount());
+ } else {
+ files.add(fileNames);
+ buckets.add(p.getBucketCount());
}
- List<String> fileNames = getOnePartitionBucketFileNames(p);
- bigTblPartsToBucketFileNames.put(p, fileNames);
- bigTblPartsToBucketNumber.put(p, p.getBucketCount());
}
- // If there are more than one partition for the big
- // table,aliasToBucketFileNamesMapping and partsToBucketNumber will
- // not contain mappings for the big table. Instead, the mappings are
- // contained in bigTblPartsToBucketFileNames and
- // bigTblPartsToBucketNumber
-
- } else {
- Partition part = null;
- Iterator<Partition> iter = prunedParts.getConfirmedPartns()
- .iterator();
- if (iter.hasNext()) {
- part = iter.next();
- }
- if (part == null) {
- iter = prunedParts.getUnknownPartns().iterator();
- if (iter.hasNext()) {
- part = iter.next();
- }
- }
- assert part != null;
- Integer num = new Integer(part.getBucketCount());
- aliasToBucketNumberMapping.put(alias, num);
- if (!checkBucketColumns(part.getBucketCols(), mjDecs, index)) {
- return null;
- }
- List<String> fileNames = getOnePartitionBucketFileNames(part);
- aliasToBucketFileNamesMapping.put(alias, fileNames);
- if (alias == baseBigAlias) {
- bigTblPartsToBucketFileNames.put(part, fileNames);
- bigTblPartsToBucketNumber.put(part, num);
+ if (!alias.equals(baseBigAlias)) {
+ aliasToPartitionBucketNumberMapping.put(alias, buckets);
+ aliasToPartitionBucketFileNamesMapping.put(alias, files);
}
}
} else {
if (!checkBucketColumns(tbl.getBucketCols(), mjDecs, index)) {
return null;
}
+ List<String> fileNames = getOnePartitionBucketFileNames(tbl.getDataLocation());
Integer num = new Integer(tbl.getNumBuckets());
- aliasToBucketNumberMapping.put(alias, num);
- List<String> fileNames = new ArrayList<String>();
- try {
- FileSystem fs = FileSystem.get(tbl.getDataLocation(), this.pGraphContext.getConf());
- FileStatus[] files = fs.listStatus(new Path(tbl.getDataLocation().toString()));
- if(files != null) {
- for(FileStatus file : files) {
- fileNames.add(file.getPath().toString());
- }
- }
- } catch (IOException e) {
- throw new SemanticException(e);
+ if (alias.equals(baseBigAlias)) {
+ bigTblPartsToBucketFileNames.put(null, fileNames);
+ bigTblPartsToBucketNumber.put(null, tbl.getNumBuckets());
+ } else {
+ aliasToPartitionBucketNumberMapping.put(alias, Arrays.asList(num));
+ aliasToPartitionBucketFileNamesMapping.put(alias, Arrays.asList(fileNames));
}
- aliasToBucketFileNamesMapping.put(alias, fileNames);
}
}
// All tables or partitions are bucketed, and their bucket number is
// stored in 'bucketNumbers', we need to check if the number of buckets in
// the big table can be divided by no of buckets in small tables.
- if (bigTblPartsToBucketNumber.size() > 0) {
- Iterator<Entry<Partition, Integer>> bigTblPartToBucketNumber = bigTblPartsToBucketNumber
- .entrySet().iterator();
- while (bigTblPartToBucketNumber.hasNext()) {
- int bucketNumberInPart = bigTblPartToBucketNumber.next().getValue();
- if (!checkBucketNumberAgainstBigTable(aliasToBucketNumberMapping,
- bucketNumberInPart)) {
- return null;
- }
- }
- } else {
- int bucketNoInBigTbl = aliasToBucketNumberMapping.get(baseBigAlias).intValue();
- if (!checkBucketNumberAgainstBigTable(aliasToBucketNumberMapping,
- bucketNoInBigTbl)) {
+ for (Integer bucketNumber : bigTblPartsToBucketNumber.values()) {
+ if (!checkBucketNumberAgainstBigTable(aliasToPartitionBucketNumberMapping, bucketNumber)) {
return null;
}
}
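
The check invoked in this hunk boils down to a divisibility rule: for each small-table partition's bucket count N and a big-table partition's bucket count B, either N must divide B or B must divide N. A minimal, self-contained sketch of that rule (hypothetical class and variable names, plain collections rather than Hive's internal types):

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BucketNumberCheck {
  // Same rule as checkBucketNumberAgainstBigTable: each small-table bucket
  // count must divide, or be divisible by, the big-table bucket count.
  static boolean ok(Map<String, List<Integer>> smallTblBuckets, int bigTblBuckets) {
    for (List<Integer> nums : smallTblBuckets.values()) {
      for (int n : nums) {
        boolean divides = (n >= bigTblBuckets)
            ? n % bigTblBuckets == 0 : bigTblBuckets % n == 0;
        if (!divides) {
          return false;
        }
      }
    }
    return true;
  }

  public static void main(String[] args) {
    Map<String, List<Integer>> small = new LinkedHashMap<String, List<Integer>>();
    small.put("b", Arrays.asList(2, 8)); // two partitions of a small-table alias "b"
    System.out.println(ok(small, 4));    // true: 4 % 2 == 0 and 8 % 4 == 0
    small.put("c", Arrays.asList(3));    // a 3-bucket partition
    System.out.println(ok(small, 4));    // false: neither 3 % 4 == 0 nor 4 % 3 == 0
  }
}

With a 4-bucket big-table partition, small-table partitions of 2 and 8 buckets pass, while a 3-bucket partition fails, in which case the optimizer gives up and leaves the plan as an ordinary map-join.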
@@ -327,13 +270,8 @@ public class BucketMapJoinOptimizer impl
new LinkedHashMap<String, LinkedHashMap<String, ArrayList<String>>>();
//sort bucket names for the big table
- if(bigTblPartsToBucketNumber.size() > 0) {
- Collection<List<String>> bucketNamesAllParts = bigTblPartsToBucketFileNames.values();
- for(List<String> partBucketNames : bucketNamesAllParts) {
- Collections.sort(partBucketNames);
- }
- } else {
- Collections.sort(aliasToBucketFileNamesMapping.get(baseBigAlias));
+ for(List<String> partBucketNames : bigTblPartsToBucketFileNames.values()) {
+ Collections.sort(partBucketNames);
}
// go through all small tables and get the mapping from bucket file name
@@ -343,32 +281,28 @@ public class BucketMapJoinOptimizer impl
if(alias.equals(baseBigAlias)) {
continue;
}
- Collections.sort(aliasToBucketFileNamesMapping.get(alias));
+ for (List<String> names : aliasToPartitionBucketFileNamesMapping.get(alias)) {
+ Collections.sort(names);
+ }
+ List<Integer> smallTblBucketNums = aliasToPartitionBucketNumberMapping.get(alias);
+ List<List<String>> smallTblFilesList = aliasToPartitionBucketFileNamesMapping.get(alias);
+
LinkedHashMap<String, ArrayList<String>> mapping = new LinkedHashMap<String, ArrayList<String>>();
aliasBucketFileNameMapping.put(alias, mapping);
// for each bucket file in big table, get the corresponding bucket file
// name in the small table.
- if (bigTblPartsToBucketNumber.size() > 0) {
- //more than 1 partition in the big table, do the mapping for each partition
- Iterator<Entry<Partition, List<String>>> bigTblPartToBucketNames = bigTblPartsToBucketFileNames
- .entrySet().iterator();
- Iterator<Entry<Partition, Integer>> bigTblPartToBucketNum = bigTblPartsToBucketNumber
- .entrySet().iterator();
- while (bigTblPartToBucketNames.hasNext()) {
- assert bigTblPartToBucketNum.hasNext();
- int bigTblBucketNum = bigTblPartToBucketNum.next().getValue().intValue();
- List<String> bigTblBucketNameList = bigTblPartToBucketNames.next().getValue();
- fillMapping(baseBigAlias, aliasToBucketNumberMapping,
- aliasToBucketFileNamesMapping, alias, mapping, bigTblBucketNum,
- bigTblBucketNameList, desc.getBucketFileNameMapping());
- }
- } else {
- List<String> bigTblBucketNameList = aliasToBucketFileNamesMapping.get(baseBigAlias);
- int bigTblBucketNum = aliasToBucketNumberMapping.get(baseBigAlias);
- fillMapping(baseBigAlias, aliasToBucketNumberMapping,
- aliasToBucketFileNamesMapping, alias, mapping, bigTblBucketNum,
- bigTblBucketNameList, desc.getBucketFileNameMapping());
+ //more than 1 partition in the big table, do the mapping for each partition
+ Iterator<Entry<Partition, List<String>>> bigTblPartToBucketNames =
+ bigTblPartsToBucketFileNames.entrySet().iterator();
+ Iterator<Entry<Partition, Integer>> bigTblPartToBucketNum = bigTblPartsToBucketNumber
+ .entrySet().iterator();
+ while (bigTblPartToBucketNames.hasNext()) {
+ assert bigTblPartToBucketNum.hasNext();
+ int bigTblBucketNum = bigTblPartToBucketNum.next().getValue();
+ List<String> bigTblBucketNameList = bigTblPartToBucketNames.next().getValue();
+ fillMapping(smallTblBucketNums, smallTblFilesList,
+ mapping, bigTblBucketNum, bigTblBucketNameList, desc.getBucketFileNameMapping());
}
}
desc.setAliasBucketFileNameMapping(aliasBucketFileNameMapping);
@@ -376,61 +310,59 @@ public class BucketMapJoinOptimizer impl
return null;
}
- private void fillMapping(String baseBigAlias,
- LinkedHashMap<String, Integer> aliasToBucketNumberMapping,
- LinkedHashMap<String, List<String>> aliasToBucketFileNamesMapping,
- String alias, LinkedHashMap<String, ArrayList<String>> mapping,
+ // called for each partition of big table and populates mapping for each file in the partition
+ private void fillMapping(
+ List<Integer> smallTblBucketNums,
+ List<List<String>> smallTblFilesList,
+ LinkedHashMap<String, ArrayList<String>> mapping,
int bigTblBucketNum, List<String> bigTblBucketNameList,
LinkedHashMap<String, Integer> bucketFileNameMapping) {
- for (int index = 0; index < bigTblBucketNameList.size(); index++) {
- String inputBigTBLBucket = bigTblBucketNameList.get(index);
- int smallTblBucketNum = aliasToBucketNumberMapping.get(alias);
+
+ for (int bindex = 0; bindex < bigTblBucketNameList.size(); bindex++) {
ArrayList<String> resultFileNames = new ArrayList<String>();
- if (bigTblBucketNum >= smallTblBucketNum) {
- // if the big table has more buckets than the current small table,
- // use "MOD" to get small table bucket names. For example, if the big
- // table has 4 buckets and the small table has 2 buckets, then the
- // mapping should be 0->0, 1->1, 2->0, 3->1.
- int toAddSmallIndex = index % smallTblBucketNum;
- if(toAddSmallIndex < aliasToBucketFileNamesMapping.get(alias).size()) {
- resultFileNames.add(aliasToBucketFileNamesMapping.get(alias).get(toAddSmallIndex));
- }
- } else {
- int jump = smallTblBucketNum / bigTblBucketNum;
- List<String> bucketNames = aliasToBucketFileNamesMapping.get(alias);
- for (int i = index; i < aliasToBucketFileNamesMapping.get(alias).size(); i = i + jump) {
- if(i <= aliasToBucketFileNamesMapping.get(alias).size()) {
- resultFileNames.add(bucketNames.get(i));
+ for (int sindex = 0 ; sindex < smallTblBucketNums.size(); sindex++) {
+ int smallTblBucketNum = smallTblBucketNums.get(sindex);
+ List<String> smallTblFileNames = smallTblFilesList.get(sindex);
+ if (bigTblBucketNum >= smallTblBucketNum) {
+ // if the big table has more buckets than the current small table,
+ // use "MOD" to get small table bucket names. For example, if the big
+ // table has 4 buckets and the small table has 2 buckets, then the
+ // mapping should be 0->0, 1->1, 2->0, 3->1.
+ int toAddSmallIndex = bindex % smallTblBucketNum;
+ resultFileNames.add(smallTblFileNames.get(toAddSmallIndex));
+ } else {
+ int jump = smallTblBucketNum / bigTblBucketNum;
+ for (int i = bindex; i < smallTblFileNames.size(); i = i + jump) {
+ resultFileNames.add(smallTblFileNames.get(i));
}
}
}
+ String inputBigTBLBucket = bigTblBucketNameList.get(bindex);
mapping.put(inputBigTBLBucket, resultFileNames);
- bucketFileNameMapping.put(inputBigTBLBucket, index);
+ bucketFileNameMapping.put(inputBigTBLBucket, bindex);
}
}
private boolean checkBucketNumberAgainstBigTable(
- LinkedHashMap<String, Integer> aliasToBucketNumber,
- int bucketNumberInPart) {
- Iterator<Integer> iter = aliasToBucketNumber.values().iterator();
- while(iter.hasNext()) {
- int nxt = iter.next().intValue();
- boolean ok = (nxt >= bucketNumberInPart) ? nxt % bucketNumberInPart == 0
- : bucketNumberInPart % nxt == 0;
- if(!ok) {
- return false;
+ Map<String, List<Integer>> aliasToBucketNumber, int bucketNumberInPart) {
+ for (List<Integer> bucketNums : aliasToBucketNumber.values()) {
+ for (int nxt : bucketNums) {
+ boolean ok = (nxt >= bucketNumberInPart) ? nxt % bucketNumberInPart == 0
+ : bucketNumberInPart % nxt == 0;
+ if (!ok) {
+ return false;
+ }
}
}
return true;
}
- private List<String> getOnePartitionBucketFileNames(Partition part)
+ private List<String> getOnePartitionBucketFileNames(URI location)
throws SemanticException {
List<String> fileNames = new ArrayList<String>();
try {
- FileSystem fs = FileSystem.get(part.getDataLocation(), this.pGraphContext.getConf());
- FileStatus[] files = fs.listStatus(new Path(part.getDataLocation()
- .toString()));
+ FileSystem fs = FileSystem.get(location, this.pGraphContext.getConf());
+ FileStatus[] files = fs.listStatus(new Path(location.toString()));
if (files != null) {
for (FileStatus file : files) {
fileNames.add(file.getPath().toString());
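
To make the new fillMapping arithmetic concrete, here is a self-contained sketch (hypothetical bucket file names and class name; the real method also records each big-table file's bucket index in bucketFileNameMapping). The HIVE-3210 change is the inner loop over small-table partitions: each big-table bucket file now collects matching bucket files from every partition of the small table. When the big table has at least as many buckets (B >= S), big-table file bindex maps to small-table file bindex % S; otherwise the code takes every (S / B)-th small-table file starting at bindex.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FillMappingSketch {
  // For one big-table partition with bigTblBucketNum buckets, collect the
  // matching small-table bucket files from every small-table partition.
  static Map<String, List<String>> fill(List<Integer> smallTblBucketNums,
      List<List<String>> smallTblFilesList, int bigTblBucketNum,
      List<String> bigTblBucketFiles) {
    Map<String, List<String>> mapping = new LinkedHashMap<String, List<String>>();
    for (int bindex = 0; bindex < bigTblBucketFiles.size(); bindex++) {
      List<String> result = new ArrayList<String>();
      for (int sindex = 0; sindex < smallTblBucketNums.size(); sindex++) {
        int s = smallTblBucketNums.get(sindex);
        List<String> files = smallTblFilesList.get(sindex);
        if (bigTblBucketNum >= s) {
          // MOD branch: for s=2 and 4 big buckets, 0->0, 1->1, 2->0, 3->1
          result.add(files.get(bindex % s));
        } else {
          // "jump" branch: every (s / bigTblBucketNum)-th file starting at bindex
          for (int i = bindex; i < files.size(); i += s / bigTblBucketNum) {
            result.add(files.get(i));
          }
        }
      }
      mapping.put(bigTblBucketFiles.get(bindex), result);
    }
    return mapping;
  }

  public static void main(String[] args) {
    // Big-table partition: 4 buckets. Small table: two partitions with 2 and 8 buckets.
    List<Integer> nums = Arrays.asList(2, 8);
    List<List<String>> files = Arrays.asList(
        Arrays.asList("p1/b0", "p1/b1"),
        Arrays.asList("p2/b0", "p2/b1", "p2/b2", "p2/b3",
                      "p2/b4", "p2/b5", "p2/b6", "p2/b7"));
    List<String> big = Arrays.asList("big/b0", "big/b1", "big/b2", "big/b3");
    // Prints, for example: big/b0=[p1/b0, p2/b0, p2/b2, p2/b4, p2/b6]
    System.out.println(fill(nums, files, 4, big));
  }
}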
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java?rev=1362391&r1=1362390&r2=1362391&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java Tue Jul 17 07:42:48 2012
@@ -229,6 +229,11 @@ public class SortedMergeBucketMapJoinOpt
LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
throw new SemanticException(e.getMessage(), e);
}
+ int partNumber = prunedParts.getConfirmedPartns().size()
+ + prunedParts.getUnknownPartns().size();
+ if (partNumber > 1) {
+ return false;
+ }
boolean ret = true;
for (Partition p : prunedParts.getConfirmedPartns()) {
ret = ret && checkSortColsAndJoinCols(p.getSortCols(), joinCols);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java?rev=1362391&r1=1362390&r2=1362391&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java Tue Jul 17 07:42:48 2012
@@ -18,6 +18,8 @@
package org.apache.hadoop.hive.ql.parse;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Set;
import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -76,6 +78,16 @@ public class PrunedPartitionList {
}
/**
+ * return all not-denied(confirmed + unknown) partitions.
+ */
+ public List<Partition> getNotDeniedPartns() {
+ List<Partition> partitions = new ArrayList<Partition>();
+ partitions.addAll(unknownPartns);
+ partitions.addAll(confirmedPartns);
+ return partitions;
+ }
+
+ /**
* set confirmed partitions.
*
* @param confirmedPartns
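
For callers, the contract of the new accessor is simply concatenation: unknown partitions first, then confirmed ones, with denied partitions excluded. A tiny hypothetical sketch of that behavior outside Hive:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class NotDeniedSketch {
  public static void main(String[] args) {
    // Hypothetical stand-ins for the unknown/confirmed partition sets.
    Set<String> unknown = new LinkedHashSet<String>(Arrays.asList("ds=2008-04-09"));
    Set<String> confirmed = new LinkedHashSet<String>(Arrays.asList("ds=2008-04-08"));
    // getNotDeniedPartns: unknown partitions first, then confirmed ones.
    List<String> notDenied = new ArrayList<String>(unknown);
    notDenied.addAll(confirmed);
    System.out.println(notDenied); // [ds=2008-04-09, ds=2008-04-08]
  }
}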
Modified: hive/trunk/ql/src/test/queries/clientpositive/bucketmapjoin2.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/bucketmapjoin2.q?rev=1362391&r1=1362390&r2=1362391&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/bucketmapjoin2.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/bucketmapjoin2.q Tue Jul 17 07:42:48 2012
@@ -1,7 +1,3 @@
-CREATE TABLE srcbucket_mapjoin(key int, value string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
-load data local inpath '../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin;
-load data local inpath '../data/files/srcbucket21.txt' INTO TABLE srcbucket_mapjoin;
-
CREATE TABLE srcbucket_mapjoin_part (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE;
load data local inpath '../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
load data local inpath '../data/files/srcbucket21.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
@@ -21,12 +17,12 @@ create table bucketmapjoin_tmp_result (k
explain extended
insert overwrite table bucketmapjoin_tmp_result
select /*+mapjoin(b)*/ a.key, a.value, b.value
-from srcbucket_mapjoin a join srcbucket_mapjoin_part_2 b
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
on a.key=b.key and b.ds="2008-04-08";
insert overwrite table bucketmapjoin_tmp_result
select /*+mapjoin(b)*/ a.key, a.value, b.value
-from srcbucket_mapjoin a join srcbucket_mapjoin_part_2 b
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
on a.key=b.key and b.ds="2008-04-08";
select count(1) from bucketmapjoin_tmp_result;
@@ -37,7 +33,7 @@ select sum(hash(key)), sum(hash(value1))
set hive.optimize.bucketmapjoin = false;
insert overwrite table bucketmapjoin_tmp_result
select /*+mapjoin(b)*/ a.key, a.value, b.value
-from srcbucket_mapjoin a join srcbucket_mapjoin_part_2 b
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
on a.key=b.key and b.ds="2008-04-08";
select count(1) from bucketmapjoin_tmp_result;
@@ -52,12 +48,12 @@ set hive.optimize.bucketmapjoin = true;
explain extended
insert overwrite table bucketmapjoin_tmp_result
select /*+mapjoin(a)*/ a.key, a.value, b.value
-from srcbucket_mapjoin a join srcbucket_mapjoin_part_2 b
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
on a.key=b.key and b.ds="2008-04-08";
insert overwrite table bucketmapjoin_tmp_result
select /*+mapjoin(a)*/ a.key, a.value, b.value
-from srcbucket_mapjoin a join srcbucket_mapjoin_part_2 b
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
on a.key=b.key and b.ds="2008-04-08";
select count(1) from bucketmapjoin_tmp_result;
@@ -68,7 +64,7 @@ select sum(hash(key)), sum(hash(value1))
set hive.optimize.bucketmapjoin = false;
insert overwrite table bucketmapjoin_tmp_result
select /*+mapjoin(a)*/ a.key, a.value, b.value
-from srcbucket_mapjoin a join srcbucket_mapjoin_part_2 b
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
on a.key=b.key and b.ds="2008-04-08";
select count(1) from bucketmapjoin_tmp_result;
@@ -78,3 +74,39 @@ select sum(hash(key)), sum(hash(value1))
select a.key-b.key, a.value1-b.value1, a.value2-b.value2
from bucketmapjoin_hash_result_1 a left outer join bucketmapjoin_hash_result_2 b
on a.key = b.key;
+
+-- HIVE-3210
+load data local inpath '../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-09');
+load data local inpath '../data/files/srcbucket23.txt' INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-09');
+
+set hive.optimize.bucketmapjoin = true;
+
+explain extended
+insert overwrite table bucketmapjoin_tmp_result
+select /*+mapjoin(b)*/ a.key, a.value, b.value
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
+on a.key=b.key;
+
+insert overwrite table bucketmapjoin_tmp_result
+select /*+mapjoin(b)*/ a.key, a.value, b.value
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
+on a.key=b.key;
+
+select count(1) from bucketmapjoin_tmp_result;
+insert overwrite table bucketmapjoin_hash_result_1
+select sum(hash(key)), sum(hash(value1)), sum(hash(value2)) from bucketmapjoin_tmp_result;
+
+set hive.optimize.bucketmapjoin = false;
+
+insert overwrite table bucketmapjoin_tmp_result
+select /*+mapjoin(b)*/ a.key, a.value, b.value
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
+on a.key=b.key;
+
+select count(1) from bucketmapjoin_tmp_result;
+insert overwrite table bucketmapjoin_hash_result_2
+select sum(hash(key)), sum(hash(value1)), sum(hash(value2)) from bucketmapjoin_tmp_result;
+
+select a.key-b.key, a.value1-b.value1, a.value2-b.value2
+from bucketmapjoin_hash_result_1 a left outer join bucketmapjoin_hash_result_2 b
+on a.key = b.key;