You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by na...@apache.org on 2012/07/17 09:42:49 UTC

svn commit: r1362391 [1/3] - in /hive/trunk/ql/src: java/org/apache/hadoop/hive/ql/optimizer/ java/org/apache/hadoop/hive/ql/parse/ test/queries/clientpositive/ test/results/clientpositive/

Author: namit
Date: Tue Jul 17 07:42:48 2012
New Revision: 1362391

URL: http://svn.apache.org/viewvc?rev=1362391&view=rev
Log:
HIVE-3210 Support Bucketed mapjoin on partitioned table which has two or more partitions
(Navis via namit)


Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java
    hive/trunk/ql/src/test/queries/clientpositive/bucketmapjoin2.q
    hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin2.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin_negative2.q.out

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java?rev=1362391&r1=1362390&r2=1362391&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java Tue Jul 17 07:42:48 2012
@@ -19,8 +19,9 @@ package org.apache.hadoop.hive.ql.optimi
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.net.URI;
 import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -141,6 +142,7 @@ public class BucketMapJoinOptimizer impl
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
 
@@ -179,11 +181,11 @@ public class BucketMapJoinOptimizer impl
       }
 
       MapJoinDesc mjDecs = mapJoinOp.getConf();
-      LinkedHashMap<String, Integer> aliasToBucketNumberMapping = new LinkedHashMap<String, Integer>();
-      LinkedHashMap<String, List<String>> aliasToBucketFileNamesMapping = new LinkedHashMap<String, List<String>>();
-      // right now this code does not work with "a join b on a.key = b.key and
-      // a.ds = b.ds", where ds is a partition column. It only works with joins
-      // with only one partition presents in each join source tables.
+      LinkedHashMap<String, List<Integer>> aliasToPartitionBucketNumberMapping =
+          new LinkedHashMap<String, List<Integer>>();
+      LinkedHashMap<String, List<List<String>>> aliasToPartitionBucketFileNamesMapping =
+          new LinkedHashMap<String, List<List<String>>>();
+
       Map<String, Operator<? extends Serializable>> topOps = this.pGraphContext.getTopOps();
       Map<TableScanOperator, Table> topToTable = this.pGraphContext.getTopToTable();
 
