You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by om...@apache.org on 2015/11/17 21:19:42 UTC

[31/43] hive git commit: HIVE-11525: Tez Bucket pruning (Gopal V, reviewed by Sergey Shelukhin)

HIVE-11525: Tez Bucket pruning (Gopal V, reviewed by Sergey Shelukhin)

Signed-off-by: Gopal V <go...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/619ff6e9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/619ff6e9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/619ff6e9

Branch: refs/heads/master-fixed
Commit: 619ff6e99ab1814a31da52b743c4a7fc5d9b938a
Parents: 04d92dd
Author: Gopal V <go...@apache.org>
Authored: Thu Nov 12 18:35:50 2015 -0800
Committer: Owen O'Malley <om...@apache.org>
Committed: Tue Nov 17 12:18:33 2015 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |    5 +
 .../apache/hadoop/hive/ql/exec/Utilities.java   |   28 +
 .../hive/ql/exec/tez/HiveSplitGenerator.java    |   28 +-
 .../optimizer/FixedBucketPruningOptimizer.java  |  319 +++
 .../hadoop/hive/ql/optimizer/Optimizer.java     |    7 +
 .../hadoop/hive/ql/parse/GenTezUtils.java       |    4 +
 .../org/apache/hadoop/hive/ql/plan/MapWork.java |   14 +
 .../hadoop/hive/ql/plan/TableScanDesc.java      |   39 +
 .../queries/clientpositive/bucketpruning1.q     |   97 +
 .../results/clientpositive/bucketpruning1.q.out | 2282 +++++++++++++++++
 .../clientpositive/tez/bucketpruning1.q.out     | 2360 ++++++++++++++++++
 11 files changed, 5181 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/619ff6e9/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 7a8517b..01cd731 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2277,6 +2277,11 @@ public class HiveConf extends Configuration {
     TEZ_MIN_PARTITION_FACTOR("hive.tez.min.partition.factor", 0.25f,
         "When auto reducer parallelism is enabled this factor will be used to put a lower limit to the number\n" +
         "of reducers that tez specifies."),
+    TEZ_OPTIMIZE_BUCKET_PRUNING(
+        "hive.tez.bucket.pruning", false,
+         "When pruning is enabled, filters on bucket columns will be processed by \n" +
+         "filtering the splits against a bitset of included buckets. This needs predicates \n"+
+         "produced by hive.optimize.ppd and hive.optimize.index.filter."),
     TEZ_DYNAMIC_PARTITION_PRUNING(
         "hive.tez.dynamic.partition.pruning", true,
         "When dynamic pruning is enabled, joins on partition keys will be processed by sending\n" +

http://git-wip-us.apache.org/repos/asf/hive/blob/619ff6e9/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index fc04f18..de2eb98 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -184,7 +184,9 @@ import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
@@ -1716,6 +1718,11 @@ public final class Utilities {
       Pattern.compile("^(.*?\\(.*\\))?([0-9]+)$");
 
   /**
+   * This breaks a prefixed bucket number out into a single integer
+   */
+  private static final Pattern PREFIXED_BUCKET_ID_REGEX =
+      Pattern.compile("^(0*([0-9]+))_([0-9]+).*");
+  /**
    * Get the task id from the filename. It is assumed that the filename is derived from the output
    * of getTaskId
    *
@@ -2138,6 +2145,27 @@ public final class Utilities {
     }
   }
 
+  /**
+   * Computes the bucket id of a split from the name of the file backing it.
+   *
+   * @param split the input split to inspect
+   * @return the bucket id parsed from the file name, or -1 when the split is not a
+   *         FileSplit (e.g. combined splits) or the name carries no bucket id
+   */
+  public static int parseSplitBucket(InputSplit split) {
+    if (!(split instanceof FileSplit)) {
+      // cannot get this for combined splits
+      return -1;
+    }
+    final String fileName = ((FileSplit) split).getPath().getName();
+    return getBucketIdFromFile(fileName);
+  }
+
+  /**
+   * Extracts the bucket number from a bucket file name.
+   *
+   * The name is matched against PREFIXED_BUCKET_ID_REGEX: a run of digits (the bucket
+   * number, possibly zero-padded), an underscore, more digits, then anything.
+   * For example "000003_0" yields 3 and "000000_0" yields 0.
+   *
+   * @param bucketName file name (without directory) to parse
+   * @return the bucket number, or -1 if the name does not match the pattern or the
+   *         digit run does not fit in an int
+   */
+  public static int getBucketIdFromFile(String bucketName) {
+    Matcher m = PREFIXED_BUCKET_ID_REGEX.matcher(bucketName);
+    if (!m.matches()) {
+      return -1;
+    }
+    try {
+      // group(2) is [0-9]+ and can never be empty: the greedy 0* of group(1)
+      // backtracks to leave it at least one digit, so an all-zero prefix parses as 0.
+      return Integer.parseInt(m.group(2));
+    } catch (NumberFormatException e) {
+      // digit run too large for an int: report "unknown" rather than failing
+      return -1;
+    }
+  }
+
   public static String getNameMessage(Exception e) {
     return e.getClass().getName() + "(" + e.getMessage() + ")";
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/619ff6e9/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
index 2ab3328..c370381 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
@@ -19,11 +19,14 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.BitSet;
 import java.util.Comparator;
 import java.util.List;
 
 import com.google.common.base.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -163,6 +166,10 @@ public class HiveSplitGenerator extends InputInitializer {
         LOG.info("Number of input splits: " + splits.length + ". " + availableSlots
             + " available slots, " + waves + " waves. Input format is: " + realInputFormatName);
 
+        if (work.getIncludedBuckets() != null) {
+          splits = pruneBuckets(work, splits);
+        }
+
         Multimap<Integer, InputSplit> groupedSplits =
             splitGrouper.generateGroupedSplits(jobConf, conf, splits, waves, availableSlots);
         // And finally return them in a flat array
@@ -190,8 +197,25 @@ public class HiveSplitGenerator extends InputInitializer {
     }
   }
 
-
-
+  /**
+   * Filters out the splits whose bucket id is not set in the work's included-bucket
+   * bit-set. Splits whose bucket id cannot be determined (negative id) are kept.
+   *
+   * @param work   map work carrying the included-bucket bit-set (must be non-null here)
+   * @param splits all generated splits
+   * @return the original array when nothing was pruned, otherwise a new, smaller array
+   */
+  private InputSplit[] pruneBuckets(MapWork work, InputSplit[] splits) {
+    final BitSet included = work.getIncludedBuckets();
+    final String includedText = included.toString();
+    final List<InputSplit> kept = new ArrayList<InputSplit>(splits.length / 2);
+    for (InputSplit candidate : splits) {
+      final int bucketId = Utilities.parseSplitBucket(candidate);
+      final boolean pruned = bucketId >= 0 && !included.get(bucketId);
+      if (pruned) {
+        LOG.info("Pruning with IN ({}) - removing {}", includedText, candidate);
+        continue;
+      }
+      // match or UNKNOWN
+      kept.add(candidate);
+    }
+    // reallocate only if any filters pruned
+    return kept.size() < splits.length
+        ? kept.toArray(new InputSplit[kept.size()])
+        : splits;
+  }
 
   private List<Event> createEventList(boolean sendSerializedEvents, InputSplitInfoMem inputSplitInfo) {
 

http://git-wip-us.apache.org/repos/asf/hive/blob/619ff6e9/ql/src/java/org/apache/hadoop/hive/ql/optimizer/FixedBucketPruningOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/FixedBucketPruningOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/FixedBucketPruningOptimizer.java
new file mode 100644
index 0000000..c036db0
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/FixedBucketPruningOptimizer.java
@@ -0,0 +1,319 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
+import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree;
+import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree.Operator;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.PrunerOperatorFactory.FilterPruner;
+import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Fixed bucket pruning optimizer goes through all the table scans and annotates them
+ * with a bucketing inclusion bit-set.
+ */
+public class FixedBucketPruningOptimizer implements Transform {
+
+  private static final Log LOG = LogFactory
+      .getLog(FixedBucketPruningOptimizer.class.getName());
+
+  /**
+   * Node processor that ignores every node it is given; it is passed as the second
+   * processor to the operator-tree walks in this optimizer.
+   */
+  public class NoopWalker implements NodeProcessor {
+    /** Does nothing and always returns null. */
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      return null;
+    }
+  }
+
+  /**
+   * First pass: records the bucketing layout (bucket count, bucket columns, schema) of
+   * the scanned table in the walker context. For partitioned tables it also runs the
+   * partition pruner and marks the context with a bucket count of -1 when any pruned
+   * partition has a bucket count different from the table's.
+   */
+  public class FixedBucketPartitionWalker extends FilterPruner {
+
+    @Override
+    protected void generatePredicate(NodeProcessorCtx procCtx,
+        FilterOperator fop, TableScanOperator top) throws SemanticException,
+        UDFArgumentException {
+      final FixedBucketPruningOptimizerCtxt walkCtx = (FixedBucketPruningOptimizerCtxt) procCtx;
+      final Table table = top.getConf().getTableMetadata();
+      if (table.getNumBuckets() <= 0) {
+        // table is not bucketed: nothing to record
+        return;
+      }
+      final int tableBuckets = table.getNumBuckets();
+      walkCtx.setNumBuckets(tableBuckets);
+      walkCtx.setBucketCols(table.getBucketCols());
+      walkCtx.setSchema(table.getFields());
+      if (!table.isPartitioned()) {
+        return;
+      }
+
+      // Run partition pruner to get partitions
+      final ParseContext parseCtx = walkCtx.pctx;
+      final PrunedPartitionList pruned;
+      try {
+        String alias = (String) parseCtx.getTopOps().keySet().toArray()[0];
+        pruned = PartitionPruner.prune(top, parseCtx, alias);
+      } catch (HiveException e) {
+        throw new SemanticException(e.getMessage(), e);
+      }
+      if (pruned == null) {
+        return;
+      }
+
+      walkCtx.setPartitions(pruned);
+      for (Partition part : pruned.getPartitions()) {
+        if (part.getBucketCount() != tableBuckets) {
+          // inconsistent bucketing across partitions: disable feature
+          walkCtx.setNumBuckets(-1);
+          break;
+        }
+      }
+    }
+  }
+
+  /**
+   * Second pass: converts the table scan's filter expression into a bit-set of the
+   * buckets that can contain matching rows and attaches it to the table scan.
+   *
+   * Only a small set of expression shapes is understood (see the comment inside
+   * generatePredicate). For every other shape the method returns without annotating
+   * the table scan, which leaves all buckets included.
+   */
+  public static class BucketBitsetGenerator extends FilterPruner {
+
+    /**
+     * Derives the included-bucket bit-set for {@code top} from its filter expression.
+     *
+     * @param procCtx walker context; must be a FixedBucketPruningOptimizerCtxt
+     * @param fop     the filter operator being visited (not used here)
+     * @param top     the table scan to annotate
+     */
+    @Override
+    protected void generatePredicate(NodeProcessorCtx procCtx,
+        FilterOperator fop, TableScanOperator top) throws SemanticException,
+        UDFArgumentException {
+      FixedBucketPruningOptimizerCtxt ctxt = ((FixedBucketPruningOptimizerCtxt) procCtx);
+      if (ctxt.getNumBuckets() <= 0 || ctxt.getBucketCols().size() != 1) {
+        // bucketing isn't consistent or there are >1 bucket columns
+        // optimizer does not extract multiple column predicates for this
+        return;
+      }
+      ExprNodeGenericFuncDesc filter = top.getConf().getFilterExpr();
+      if (filter == null) {
+        return;
+      }
+      // the sargs are closely tied to hive.optimize.index.filter
+      SearchArgument sarg = ConvertAstToSearchArg.create(filter);
+      if (sarg == null) {
+        return;
+      }
+      final String bucketCol = ctxt.getBucketCols().get(0);
+      StructField bucketField = null;
+      for (StructField fs : ctxt.getSchema()) {
+        if(fs.getFieldName().equals(bucketCol)) {
+          bucketField = fs;
+        }
+      }
+      Preconditions.checkArgument(bucketField != null);
+      List<Object> literals = new ArrayList<Object>();
+      List<PredicateLeaf> leaves = sarg.getLeaves();
+      Set<PredicateLeaf> bucketLeaves = new HashSet<PredicateLeaf>();
+      for (PredicateLeaf l : leaves) {
+        if (bucketCol.equals(l.getColumnName())) {
+          switch (l.getOperator()) {
+          case EQUALS:
+          case IN:
+            // supported
+            break;
+          case IS_NULL:
+            // TODO: (a = 1) and NOT (a is NULL) can be potentially folded earlier into a NO-OP
+            // fall through
+          case BETWEEN:
+            // TODO: for ordinal types you can produce a range (BETWEEN 1444442100 1444442107)
+            // fall through
+          default:
+            // cannot optimize any others
+            return;
+          }
+          bucketLeaves.add(l);
+        }
+      }
+      if (bucketLeaves.size() == 0) {
+        return;
+      }
+      // TODO: Add support for AND clauses under OR clauses
+      // first-cut takes a known minimal tree and no others.
+      // $expr = (a=1)
+      //         (a=1 or a=2)
+      //         (a in (1,2))
+      //         ($expr and *)
+      //         (* and $expr)
+      ExpressionTree expr = sarg.getExpression();
+      if (expr.getOperator() == Operator.LEAF) {
+        PredicateLeaf l = leaves.get(expr.getLeaf());
+        if (!addLiteral(literals, l)) {
+          return;
+        }
+      } else if (expr.getOperator() == Operator.AND) {
+        boolean found = false;
+        for (ExpressionTree subExpr : expr.getChildren()) {
+          if (subExpr.getOperator() != Operator.LEAF) {
+            return;
+          }
+          // one of the branches is definitely a bucket-leaf
+          PredicateLeaf l = leaves.get(subExpr.getLeaf());
+          if (bucketLeaves.contains(l)) {
+            if (!addLiteral(literals, l)) {
+              return;
+            }
+            found = true;
+          }
+        }
+        if (!found) {
+          return;
+        }
+      } else if (expr.getOperator() == Operator.OR) {
+        for (ExpressionTree subExpr : expr.getChildren()) {
+          if (subExpr.getOperator() != Operator.LEAF) {
+            return;
+          }
+          PredicateLeaf l = leaves.get(subExpr.getLeaf());
+          if (bucketLeaves.contains(l)) {
+            if (!addLiteral(literals, l)) {
+              return;
+            }
+          } else {
+            // all of the OR branches need to be bucket-leaves
+            return;
+          }
+        }
+      } else {
+        // Any other root operator (e.g. a negated bucket-column leaf) is not one of the
+        // known shapes. Falling through here would leave the literal list empty and
+        // install an empty bit-set, i.e. prune every bucket.
+        return;
+      }
+      if (literals.isEmpty()) {
+        // defensive: never build a filter that excludes all buckets from no evidence
+        return;
+      }
+      // invariant: bucket-col IN literals of type bucketField
+      BitSet bs = new BitSet(ctxt.getNumBuckets());
+      PrimitiveObjectInspector bucketOI = (PrimitiveObjectInspector)bucketField.getFieldObjectInspector();
+      PrimitiveObjectInspector constOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(bucketOI.getPrimitiveCategory());
+      for (Object literal: literals) {
+        if (literal == null) {
+          // a null literal has no class to derive an inspector from; skip pruning
+          return;
+        }
+        PrimitiveObjectInspector origOI = PrimitiveObjectInspectorFactory.getPrimitiveObjectInspectorFromClass(literal.getClass());
+        Converter conv = ObjectInspectorConverters.getConverter(origOI, constOI);
+        // exact type conversion or get out
+        if (conv == null) {
+          return;
+        }
+        Object convCols[] = new Object[] {conv.convert(literal)};
+        int n = ObjectInspectorUtils.getBucketNumber(convCols, new ObjectInspector[]{constOI}, ctxt.getNumBuckets());
+        bs.set(n);
+      }
+      if (bs.cardinality() < ctxt.getNumBuckets()) {
+        // there is a valid bucket pruning filter
+        top.getConf().setIncludedBuckets(bs);
+        top.getConf().setNumBuckets(ctxt.getNumBuckets());
+      }
+    }
+
+    /**
+     * Appends the literal(s) of an EQUALS or IN leaf to {@code literals}.
+     *
+     * @return true if the list was modified; false for any other operator or when an
+     *         IN leaf contributes no literals
+     */
+    private boolean addLiteral(List<Object> literals, PredicateLeaf leaf) {
+      switch (leaf.getOperator()) {
+      case EQUALS:
+        return literals.add(leaf.getLiteral());
+      case IN:
+        return literals.addAll(leaf.getLiteralList());
+      default:
+        return false;
+      }
+    }
+  }
+
+  /**
+   * Mutable state shared between the two operator-tree walks of this optimizer: the
+   * first walk fills in the bucketing layout, the second walk reads it.
+   */
+  public final class FixedBucketPruningOptimizerCtxt implements
+      NodeProcessorCtx {
+    /** Parse context the optimizer was invoked with. */
+    public final ParseContext pctx;
+    /** Bucket count recorded by the first walk; set to -1 to disable pruning. */
+    private int numBuckets;
+    /** Names of the bucketing columns of the scanned table. */
+    private List<String> bucketCols;
+    /** Fields of the scanned table. */
+    private List<StructField> schema;
+    /** Result of partition pruning, when the table is partitioned. */
+    private PrunedPartitionList partitions;
+
+    public FixedBucketPruningOptimizerCtxt(ParseContext pctx) {
+      this.pctx = pctx;
+    }
+
+    public int getNumBuckets() {
+      return this.numBuckets;
+    }
+
+    public void setNumBuckets(int numBuckets) {
+      this.numBuckets = numBuckets;
+    }
+
+    public List<String> getBucketCols() {
+      return this.bucketCols;
+    }
+
+    public void setBucketCols(List<String> bucketCols) {
+      this.bucketCols = bucketCols;
+    }
+
+    public List<StructField> getSchema() {
+      return this.schema;
+    }
+
+    public void setSchema(ArrayList<StructField> fields) {
+      this.schema = fields;
+    }
+
+    public PrunedPartitionList getPartitions() {
+      return this.partitions;
+    }
+
+    public void setPartitions(PrunedPartitionList partitions) {
+      this.partitions = partitions;
+    }
+  }
+
+  /**
+   * Walks the operator tree twice: once to collect the bucketing layout of the scanned
+   * tables, and, unless that walk flagged the layout as unusable (negative bucket
+   * count), a second time to annotate table scans with an included-bucket bit-set.
+   *
+   * @param pctx the parse context to optimize
+   * @return the same parse context instance
+   */
+  @Override
+  public ParseContext transform(ParseContext pctx) throws SemanticException {
+    // create the context shared by both operator walks
+    final FixedBucketPruningOptimizerCtxt walkCtx = new FixedBucketPruningOptimizerCtxt(pctx);
+
+    // Retrieve all partitions generated from partition pruner and partition
+    // column pruner
+    PrunerUtils.walkOperatorTree(pctx, walkCtx,
+        new FixedBucketPartitionWalker(), new NoopWalker());
+
+    final boolean layoutUsable = walkCtx.getNumBuckets() >= 0;
+    if (layoutUsable) {
+      // walk operator tree to create expression tree for filter buckets
+      PrunerUtils.walkOperatorTree(pctx, walkCtx,
+          new BucketBitsetGenerator(), new NoopWalker());
+    }
+    // otherwise bail out and leave the plan untouched
+    return pctx;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/619ff6e9/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
index 6347872..4a7fc0d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
@@ -166,6 +166,13 @@ public class Optimizer {
       transformations.add(new JoinReorder());
     }
 
+    if (HiveConf.getBoolVar(hiveConf,
+        HiveConf.ConfVars.TEZ_OPTIMIZE_BUCKET_PRUNING)
+        && HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTPPD)
+        && HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTINDEXFILTER)) {
+      transformations.add(new FixedBucketPruningOptimizer());
+    }
+
     if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.DYNAMICPARTITIONING) &&
         HiveConf.getVar(hiveConf, HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equals("nonstrict") &&
         HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTDYNAMICPARTITION) &&

http://git-wip-us.apache.org/repos/asf/hive/blob/619ff6e9/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index c5f7426..27d7276 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -184,6 +184,10 @@ public class GenTezUtils {
       mapWork.setDummyTableScan(true);
     }
 
+    if (ts.getConf().getNumBuckets() > 0) {
+      mapWork.setIncludedBuckets(ts.getConf().getIncludedBuckets());
+    }
+
     // add new item to the tez work
     tezWork.add(mapWork);
 

http://git-wip-us.apache.org/repos/asf/hive/blob/619ff6e9/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index d349934..f4e5873 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.plan;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.BitSet;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -131,6 +132,10 @@ public class MapWork extends BaseWork {
 
   private boolean doSplitsGrouping = true;
 
+  // bitsets can't be correctly serialized by Kryo's default serializer
+  // BitSet::wordsInUse is transient, so force dumping into a lower form
+  private byte[] includedBuckets;
+
   /** Whether LLAP IO will be used for inputs. */
   private String llapIoDesc;
 
@@ -617,4 +622,13 @@ public class MapWork extends BaseWork {
   public void setMapAliases(List<String> mapAliases) {
     this.mapAliases = mapAliases;
   }
+
+  /**
+   * Rebuilds the included-bucket bit-set from its stored byte[] form.
+   *
+   * @return a new BitSet on every call, or null when no bucket filter has been set
+   */
+  public BitSet getIncludedBuckets() {
+    if (includedBuckets == null) {
+      return null;
+    }
+    return BitSet.valueOf(includedBuckets);
+  }
+
+  /**
+   * Stores the set of buckets this map work should read.
+   *
+   * @param includedBuckets bit-set of bucket ids to keep; null clears the filter, which
+   *                        getIncludedBuckets() then reports as null
+   */
+  public void setIncludedBuckets(BitSet includedBuckets) {
+    // kept as byte[] because Kryo's default serializer cannot correctly serialize a
+    // BitSet (BitSet::wordsInUse is transient)
+    this.includedBuckets = includedBuckets == null ? null : includedBuckets.toByteArray();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/619ff6e9/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
index be7139c..b354f98 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.plan;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
 
@@ -107,6 +108,10 @@ public class TableScanDesc extends AbstractOperatorDesc {
 
   private transient Table tableMetadata;
 
+  private BitSet includedBuckets;
+
+  private int numBuckets = -1;
+
   public TableScanDesc() {
     this(null, null);
   }
@@ -319,4 +324,38 @@ public class TableScanDesc extends AbstractOperatorDesc {
   public void setSerializedFilterObject(String serializedFilterObject) {
     this.serializedFilterObject = serializedFilterObject;
   }
+
+  // Stores the bit-set of buckets to read for this table scan (reference is kept, not copied).
+  public void setIncludedBuckets(BitSet bitset) {
+    this.includedBuckets = bitset;
+  }
+
+  // Returns the stored included-bucket bit-set, or null if none has been set.
+  public BitSet getIncludedBuckets() {
+    return this.includedBuckets;
+  }
+
+  /**
+   * Renders the included buckets for EXPLAIN EXTENDED output.
+   *
+   * @return null when no bucket filter is set; otherwise text of the form
+   *         "[i,j,] of N" listing every set bit (each followed by a comma) and the
+   *         stored bucket count
+   */
+  @Explain(displayName = "buckets included", explainLevels = { Level.EXTENDED })
+  public String getIncludedBucketExplain() {
+    final BitSet bits = this.includedBuckets;
+    if (bits == null) {
+      return null;
+    }
+
+    final StringBuilder text = new StringBuilder();
+    text.append("[");
+    // visit only the set bits, in ascending order
+    for (int bit = bits.nextSetBit(0); bit >= 0; bit = bits.nextSetBit(bit + 1)) {
+      text.append(bit).append(',');
+    }
+    text.append(String.format("] of %d", numBuckets));
+    return text.toString();
+  }
+
+  // Returns the bucket count recorded for this scan; the field defaults to -1.
+  public int getNumBuckets() {
+    return numBuckets;
+  }
+
+  // Records the bucket count that the included-bucket bit-set refers to.
+  public void setNumBuckets(int numBuckets) {
+    this.numBuckets = numBuckets;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/619ff6e9/ql/src/test/queries/clientpositive/bucketpruning1.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/bucketpruning1.q b/ql/src/test/queries/clientpositive/bucketpruning1.q
new file mode 100644
index 0000000..be403a5
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/bucketpruning1.q
@@ -0,0 +1,97 @@
+set hive.optimize.ppd=true;
+set hive.optimize.index.filter=true;
+set hive.tez.bucket.pruning=true;
+set hive.explain.user=false;
+set hive.fetch.task.conversion=none;
+
+CREATE TABLE srcbucket_pruned(key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 16 BUCKETS STORED AS TEXTFILE;
+
+-- cannot prune 2-key scenarios without a smarter optimizer
+CREATE TABLE srcbucket_unpruned(key int, value string) partitioned by (ds string) CLUSTERED BY (key,value) INTO 16 BUCKETS STORED AS TEXTFILE;
+
+-- good cases
+
+explain extended
+select * from srcbucket_pruned where key = 1;
+
+explain extended
+select * from srcbucket_pruned where key = 16;
+
+explain extended
+select * from srcbucket_pruned where key = 17;
+
+explain extended
+select * from srcbucket_pruned where key = 16+1;
+
+explain extended
+select * from srcbucket_pruned where key = '11';
+
+explain extended
+select * from srcbucket_pruned where key = 1 and ds='2008-04-08';
+
+explain extended
+select * from srcbucket_pruned where key = 1 and ds='2008-04-08' and value='One';
+
+explain extended
+select * from srcbucket_pruned where value='One' and key = 1 and ds='2008-04-08';
+
+explain extended
+select * from srcbucket_pruned where key in (2,3);
+
+explain extended
+select * from srcbucket_pruned where key in (2,3) and ds='2008-04-08';
+
+explain extended
+select * from srcbucket_pruned where key in (2,3) and ds='2008-04-08' and value='One';
+
+explain extended
+select * from srcbucket_pruned where value='One' and key in (2,3) and ds='2008-04-08';
+
+explain extended
+select * from srcbucket_pruned where (key=1 or key=2) and ds='2008-04-08';
+
+explain extended
+select * from srcbucket_pruned where (key=1 or key=2) and value = 'One' and ds='2008-04-08';
+
+-- valid but irrelevant case (all buckets selected)
+
+explain extended
+select * from srcbucket_pruned where key in (1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17);
+
+explain extended
+select * from srcbucket_pruned where key in (1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17) and ds='2008-04-08';
+
+explain extended
+select * from srcbucket_pruned where key in (1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17) and ds='2008-04-08' and value='One';
+
+explain extended
+select * from srcbucket_pruned where value='One' and key in (1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17) and ds='2008-04-08';
+
+-- valid, but unimplemented cases
+
+explain extended
+select * from srcbucket_pruned where key = 1 and ds='2008-04-08' or key = 2;
+
+explain extended
+select * from srcbucket_pruned where key = 1 and ds='2008-04-08' and (value='One' or value = 'Two');
+
+explain extended
+select * from srcbucket_pruned where key = 1 or value = "One" or key = 2 and value = "Two";
+
+-- Invalid cases
+
+explain extended
+select * from srcbucket_pruned where key = 'x11';
+
+explain extended
+select * from srcbucket_pruned where key = 1 or value = "One";
+
+explain extended
+select * from srcbucket_pruned where key = 1 or value = "One" or key = 2;
+
+explain extended
+select * from srcbucket_unpruned where key in (3, 5);
+
+explain extended
+select * from srcbucket_unpruned where key = 1;
+