Posted to commits@hive.apache.org by om...@apache.org on 2015/11/17 21:19:42 UTC
[31/43] hive git commit: HIVE-11525: Tez Bucket pruning (Gopal V, reviewed by Sergey Shelukhin)
HIVE-11525: Tez Bucket pruning (Gopal V, reviewed by Sergey Shelukhin)
Signed-off-by: Gopal V <go...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/619ff6e9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/619ff6e9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/619ff6e9
Branch: refs/heads/master-fixed
Commit: 619ff6e99ab1814a31da52b743c4a7fc5d9b938a
Parents: 04d92dd
Author: Gopal V <go...@apache.org>
Authored: Thu Nov 12 18:35:50 2015 -0800
Committer: Owen O'Malley <om...@apache.org>
Committed: Tue Nov 17 12:18:33 2015 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 5 +
.../apache/hadoop/hive/ql/exec/Utilities.java | 28 +
.../hive/ql/exec/tez/HiveSplitGenerator.java | 28 +-
.../optimizer/FixedBucketPruningOptimizer.java | 319 +++
.../hadoop/hive/ql/optimizer/Optimizer.java | 7 +
.../hadoop/hive/ql/parse/GenTezUtils.java | 4 +
.../org/apache/hadoop/hive/ql/plan/MapWork.java | 14 +
.../hadoop/hive/ql/plan/TableScanDesc.java | 39 +
.../queries/clientpositive/bucketpruning1.q | 97 +
.../results/clientpositive/bucketpruning1.q.out | 2282 +++++++++++++++++
.../clientpositive/tez/bucketpruning1.q.out | 2360 ++++++++++++++++++
11 files changed, 5181 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/619ff6e9/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 7a8517b..01cd731 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2277,6 +2277,11 @@ public class HiveConf extends Configuration {
TEZ_MIN_PARTITION_FACTOR("hive.tez.min.partition.factor", 0.25f,
"When auto reducer parallelism is enabled this factor will be used to put a lower limit to the number\n" +
"of reducers that tez specifies."),
+ TEZ_OPTIMIZE_BUCKET_PRUNING(
+ "hive.tez.bucket.pruning", false,
+ "When pruning is enabled, filters on bucket columns will be processed by \n" +
+ "filtering the splits against a bitset of included buckets. This needs predicates \n"+
+ "produced by hive.optimize.ppd and hive.optimize.index.filters."),
TEZ_DYNAMIC_PARTITION_PRUNING(
"hive.tez.dynamic.partition.pruning", true,
"When dynamic pruning is enabled, joins on partition keys will be processed by sending\n" +
http://git-wip-us.apache.org/repos/asf/hive/blob/619ff6e9/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index fc04f18..de2eb98 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -184,7 +184,9 @@ import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
@@ -1716,6 +1718,11 @@ public final class Utilities {
Pattern.compile("^(.*?\\(.*\\))?([0-9]+)$");
/**
+ * This breaks a prefixed bucket number out into a single integer
+ */
+ private static final Pattern PREFIXED_BUCKET_ID_REGEX =
+ Pattern.compile("^(0*([0-9]+))_([0-9]+).*");
+ /**
* Get the task id from the filename. It is assumed that the filename is derived from the output
* of getTaskId
*
@@ -2138,6 +2145,27 @@ public final class Utilities {
}
}
+ /* compute bucket id from a Split */
+ public static int parseSplitBucket(InputSplit split) {
+ if (split instanceof FileSplit) {
+ return getBucketIdFromFile(((FileSplit) split).getPath().getName());
+ }
+ // cannot get this for combined splits
+ return -1;
+ }
+
+ public static int getBucketIdFromFile(String bucketName) {
+ Matcher m = PREFIXED_BUCKET_ID_REGEX.matcher(bucketName);
+ if (m.matches()) {
+ if (m.group(2).isEmpty()) {
+ // all zeros
+ return m.group(1).isEmpty() ? -1 : 0;
+ }
+ return Integer.parseInt(m.group(2));
+ }
+ return -1;
+ }
+
public static String getNameMessage(Exception e) {
return e.getClass().getName() + "(" + e.getMessage() + ")";
}
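[Annotation, not part of the patch] The new regex maps Hive's standard bucket file names (zero-padded bucket id, underscore, attempt id, e.g. "000002_0") to an integer, returning -1 when the name cannot be parsed. A hedged usage sketch of the methods added above:

  Utilities.getBucketIdFromFile("000002_0");        // -> 2
  Utilities.getBucketIdFromFile("000002_0_copy_1"); // -> 2 (trailing suffix absorbed by the ".*")
  Utilities.getBucketIdFromFile("000000_0");        // -> 0
  Utilities.getBucketIdFromFile("delta_0000012");   // -> -1 (no match: bucket UNKNOWN)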
http://git-wip-us.apache.org/repos/asf/hive/blob/619ff6e9/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
index 2ab3328..c370381 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
@@ -19,11 +19,14 @@
package org.apache.hadoop.hive.ql.exec.tez;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.BitSet;
import java.util.Comparator;
import java.util.List;
import com.google.common.base.Preconditions;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -163,6 +166,10 @@ public class HiveSplitGenerator extends InputInitializer {
LOG.info("Number of input splits: " + splits.length + ". " + availableSlots
+ " available slots, " + waves + " waves. Input format is: " + realInputFormatName);
+ if (work.getIncludedBuckets() != null) {
+ splits = pruneBuckets(work, splits);
+ }
+
Multimap<Integer, InputSplit> groupedSplits =
splitGrouper.generateGroupedSplits(jobConf, conf, splits, waves, availableSlots);
// And finally return them in a flat array
@@ -190,8 +197,25 @@ public class HiveSplitGenerator extends InputInitializer {
}
}
-
-
+ private InputSplit[] pruneBuckets(MapWork work, InputSplit[] splits) {
+ final BitSet buckets = work.getIncludedBuckets();
+ final String bucketIn = buckets.toString();
+ List<InputSplit> filteredSplits = new ArrayList<InputSplit>(splits.length / 2);
+ for (InputSplit split : splits) {
+ final int bucket = Utilities.parseSplitBucket(split);
+ if (bucket < 0 || buckets.get(bucket)) {
+ // match or UNKNOWN
+ filteredSplits.add(split);
+ } else {
+ LOG.info("Pruning with IN ({}) - removing {}", bucketIn, split);
+ }
+ }
+ if (filteredSplits.size() < splits.length) {
+ // reallocate only if pruning removed any splits
+ splits = filteredSplits.toArray(new InputSplit[filteredSplits.size()]);
+ }
+ return splits;
+ }
private List<Event> createEventList(boolean sendSerializedEvents, InputSplitInfoMem inputSplitInfo) {
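[Annotation, not part of the patch] pruneBuckets keeps a split when its bucket bit is set, or when the bucket is unknown (bucket < 0, e.g. combined splits), so splits are never pruned on missing information. A small sketch of the BitSet semantics involved:

  BitSet buckets = new BitSet(16);
  buckets.set(2);
  buckets.set(5);
  buckets.toString(); // "{2, 5}" -- the IN (...) text in the LOG.info line above
  buckets.get(2);     // true  -> the split for bucket 2 survives
  buckets.get(3);     // false -> the split for bucket 3 is pruned
  // Utilities.parseSplitBucket returns -1 for non-FileSplits; -1 < 0 keeps the split.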
http://git-wip-us.apache.org/repos/asf/hive/blob/619ff6e9/ql/src/java/org/apache/hadoop/hive/ql/optimizer/FixedBucketPruningOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/FixedBucketPruningOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/FixedBucketPruningOptimizer.java
new file mode 100644
index 0000000..c036db0
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/FixedBucketPruningOptimizer.java
@@ -0,0 +1,319 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
+import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree;
+import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree.Operator;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.PrunerOperatorFactory.FilterPruner;
+import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Fixed bucket pruning optimizer goes through all the table scans and annotates them
+ * with a bucketing inclusion bit-set.
+ */
+public class FixedBucketPruningOptimizer implements Transform {
+
+ private static final Log LOG = LogFactory
+ .getLog(FixedBucketPruningOptimizer.class.getName());
+
+ public class NoopWalker implements NodeProcessor {
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ // do nothing
+ return null;
+ }
+ }
+
+ public class FixedBucketPartitionWalker extends FilterPruner {
+
+ @Override
+ protected void generatePredicate(NodeProcessorCtx procCtx,
+ FilterOperator fop, TableScanOperator top) throws SemanticException,
+ UDFArgumentException {
+ FixedBucketPruningOptimizerCtxt ctxt = ((FixedBucketPruningOptimizerCtxt) procCtx);
+ Table tbl = top.getConf().getTableMetadata();
+ if (tbl.getNumBuckets() > 0) {
+ final int nbuckets = tbl.getNumBuckets();
+ ctxt.setNumBuckets(nbuckets);
+ ctxt.setBucketCols(tbl.getBucketCols());
+ ctxt.setSchema(tbl.getFields());
+ if (tbl.isPartitioned()) {
+ // Run partition pruner to get partitions
+ ParseContext parseCtx = ctxt.pctx;
+ PrunedPartitionList prunedPartList;
+ try {
+ String alias = (String) parseCtx.getTopOps().keySet().toArray()[0];
+ prunedPartList = PartitionPruner.prune(top, parseCtx, alias);
+ } catch (HiveException e) {
+ throw new SemanticException(e.getMessage(), e);
+ }
+ if (prunedPartList != null) {
+ ctxt.setPartitions(prunedPartList);
+ for (Partition p : prunedPartList.getPartitions()) {
+ if (nbuckets != p.getBucketCount()) {
+ // disable feature
+ ctxt.setNumBuckets(-1);
+ break;
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ public static class BucketBitsetGenerator extends FilterPruner {
+
+ @Override
+ protected void generatePredicate(NodeProcessorCtx procCtx,
+ FilterOperator fop, TableScanOperator top) throws SemanticException,
+ UDFArgumentException {
+ FixedBucketPruningOptimizerCtxt ctxt = ((FixedBucketPruningOptimizerCtxt) procCtx);
+ if (ctxt.getNumBuckets() <= 0 || ctxt.getBucketCols().size() != 1) {
+ // bucketing isn't consistent or there are >1 bucket columns
+ // optimizer does not extract multiple column predicates for this
+ return;
+ }
+ ExprNodeGenericFuncDesc filter = top.getConf().getFilterExpr();
+ if (filter == null) {
+ return;
+ }
+ // the sargs are closely tied to hive.optimize.index.filter
+ SearchArgument sarg = ConvertAstToSearchArg.create(filter);
+ if (sarg == null) {
+ return;
+ }
+ final String bucketCol = ctxt.getBucketCols().get(0);
+ StructField bucketField = null;
+ for (StructField fs : ctxt.getSchema()) {
+ if(fs.getFieldName().equals(bucketCol)) {
+ bucketField = fs;
+ }
+ }
+ Preconditions.checkArgument(bucketField != null);
+ List<Object> literals = new ArrayList<Object>();
+ List<PredicateLeaf> leaves = sarg.getLeaves();
+ Set<PredicateLeaf> bucketLeaves = new HashSet<PredicateLeaf>();
+ for (PredicateLeaf l : leaves) {
+ if (bucketCol.equals(l.getColumnName())) {
+ switch (l.getOperator()) {
+ case EQUALS:
+ case IN:
+ // supported
+ break;
+ case IS_NULL:
+ // TODO: (a = 1) and NOT (a is NULL) can be potentially folded earlier into a NO-OP
+ // fall through
+ case BETWEEN:
+ // TODO: for ordinal types you can produce a range (BETWEEN 1444442100 1444442107)
+ // fall through
+ default:
+ // cannot optimize any others
+ return;
+ }
+ bucketLeaves.add(l);
+ }
+ }
+ if (bucketLeaves.size() == 0) {
+ return;
+ }
+ // TODO: Add support for AND clauses under OR clauses
+ // first-cut takes a known minimal tree and no others.
+ // $expr = (a=1)
+ // (a=1 or a=2)
+ // (a in (1,2))
+ // ($expr and *)
+ // (* and $expr)
+ ExpressionTree expr = sarg.getExpression();
+ if (expr.getOperator() == Operator.LEAF) {
+ PredicateLeaf l = leaves.get(expr.getLeaf());
+ if (!addLiteral(literals, l)) {
+ return;
+ }
+ } else if (expr.getOperator() == Operator.AND) {
+ boolean found = false;
+ for (ExpressionTree subExpr : expr.getChildren()) {
+ if (subExpr.getOperator() != Operator.LEAF) {
+ return;
+ }
+ // one of the branches is definitely a bucket-leaf
+ PredicateLeaf l = leaves.get(subExpr.getLeaf());
+ if (bucketLeaves.contains(l)) {
+ if (!addLiteral(literals, l)) {
+ return;
+ }
+ found = true;
+ }
+ }
+ if (!found) {
+ return;
+ }
+ } else if (expr.getOperator() == Operator.OR) {
+ for (ExpressionTree subExpr : expr.getChildren()) {
+ if (subExpr.getOperator() != Operator.LEAF) {
+ return;
+ }
+ PredicateLeaf l = leaves.get(subExpr.getLeaf());
+ if (bucketLeaves.contains(l)) {
+ if (!addLiteral(literals, l)) {
+ return;
+ }
+ } else {
+ // all of the OR branches need to be bucket-leaves
+ return;
+ }
+ }
+ }
+ // invariant: bucket-col IN literals of type bucketField
+ BitSet bs = new BitSet(ctxt.getNumBuckets());
+ bs.clear();
+ PrimitiveObjectInspector bucketOI = (PrimitiveObjectInspector)bucketField.getFieldObjectInspector();
+ PrimitiveObjectInspector constOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(bucketOI.getPrimitiveCategory());
+ for (Object literal: literals) {
+ PrimitiveObjectInspector origOI = PrimitiveObjectInspectorFactory.getPrimitiveObjectInspectorFromClass(literal.getClass());
+ Converter conv = ObjectInspectorConverters.getConverter(origOI, constOI);
+ // exact type conversion or get out
+ if (conv == null) {
+ return;
+ }
+ Object convCols[] = new Object[] {conv.convert(literal)};
+ int n = ObjectInspectorUtils.getBucketNumber(convCols, new ObjectInspector[]{constOI}, ctxt.getNumBuckets());
+ bs.set(n);
+ }
+ if (bs.cardinality() < ctxt.getNumBuckets()) {
+ // there is a valid bucket pruning filter
+ top.getConf().setIncludedBuckets(bs);
+ top.getConf().setNumBuckets(ctxt.getNumBuckets());
+ }
+ }
+
+ private boolean addLiteral(List<Object> literals, PredicateLeaf leaf) {
+ switch (leaf.getOperator()) {
+ case EQUALS:
+ return literals.add(leaf.getLiteral());
+ case IN:
+ return literals.addAll(leaf.getLiteralList());
+ default:
+ return false;
+ }
+ }
+ }
+
+ public final class FixedBucketPruningOptimizerCtxt implements
+ NodeProcessorCtx {
+ public final ParseContext pctx;
+ private int numBuckets;
+ private PrunedPartitionList partitions;
+ private List<String> bucketCols;
+ private List<StructField> schema;
+
+ public FixedBucketPruningOptimizerCtxt(ParseContext pctx) {
+ this.pctx = pctx;
+ }
+
+ public void setSchema(ArrayList<StructField> fields) {
+ this.schema = fields;
+ }
+
+ public List<StructField> getSchema() {
+ return this.schema;
+ }
+
+ public void setBucketCols(List<String> bucketCols) {
+ this.bucketCols = bucketCols;
+ }
+
+ public List<String> getBucketCols() {
+ return this.bucketCols;
+ }
+
+ public void setPartitions(PrunedPartitionList partitions) {
+ this.partitions = partitions;
+ }
+
+ public PrunedPartitionList getPartitions() {
+ return this.partitions;
+ }
+
+ public int getNumBuckets() {
+ return numBuckets;
+ }
+
+ public void setNumBuckets(int numBuckets) {
+ this.numBuckets = numBuckets;
+ }
+ }
+
+ @Override
+ public ParseContext transform(ParseContext pctx) throws SemanticException {
+ // create the context for walking operators
+ FixedBucketPruningOptimizerCtxt opPartWalkerCtx = new FixedBucketPruningOptimizerCtxt(
+ pctx);
+
+ // Retrieve all partitions generated from partition pruner and partition
+ // column pruner
+ PrunerUtils.walkOperatorTree(pctx, opPartWalkerCtx,
+ new FixedBucketPartitionWalker(), new NoopWalker());
+
+ if (opPartWalkerCtx.getNumBuckets() < 0) {
+ // bail out
+ return pctx;
+ } else {
+ // walk operator tree to create expression tree for filter buckets
+ PrunerUtils.walkOperatorTree(pctx, opPartWalkerCtx,
+ new BucketBitsetGenerator(), new NoopWalker());
+ }
+
+ return pctx;
+ }
+}
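[Annotation, not part of the patch] The bitset computation leans on Hive's standard bucketing function; assuming the usual (hash & Integer.MAX_VALUE) % numBuckets mapping that ObjectInspectorUtils.getBucketNumber implements, and that an int key hashes to its own value, the arithmetic for the 16-bucket test table looks like:

  int numBuckets = 16;
  int b1  = (1  & Integer.MAX_VALUE) % numBuckets; // 1
  int b16 = (16 & Integer.MAX_VALUE) % numBuckets; // 0
  int b17 = (17 & Integer.MAX_VALUE) % numBuckets; // 1 -- collides with key = 1
  // If the literals cover every bucket, bs.cardinality() == numBuckets and
  // setIncludedBuckets is skipped: pruning would be a no-op, as in the
  // "all buckets selected" queries in the .q file below.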
http://git-wip-us.apache.org/repos/asf/hive/blob/619ff6e9/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
index 6347872..4a7fc0d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
@@ -166,6 +166,13 @@ public class Optimizer {
transformations.add(new JoinReorder());
}
+ if (HiveConf.getBoolVar(hiveConf,
+ HiveConf.ConfVars.TEZ_OPTIMIZE_BUCKET_PRUNING)
+ && HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTPPD)
+ && HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTINDEXFILTER)) {
+ transformations.add(new FixedBucketPruningOptimizer());
+ }
+
if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.DYNAMICPARTITIONING) &&
HiveConf.getVar(hiveConf, HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equals("nonstrict") &&
HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTDYNAMICPARTITION) &&
http://git-wip-us.apache.org/repos/asf/hive/blob/619ff6e9/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index c5f7426..27d7276 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -184,6 +184,10 @@ public class GenTezUtils {
mapWork.setDummyTableScan(true);
}
+ if (ts.getConf().getNumBuckets() > 0) {
+ mapWork.setIncludedBuckets(ts.getConf().getIncludedBuckets());
+ }
+
// add new item to the tez work
tezWork.add(mapWork);
http://git-wip-us.apache.org/repos/asf/hive/blob/619ff6e9/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index d349934..f4e5873 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.plan;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.BitSet;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
@@ -131,6 +132,10 @@ public class MapWork extends BaseWork {
private boolean doSplitsGrouping = true;
+ // bitsets can't be correctly serialized by Kryo's default serializer
+ // BitSet::wordsInUse is transient, so force dumping into a lower form
+ private byte[] includedBuckets;
+
/** Whether LLAP IO will be used for inputs. */
private String llapIoDesc;
@@ -617,4 +622,13 @@ public class MapWork extends BaseWork {
public void setMapAliases(List<String> mapAliases) {
this.mapAliases = mapAliases;
}
+
+ public BitSet getIncludedBuckets() {
+ return includedBuckets != null ? BitSet.valueOf(includedBuckets) : null;
+ }
+
+ public void setIncludedBuckets(BitSet includedBuckets) {
+ // see comment next to the field
+ this.includedBuckets = includedBuckets.toByteArray();
+ }
}
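[Annotation, not part of the patch] The byte[] indirection works because BitSet round-trips losslessly through toByteArray()/valueOf(); a minimal sketch:

  BitSet in = new BitSet(16);
  in.set(3);
  in.set(7);
  byte[] wire = in.toByteArray();    // the form that actually gets serialized
  BitSet out = BitSet.valueOf(wire); // reconstructed on the task side
  assert out.equals(in);             // BitSet equality compares set bits only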
http://git-wip-us.apache.org/repos/asf/hive/blob/619ff6e9/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
index be7139c..b354f98 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.plan;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.BitSet;
import java.util.List;
import java.util.Map;
@@ -107,6 +108,10 @@ public class TableScanDesc extends AbstractOperatorDesc {
private transient Table tableMetadata;
+ private BitSet includedBuckets;
+
+ private int numBuckets = -1;
+
public TableScanDesc() {
this(null, null);
}
@@ -319,4 +324,38 @@ public class TableScanDesc extends AbstractOperatorDesc {
public void setSerializedFilterObject(String serializedFilterObject) {
this.serializedFilterObject = serializedFilterObject;
}
+
+ public void setIncludedBuckets(BitSet bitset) {
+ this.includedBuckets = bitset;
+ }
+
+ public BitSet getIncludedBuckets() {
+ return this.includedBuckets;
+ }
+
+ @Explain(displayName = "buckets included", explainLevels = { Level.EXTENDED })
+ public String getIncludedBucketExplain() {
+ if (this.includedBuckets == null) {
+ return null;
+ }
+
+ StringBuilder sb = new StringBuilder();
+ sb.append("[");
+ for (int i = 0; i < this.includedBuckets.size(); i++) {
+ if (this.includedBuckets.get(i)) {
+ sb.append(i);
+ sb.append(',');
+ }
+ }
+ sb.append(String.format("] of %d", numBuckets));
+ return sb.toString();
+ }
+
+ public int getNumBuckets() {
+ return numBuckets;
+ }
+
+ public void setNumBuckets(int numBuckets) {
+ this.numBuckets = numBuckets;
+ }
}
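[Annotation, not part of the patch] getIncludedBucketExplain surfaces the bitset at EXTENDED explain level; tracing the loop above, illustrative values render as:

  // for includedBuckets = {1, 3} and numBuckets = 16:
  String rendered = "[1,3,] of 16"; // the loop leaves a comma before the closing bracket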
http://git-wip-us.apache.org/repos/asf/hive/blob/619ff6e9/ql/src/test/queries/clientpositive/bucketpruning1.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/bucketpruning1.q b/ql/src/test/queries/clientpositive/bucketpruning1.q
new file mode 100644
index 0000000..be403a5
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/bucketpruning1.q
@@ -0,0 +1,97 @@
+set hive.optimize.ppd=true;
+set hive.optimize.index.filter=true;
+set hive.tez.bucket.pruning=true;
+set hive.explain.user=false;
+set hive.fetch.task.conversion=none;
+
+CREATE TABLE srcbucket_pruned(key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 16 BUCKETS STORED AS TEXTFILE;
+
+-- cannot prune 2-key scenarios without a smarter optimizer
+CREATE TABLE srcbucket_unpruned(key int, value string) partitioned by (ds string) CLUSTERED BY (key,value) INTO 16 BUCKETS STORED AS TEXTFILE;
+
+-- good cases
+
+explain extended
+select * from srcbucket_pruned where key = 1;
+
+explain extended
+select * from srcbucket_pruned where key = 16;
+
+explain extended
+select * from srcbucket_pruned where key = 17;
+
+explain extended
+select * from srcbucket_pruned where key = 16+1;
+
+explain extended
+select * from srcbucket_pruned where key = '11';
+
+explain extended
+select * from srcbucket_pruned where key = 1 and ds='2008-04-08';
+
+explain extended
+select * from srcbucket_pruned where key = 1 and ds='2008-04-08' and value='One';
+
+explain extended
+select * from srcbucket_pruned where value='One' and key = 1 and ds='2008-04-08';
+
+explain extended
+select * from srcbucket_pruned where key in (2,3);
+
+explain extended
+select * from srcbucket_pruned where key in (2,3) and ds='2008-04-08';
+
+explain extended
+select * from srcbucket_pruned where key in (2,3) and ds='2008-04-08' and value='One';
+
+explain extended
+select * from srcbucket_pruned where value='One' and key in (2,3) and ds='2008-04-08';
+
+explain extended
+select * from srcbucket_pruned where (key=1 or key=2) and ds='2008-04-08';
+
+explain extended
+select * from srcbucket_pruned where (key=1 or key=2) and value = 'One' and ds='2008-04-08';
+
+-- valid but irrelevant case (all buckets selected)
+
+explain extended
+select * from srcbucket_pruned where key in (1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17);
+
+explain extended
+select * from srcbucket_pruned where key in (1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17) and ds='2008-04-08';
+
+explain extended
+select * from srcbucket_pruned where key in (1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17) and ds='2008-04-08' and value='One';
+
+explain extended
+select * from srcbucket_pruned where value='One' and key in (1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17) and ds='2008-04-08';
+
+-- valid, but unimplemented cases
+
+explain extended
+select * from srcbucket_pruned where key = 1 and ds='2008-04-08' or key = 2;
+
+explain extended
+select * from srcbucket_pruned where key = 1 and ds='2008-04-08' and (value='One' or value = 'Two');
+
+explain extended
+select * from srcbucket_pruned where key = 1 or value = "One" or key = 2 and value = "Two";
+
+-- Invalid cases
+
+explain extended
+select * from srcbucket_pruned where key = 'x11';
+
+explain extended
+select * from srcbucket_pruned where key = 1 or value = "One";
+
+explain extended
+select * from srcbucket_pruned where key = 1 or value = "One" or key = 2;
+
+explain extended
+select * from srcbucket_unpruned where key in (3, 5);
+
+explain extended
+select * from srcbucket_unpruned where key = 1;
+