@@ -200,7 +202,7 @@ public class BucketMapJoinOptimizer impl
         }
         Table tbl = topToTable.get(tso);
         if(tbl.isPartitioned()) {
-          PrunedPartitionList prunedParts = null;
+          PrunedPartitionList prunedParts;
           try {
             prunedParts = pGraphContext.getOpToPartList().get(tso);
             if (prunedParts == null) {
@@ -214,109 +216,50 @@ public class BucketMapJoinOptimizer impl
             LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
             throw new SemanticException(e.getMessage(), e);
           }
-          int partNumber = prunedParts.getConfirmedPartns().size()
-              + prunedParts.getUnknownPartns().size();
-
-          if (partNumber > 1) {
-            // only allow one partition for small tables
-            if(alias != baseBigAlias) {
-              return null;
-            }
-            // here is the big table,and we get more than one partitions.
-            // construct a mapping of (Partition->bucket file names) and
-            // (Partition -> bucket number)
-            Iterator<Partition> iter = prunedParts.getConfirmedPartns()
-                .iterator();
-            while (iter.hasNext()) {
-              Partition p = iter.next();
+          List<Partition> partitions = prunedParts.getNotDeniedPartns();
+          // construct a mapping of (Partition->bucket file names) and (Partition -> bucket number)
+          if (partitions.size() >= 1) {
+            List<Integer> buckets = new ArrayList<Integer>();
+            List<List<String>> files = new ArrayList<List<String>>();
+            for (Partition p : partitions) {
               if (!checkBucketColumns(p.getBucketCols(), mjDecs, index)) {
                 return null;
               }
-              List<String> fileNames = getOnePartitionBucketFileNames(p);
-              bigTblPartsToBucketFileNames.put(p, fileNames);
-              bigTblPartsToBucketNumber.put(p, p.getBucketCount());
-            }
-            iter = prunedParts.getUnknownPartns().iterator();
-            while (iter.hasNext()) {
-              Partition p = iter.next();
-              if (!checkBucketColumns(p.getBucketCols(), mjDecs, index)) {
-                return null;
+              List<String> fileNames = getOnePartitionBucketFileNames(p.getDataLocation());
+              if (alias.equals(baseBigAlias)) {
+                bigTblPartsToBucketFileNames.put(p, fileNames);
+                bigTblPartsToBucketNumber.put(p, p.getBucketCount());
+              } else {
+                files.add(fileNames);
+                buckets.add(p.getBucketCount());
               }
-              List<String> fileNames = getOnePartitionBucketFileNames(p);
-              bigTblPartsToBucketFileNames.put(p, fileNames);
-              bigTblPartsToBucketNumber.put(p, p.getBucketCount());
             }
-            // If there are more than one partition for the big
-            // table,aliasToBucketFileNamesMapping and partsToBucketNumber will
-            // not contain mappings for the big table. Instead, the mappings are
-            // contained in bigTblPartsToBucketFileNames and
-            // bigTblPartsToBucketNumber
-
-          } else {
-            Partition part = null;
-            Iterator<Partition> iter = prunedParts.getConfirmedPartns()
-                .iterator();
-            if (iter.hasNext()) {
-              part = iter.next();
-            }
-            if (part == null) {
-              iter = prunedParts.getUnknownPartns().iterator();
-              if (iter.hasNext()) {
-                part = iter.next();
-              }
-            }
-            assert part != null;
-            Integer num = new Integer(part.getBucketCount());
-            aliasToBucketNumberMapping.put(alias, num);
-            if (!checkBucketColumns(part.getBucketCols(), mjDecs, index)) {
-              return null;
-            }
-            List<String> fileNames = getOnePartitionBucketFileNames(part);
-            aliasToBucketFileNamesMapping.put(alias, fileNames);
-            if (alias == baseBigAlias) {
-              bigTblPartsToBucketFileNames.put(part, fileNames);
-              bigTblPartsToBucketNumber.put(part, num);
+            if (!alias.equals(baseBigAlias)) {
+              aliasToPartitionBucketNumberMapping.put(alias, buckets);
+              aliasToPartitionBucketFileNamesMapping.put(alias, files);
             }
           }
         } else {
           if (!checkBucketColumns(tbl.getBucketCols(), mjDecs, index)) {
             return null;
           }
+          List<String> fileNames = getOnePartitionBucketFileNames(tbl.getDataLocation());
           Integer num = new Integer(tbl.getNumBuckets());
-          aliasToBucketNumberMapping.put(alias, num);
-          List<String> fileNames = new ArrayList<String>();
-          try {
-            FileSystem fs = FileSystem.get(tbl.getDataLocation(), this.pGraphContext.getConf());
-            FileStatus[] files = fs.listStatus(new Path(tbl.getDataLocation().toString()));
-            if(files != null) {
-              for(FileStatus file : files) {
-                fileNames.add(file.getPath().toString());
-              }
-            }
-          } catch (IOException e) {
-            throw new SemanticException(e);
+          if (alias.equals(baseBigAlias)) {
+            bigTblPartsToBucketFileNames.put(null, fileNames);
+            bigTblPartsToBucketNumber.put(null, tbl.getNumBuckets());
+          } else {
+            aliasToPartitionBucketNumberMapping.put(alias, Arrays.asList(num));
+            aliasToPartitionBucketFileNamesMapping.put(alias, Arrays.asList(fileNames));
           }
-          aliasToBucketFileNamesMapping.put(alias, fileNames);
         }
       }
 
       // All tables or partitions are bucketed, and their bucket number is
       // stored in 'bucketNumbers', we need to check if the number of buckets in
       // the big table can be divided by no of buckets in small tables.
-      if (bigTblPartsToBucketNumber.size() > 0) {
-        Iterator<Entry<Partition, Integer>> bigTblPartToBucketNumber = bigTblPartsToBucketNumber
-            .entrySet().iterator();
-        while (bigTblPartToBucketNumber.hasNext()) {
-          int bucketNumberInPart = bigTblPartToBucketNumber.next().getValue();
-          if (!checkBucketNumberAgainstBigTable(aliasToBucketNumberMapping,
-              bucketNumberInPart)) {
-            return null;
-          }
-        }
-      } else {
-        int bucketNoInBigTbl = aliasToBucketNumberMapping.get(baseBigAlias).intValue();
-        if (!checkBucketNumberAgainstBigTable(aliasToBucketNumberMapping,
-            bucketNoInBigTbl)) {
+      for (Integer bucketNumber : bigTblPartsToBucketNumber.values()) {
+        if (!checkBucketNumberAgainstBigTable(aliasToPartitionBucketNumberMapping, bucketNumber)) {
           return null;
         }
       }
@@ -327,13 +270,8 @@ public class BucketMapJoinOptimizer impl
         new LinkedHashMap<String, LinkedHashMap<String, ArrayList<String>>>();
 
       //sort bucket names for the big table
-      if(bigTblPartsToBucketNumber.size() > 0) {
-        Collection<List<String>> bucketNamesAllParts = bigTblPartsToBucketFileNames.values();
-        for(List<String> partBucketNames : bucketNamesAllParts) {
-          Collections.sort(partBucketNames);
-        }
-      } else {
-        Collections.sort(aliasToBucketFileNamesMapping.get(baseBigAlias));
+      for(List<String> partBucketNames : bigTblPartsToBucketFileNames.values()) {
+        Collections.sort(partBucketNames);
       }
 
       // go through all small tables and get the mapping from bucket file name
@@ -343,32 +281,28 @@ public class BucketMapJoinOptimizer impl
         if(alias.equals(baseBigAlias)) {
           continue;
         }
-        Collections.sort(aliasToBucketFileNamesMapping.get(alias));
+        for (List<String> names : aliasToPartitionBucketFileNamesMapping.get(alias)) {
+          Collections.sort(names);
+        }
+        List<Integer> smallTblBucketNums = aliasToPartitionBucketNumberMapping.get(alias);
+        List<List<String>> smallTblFilesList = aliasToPartitionBucketFileNamesMapping.get(alias);
+
         LinkedHashMap<String, ArrayList<String>> mapping = new LinkedHashMap<String, ArrayList<String>>();
         aliasBucketFileNameMapping.put(alias, mapping);
 
         // for each bucket file in big table, get the corresponding bucket file
         // name in the small table.
-        if (bigTblPartsToBucketNumber.size() > 0) {
-          //more than 1 partition in the big table, do the mapping for each partition
-          Iterator<Entry<Partition, List<String>>> bigTblPartToBucketNames = bigTblPartsToBucketFileNames
-              .entrySet().iterator();
-          Iterator<Entry<Partition, Integer>> bigTblPartToBucketNum = bigTblPartsToBucketNumber
-              .entrySet().iterator();
-          while (bigTblPartToBucketNames.hasNext()) {
-            assert bigTblPartToBucketNum.hasNext();
-            int bigTblBucketNum = bigTblPartToBucketNum.next().getValue().intValue();
-            List<String> bigTblBucketNameList = bigTblPartToBucketNames.next().getValue();
-            fillMapping(baseBigAlias, aliasToBucketNumberMapping,
-                aliasToBucketFileNamesMapping, alias, mapping, bigTblBucketNum,
-                bigTblBucketNameList, desc.getBucketFileNameMapping());
-          }
-        } else {
-          List<String> bigTblBucketNameList = aliasToBucketFileNamesMapping.get(baseBigAlias);
-          int bigTblBucketNum =  aliasToBucketNumberMapping.get(baseBigAlias);
-          fillMapping(baseBigAlias, aliasToBucketNumberMapping,
-              aliasToBucketFileNamesMapping, alias, mapping, bigTblBucketNum,
-              bigTblBucketNameList, desc.getBucketFileNameMapping());
+        // do the mapping for each big-table partition (an unpartitioned table has a single entry)
+        Iterator<Entry<Partition, List<String>>> bigTblPartToBucketNames =
+            bigTblPartsToBucketFileNames.entrySet().iterator();
+        Iterator<Entry<Partition, Integer>> bigTblPartToBucketNum = bigTblPartsToBucketNumber
+            .entrySet().iterator();
+        while (bigTblPartToBucketNames.hasNext()) {
+          assert bigTblPartToBucketNum.hasNext();
+          int bigTblBucketNum = bigTblPartToBucketNum.next().getValue();
+          List<String> bigTblBucketNameList = bigTblPartToBucketNames.next().getValue();
+          fillMapping(smallTblBucketNums, smallTblFilesList,
+              mapping, bigTblBucketNum, bigTblBucketNameList, desc.getBucketFileNameMapping());
         }
       }
       desc.setAliasBucketFileNameMapping(aliasBucketFileNameMapping);
@@ -376,61 +310,59 @@ public class BucketMapJoinOptimizer impl
       return null;
     }
 
-    private void fillMapping(String baseBigAlias,
-        LinkedHashMap<String, Integer> aliasToBucketNumberMapping,
-        LinkedHashMap<String, List<String>> aliasToBucketFileNamesMapping,
-        String alias, LinkedHashMap<String, ArrayList<String>> mapping,
+    // called for each partition of big table and populates mapping for each file in the partition
+    private void fillMapping(
+        List<Integer> smallTblBucketNums,
+        List<List<String>> smallTblFilesList,
+        LinkedHashMap<String, ArrayList<String>> mapping,
         int bigTblBucketNum, List<String> bigTblBucketNameList,
         LinkedHashMap<String, Integer> bucketFileNameMapping) {
-      for (int index = 0; index < bigTblBucketNameList.size(); index++) {
-        String inputBigTBLBucket = bigTblBucketNameList.get(index);
-        int smallTblBucketNum = aliasToBucketNumberMapping.get(alias);
+
+      for (int bindex = 0; bindex < bigTblBucketNameList.size(); bindex++) {
         ArrayList<String> resultFileNames = new ArrayList<String>();
-        if (bigTblBucketNum >= smallTblBucketNum) {
-          // if the big table has more buckets than the current small table,
-          // use "MOD" to get small table bucket names. For example, if the big
-          // table has 4 buckets and the small table has 2 buckets, then the
-          // mapping should be 0->0, 1->1, 2->0, 3->1.
-          int toAddSmallIndex = index % smallTblBucketNum;
-          if(toAddSmallIndex < aliasToBucketFileNamesMapping.get(alias).size()) {
-            resultFileNames.add(aliasToBucketFileNamesMapping.get(alias).get(toAddSmallIndex));
-          }
-        } else {
-          int jump = smallTblBucketNum / bigTblBucketNum;
-          List<String> bucketNames = aliasToBucketFileNamesMapping.get(alias);
-          for (int i = index; i < aliasToBucketFileNamesMapping.get(alias).size(); i = i + jump) {
-            if(i <= aliasToBucketFileNamesMapping.get(alias).size()) {
-              resultFileNames.add(bucketNames.get(i));
+        for (int sindex = 0 ; sindex < smallTblBucketNums.size(); sindex++) {
+          int smallTblBucketNum = smallTblBucketNums.get(sindex);
+          List<String> smallTblFileNames = smallTblFilesList.get(sindex);
+          if (bigTblBucketNum >= smallTblBucketNum) {
+            // if the big table has more buckets than the current small table,
+            // use "MOD" to get small table bucket names. For example, if the big
+            // table has 4 buckets and the small table has 2 buckets, then the
+            // mapping should be 0->0, 1->1, 2->0, 3->1.
+            int toAddSmallIndex = bindex % smallTblBucketNum;
+            resultFileNames.add(smallTblFileNames.get(toAddSmallIndex));
+          } else {
+            int jump = smallTblBucketNum / bigTblBucketNum;
+            for (int i = bindex; i < smallTblFileNames.size(); i = i + jump) {
+              resultFileNames.add(smallTblFileNames.get(i));
             }
           }
         }
+        String inputBigTBLBucket = bigTblBucketNameList.get(bindex);
         mapping.put(inputBigTBLBucket, resultFileNames);
-        bucketFileNameMapping.put(inputBigTBLBucket, index);
+        bucketFileNameMapping.put(inputBigTBLBucket, bindex);
       }
     }
 
     private boolean checkBucketNumberAgainstBigTable(
-        LinkedHashMap<String, Integer> aliasToBucketNumber,
-        int bucketNumberInPart) {
-      Iterator<Integer> iter = aliasToBucketNumber.values().iterator();
-      while(iter.hasNext()) {
-        int nxt = iter.next().intValue();
-        boolean ok = (nxt >= bucketNumberInPart) ? nxt % bucketNumberInPart == 0
-            : bucketNumberInPart % nxt == 0;
-        if(!ok) {
-          return false;
+        Map<String, List<Integer>> aliasToBucketNumber, int bucketNumberInPart) {
+      for (List<Integer> bucketNums : aliasToBucketNumber.values()) {
+        for (int nxt : bucketNums) {
+          boolean ok = (nxt >= bucketNumberInPart) ? nxt % bucketNumberInPart == 0
+              : bucketNumberInPart % nxt == 0;
+          if (!ok) {
+            return false;
+          }
         }
       }
       return true;
     }
 
-    private List<String> getOnePartitionBucketFileNames(Partition part)
+    private List<String> getOnePartitionBucketFileNames(URI location)
         throws SemanticException {
       List<String> fileNames = new ArrayList<String>();
       try {
-        FileSystem fs = FileSystem.get(part.getDataLocation(), this.pGraphContext.getConf());
-        FileStatus[] files = fs.listStatus(new Path(part.getDataLocation()
-            .toString()));
+        FileSystem fs = FileSystem.get(location, this.pGraphContext.getConf());
+        FileStatus[] files = fs.listStatus(new Path(location.toString()));
         if (files != null) {
           for (FileStatus file : files) {
             fileNames.add(file.getPath().toString());

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java?rev=1362391&r1=1362390&r2=1362391&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java Tue Jul 17 07:42:48 2012
@@ -229,6 +229,11 @@ public class SortedMergeBucketMapJoinOpt
           LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
           throw new SemanticException(e.getMessage(), e);
         }
+        int partNumber = prunedParts.getConfirmedPartns().size()
+              + prunedParts.getUnknownPartns().size();
+        if (partNumber > 1) {
+          return false;
+        }
         boolean ret = true;
         for (Partition p : prunedParts.getConfirmedPartns()) {
           ret = ret && checkSortColsAndJoinCols(p.getSortCols(), joinCols);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java?rev=1362391&r1=1362390&r2=1362391&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java Tue Jul 17 07:42:48 2012
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.ql.parse;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -76,6 +78,16 @@ public class PrunedPartitionList {
   }
 
   /**
+   * Returns all not-denied partitions: the unknown partitions followed by the confirmed ones.
+   */
+  public List<Partition> getNotDeniedPartns() {
+    List<Partition> partitions = new ArrayList<Partition>();
+    partitions.addAll(unknownPartns);
+    partitions.addAll(confirmedPartns);
+    return partitions;
+  }
+
+  /**
    * set confirmed partitions.
    * 
    * @param confirmedPartns

Modified: hive/trunk/ql/src/test/queries/clientpositive/bucketmapjoin2.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/bucketmapjoin2.q?rev=1362391&r1=1362390&r2=1362391&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/bucketmapjoin2.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/bucketmapjoin2.q Tue Jul 17 07:42:48 2012
@@ -1,7 +1,3 @@
-CREATE TABLE srcbucket_mapjoin(key int, value string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
-load data local inpath '../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin;
-load data local inpath '../data/files/srcbucket21.txt' INTO TABLE srcbucket_mapjoin;
-
 CREATE TABLE srcbucket_mapjoin_part (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE;
 load data local inpath '../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
 load data local inpath '../data/files/srcbucket21.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
@@ -21,12 +17,12 @@ create table bucketmapjoin_tmp_result (k
 explain extended
 insert overwrite table bucketmapjoin_tmp_result 
 select /*+mapjoin(b)*/ a.key, a.value, b.value 
-from srcbucket_mapjoin a join srcbucket_mapjoin_part_2 b 
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
 on a.key=b.key and b.ds="2008-04-08";
 
 insert overwrite table bucketmapjoin_tmp_result 
 select /*+mapjoin(b)*/ a.key, a.value, b.value 
-from srcbucket_mapjoin a join srcbucket_mapjoin_part_2 b 
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
 on a.key=b.key and b.ds="2008-04-08";
 
 select count(1) from bucketmapjoin_tmp_result;
@@ -37,7 +33,7 @@ select sum(hash(key)), sum(hash(value1))
 set hive.optimize.bucketmapjoin = false;
 insert overwrite table bucketmapjoin_tmp_result 
 select /*+mapjoin(b)*/ a.key, a.value, b.value 
-from srcbucket_mapjoin a join srcbucket_mapjoin_part_2 b 
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
 on a.key=b.key and b.ds="2008-04-08";
 
 select count(1) from bucketmapjoin_tmp_result;
@@ -52,12 +48,12 @@ set hive.optimize.bucketmapjoin = true;
 explain extended
 insert overwrite table bucketmapjoin_tmp_result 
 select /*+mapjoin(a)*/ a.key, a.value, b.value 
-from srcbucket_mapjoin a join srcbucket_mapjoin_part_2 b 
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
 on a.key=b.key and b.ds="2008-04-08";
 
 insert overwrite table bucketmapjoin_tmp_result 
 select /*+mapjoin(a)*/ a.key, a.value, b.value 
-from srcbucket_mapjoin a join srcbucket_mapjoin_part_2 b 
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
 on a.key=b.key and b.ds="2008-04-08";
 
 select count(1) from bucketmapjoin_tmp_result;
@@ -68,7 +64,7 @@ select sum(hash(key)), sum(hash(value1))
 set hive.optimize.bucketmapjoin = false;
 insert overwrite table bucketmapjoin_tmp_result 
 select /*+mapjoin(a)*/ a.key, a.value, b.value 
-from srcbucket_mapjoin a join srcbucket_mapjoin_part_2 b 
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
 on a.key=b.key and b.ds="2008-04-08";
 
 select count(1) from bucketmapjoin_tmp_result;
@@ -78,3 +74,39 @@ select sum(hash(key)), sum(hash(value1))
 select a.key-b.key, a.value1-b.value1, a.value2-b.value2
 from bucketmapjoin_hash_result_1 a left outer join bucketmapjoin_hash_result_2 b
 on a.key = b.key;
+
+-- HIVE-3210
+load data local inpath '../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-09');
+load data local inpath '../data/files/srcbucket23.txt' INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-09');
+
+set hive.optimize.bucketmapjoin = true;
+
+explain extended
+insert overwrite table bucketmapjoin_tmp_result
+select /*+mapjoin(b)*/ a.key, a.value, b.value
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
+on a.key=b.key;
+
+insert overwrite table bucketmapjoin_tmp_result
+select /*+mapjoin(b)*/ a.key, a.value, b.value
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
+on a.key=b.key;
+
+select count(1) from bucketmapjoin_tmp_result;
+insert overwrite table bucketmapjoin_hash_result_1
+select sum(hash(key)), sum(hash(value1)), sum(hash(value2)) from bucketmapjoin_tmp_result;
+
+set hive.optimize.bucketmapjoin = false;
+
+insert overwrite table bucketmapjoin_tmp_result
+select /*+mapjoin(b)*/ a.key, a.value, b.value
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
+on a.key=b.key;
+
+select count(1) from bucketmapjoin_tmp_result;
+insert overwrite table bucketmapjoin_hash_result_2
+select sum(hash(key)), sum(hash(value1)), sum(hash(value2)) from bucketmapjoin_tmp_result;
+
+select a.key-b.key, a.value1-b.value1, a.value2-b.value2
+from bucketmapjoin_hash_result_1 a left outer join bucketmapjoin_hash_result_2 b
+on a.key = b.key;