You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by da...@apache.org on 2017/02/07 23:36:08 UTC

kudu git commit: [java client] KUDU-1643 Prune hash partitions based on IN-list predicates

Repository: kudu
Updated Branches:
  refs/heads/master e7e65954b -> 4f0677f84


[java client] KUDU-1643 Prune hash partitions based on IN-list predicates

PartitionPruner is updated to search all combinations of in-list column
values and prune hash partitions where possible.

This also fixes a small issue in the C++ version of the algorithm:
previously the implementation would always consider the final hash
component to be constrained. The bug (as far as I can tell) doesn't lead
to erroneous results, but it does cause more partition ranges to be
created than necessary, which adds memory and compute overhead when
computing the pruning.

Change-Id: I8a793c23ff00d19b3d3d062bb222d2c725a93724
Reviewed-on: http://gerrit.cloudera.org:8080/5677
Tested-by: Kudu Jenkins
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/4f0677f8
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/4f0677f8
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/4f0677f8

Branch: refs/heads/master
Commit: 4f0677f84d1c74fe69d53ef0531d0460e63f3a37
Parents: e7e6595
Author: honghaijei <ho...@gmail.com>
Authored: Wed Jan 11 07:30:59 2017 +0000
Committer: Dan Burkert <da...@apache.org>
Committed: Tue Feb 7 23:35:48 2017 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/PartitionPruner.java | 141 +++--
 .../apache/kudu/client/TestPartitionPruner.java | 579 ++++++++++++-------
 src/kudu/common/partial_row.h                   |   4 +-
 src/kudu/common/partition_pruner-test.cc        | 135 +++--
 src/kudu/common/partition_pruner.cc             |  18 +-
 src/kudu/common/partition_pruner.h              |   5 +
 src/kudu/common/scan_spec.cc                    |   2 +-
 7 files changed, 540 insertions(+), 344 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/4f0677f8/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionPruner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionPruner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionPruner.java
index e5786f3..165f370 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionPruner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionPruner.java
@@ -17,10 +17,10 @@
 
 package org.apache.kudu.client;
 
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
 import java.util.Deque;
 import java.util.Iterator;
 import java.util.List;
@@ -48,6 +48,13 @@ public class PartitionPruner {
   }
 
   /**
+   * @return the number of remaining partition ranges for the scan
+   */
+  public int numRangesRemainingForTests() {
+    return rangePartitions.size();
+  }
+
+  /**
    * @return a partition pruner that will prune all partitions
    */
   private static PartitionPruner empty() {
@@ -127,10 +134,10 @@ public class PartitionPruner {
     // examples above:
     //
     // 1) The partition keys are truncated after the final constrained component
-    //    (hash bucket components are constrained when the scan is limited to a
-    //    single bucket via equality predicates on that component, while range
-    //    components are constrained if they have an upper or lower bound via
-    //    range or equality predicates on that component).
+    //    Hash bucket components are constrained when the scan is limited to a
+    //    subset of buckets via equality or in-list predicates on that component.
+    //    Range components are constrained if they have an upper or lower bound
+    //    via range or equality predicates on that component.
     //
     // 2) If the final constrained component is a hash bucket, then the
     //    corresponding bucket in the upper bound is incremented in order to make
@@ -139,7 +146,12 @@ public class PartitionPruner {
     // 3) The number of partition key ranges in the result is equal to the product
     //    of the number of buckets of each unconstrained hash component which come
     //    before a final constrained component. If there are no unconstrained hash
-    //    components, then the number of partition key ranges is one.
+    //    components, then the number of resulting partition key ranges is one. Note
+    //    that this can be a lot of ranges, and we may find we need to limit the
+    //    algorithm to give up on pruning if the number of ranges exceeds a limit.
+    //    Until this becomes a problem in practice, we'll continue always pruning,
+    //    since it is precisely these highly-hash-partitioned tables which get the
+    //    most benefit from pruning.
 
     // Step 1: Build the range portion of the partition key. If the range partition
     // columns match the primary key columns, then we can substitute the primary
@@ -159,11 +171,10 @@ public class PartitionPruner {
 
     // Step 2: Create the hash bucket portion of the partition key.
 
-    // The list of hash buckets per hash component, or null if the component is
-    // not constrained.
-    List<Integer> hashBuckets = new ArrayList<>(partitionSchema.getHashBucketSchemas().size());
+    // List of pruned hash buckets per hash component.
+    List<BitSet> hashComponents = new ArrayList<>(partitionSchema.getHashBucketSchemas().size());
     for (PartitionSchema.HashBucketSchema hashSchema : partitionSchema.getHashBucketSchemas()) {
-      hashBuckets.add(pushPredicatesIntoHashBucket(schema, hashSchema, predicates));
+      hashComponents.add(pruneHashComponent(schema, hashSchema, predicates));
     }
 
     // The index of the final constrained component in the partition key.
@@ -171,12 +182,14 @@ public class PartitionPruner {
     if (rangeLowerBound.length > 0 || rangeUpperBound.length > 0) {
       // The range component is constrained if either of the range bounds are
       // specified (non-empty).
-      constrainedIndex = hashBuckets.size();
+      constrainedIndex = partitionSchema.getHashBucketSchemas().size();
     } else {
       // Search the hash bucket constraints from right to left, looking for the
       // first constrained component.
-      for (int i = hashBuckets.size(); i > 0; i--) {
-        if (hashBuckets.get(i - 1) != null) {
+      for (int i = hashComponents.size(); i > 0; i--) {
+        int numBuckets = partitionSchema.getHashBucketSchemas().get(i - 1).getNumBuckets();
+        BitSet hashBuckets = hashComponents.get(i - 1);
+        if (hashBuckets.nextClearBit(0) < numBuckets) {
           constrainedIndex = i;
           break;
         }
@@ -185,52 +198,35 @@ public class PartitionPruner {
 
     // Build up a set of partition key ranges out of the hash components.
     //
-    // Each constrained hash component simply appends its bucket number to the
+    // Each hash component simply appends its bucket number to the
     // partition key ranges (possibly incrementing the upper bound by one bucket
     // number if this is the final constraint, see note 2 in the example above).
-    //
-    // Each unconstrained hash component results in creating a new partition key
-    // range for each bucket of the hash component.
     List<Pair<ByteVec, ByteVec>> partitionKeyRanges = new ArrayList<>();
     partitionKeyRanges.add(new Pair<>(ByteVec.create(), ByteVec.create()));
 
-    ByteBuffer bucketBuf = ByteBuffer.allocate(4);
-    bucketBuf.order(ByteOrder.BIG_ENDIAN);
     for (int hashIdx = 0; hashIdx < constrainedIndex; hashIdx++) {
       // This is the final partition key component if this is the final constrained
       // bucket, and the range upper bound is empty. In this case we need to
       // increment the bucket on the upper bound to convert from inclusive to
       // exclusive.
       boolean isLast = hashIdx + 1 == constrainedIndex && rangeUpperBound.length == 0;
-
-      if (hashBuckets.get(hashIdx) != null) {
-        // This hash component is constrained by equality predicates to a single
-        // hash bucket.
-        int bucket = hashBuckets.get(hashIdx);
-        int bucketUpper = isLast ? bucket + 1 : bucket;
-
-        for (Pair<ByteVec, ByteVec> partitionKeyRange : partitionKeyRanges) {
-          KeyEncoder.encodeHashBucket(bucket, partitionKeyRange.getFirst());
-          KeyEncoder.encodeHashBucket(bucketUpper, partitionKeyRange.getSecond());
+      BitSet hashBuckets = hashComponents.get(hashIdx);
+
+      List<Pair<ByteVec, ByteVec>> newPartitionKeyRanges =
+          new ArrayList<>(partitionKeyRanges.size() * hashBuckets.cardinality());
+      for (Pair<ByteVec, ByteVec> partitionKeyRange : partitionKeyRanges) {
+        for (int bucket = hashBuckets.nextSetBit(0);
+             bucket != -1;
+             bucket = hashBuckets.nextSetBit(bucket + 1)) {
+          int bucketUpper = isLast ? bucket + 1 : bucket;
+          ByteVec lower = partitionKeyRange.getFirst().clone();
+          ByteVec upper = partitionKeyRange.getFirst().clone();
+          KeyEncoder.encodeHashBucket(bucket, lower);
+          KeyEncoder.encodeHashBucket(bucketUpper, upper);
+          newPartitionKeyRanges.add(new Pair<>(lower, upper));
         }
-      } else {
-        PartitionSchema.HashBucketSchema hashSchema =
-            partitionSchema.getHashBucketSchemas().get(hashIdx);
-        // Add a partition key range for each possible hash bucket.
-        List<Pair<ByteVec, ByteVec>> newPartitionKeyRanges =
-            new ArrayList<>(partitionKeyRanges.size() * hashSchema.getNumBuckets());
-        for (Pair<ByteVec, ByteVec> partitionKeyRange : partitionKeyRanges) {
-          for (int bucket = 0; bucket < hashSchema.getNumBuckets(); bucket++) {
-            int bucketUpper = isLast ? bucket + 1 : bucket;
-            ByteVec lower = partitionKeyRange.getFirst().clone();
-            ByteVec upper = partitionKeyRange.getFirst().clone();
-            KeyEncoder.encodeHashBucket(bucket, lower);
-            KeyEncoder.encodeHashBucket(bucketUpper, upper);
-            newPartitionKeyRanges.add(new Pair<>(lower, upper));
-          }
-        }
-        partitionKeyRanges = newPartitionKeyRanges;
       }
+      partitionKeyRanges = newPartitionKeyRanges;
     }
 
     // Step 3: append the (possibly empty) range bounds to the partition key ranges.
@@ -309,7 +305,7 @@ public class PartitionPruner {
    * @param partition to prune
    * @return {@code true} if the partition should be pruned
    */
-  boolean shouldPrune(Partition partition) {
+  boolean shouldPruneForTests(Partition partition) {
     // The C++ version uses binary search to do this with fewer key comparisons,
     // but the algorithm isn't easily translatable, so this just uses a linear
     // search.
@@ -492,22 +488,51 @@ public class PartitionPruner {
   }
 
   /**
-   * Determines if the provided predicates can constrain the hash component to a
-   * single bucket, and if so, returns the bucket number. Otherwise returns null.
+   * Search all combinations of in-list and equality predicates for pruneable hash partitions.
+   * @return a bitset containing {@code false} bits for hash buckets which may be pruned
    */
-  private static Integer pushPredicatesIntoHashBucket(Schema schema,
-                                                      PartitionSchema.HashBucketSchema hashSchema,
-                                                      Map<String, KuduPredicate> predicates) {
+  private static BitSet pruneHashComponent(Schema schema,
+                                           PartitionSchema.HashBucketSchema hashSchema,
+                                           Map<String, KuduPredicate> predicates) {
+    BitSet hashBuckets = new BitSet(hashSchema.getNumBuckets());
     List<Integer> columnIdxs = idsToIndexes(schema, hashSchema.getColumnIds());
-    PartialRow row = schema.newPartialRow();
     for (int idx : columnIdxs) {
       ColumnSchema column = schema.getColumnByIndex(idx);
       KuduPredicate predicate = predicates.get(column.getName());
-      if (predicate == null || predicate.getType() != KuduPredicate.PredicateType.EQUALITY) {
-        return null;
+      if (predicate == null ||
+          (predicate.getType() != KuduPredicate.PredicateType.EQUALITY &&
+           predicate.getType() != KuduPredicate.PredicateType.IN_LIST)) {
+        hashBuckets.set(0, hashSchema.getNumBuckets());
+        return hashBuckets;
       }
-      row.setRaw(idx, predicate.getLower());
     }
-    return KeyEncoder.getHashBucket(row, hashSchema);
+
+    List<PartialRow> rows = Arrays.asList(schema.newPartialRow());
+    for (int idx : columnIdxs) {
+      List<PartialRow> newRows = new ArrayList<>();
+      ColumnSchema column = schema.getColumnByIndex(idx);
+      KuduPredicate predicate = predicates.get(column.getName());
+      List<byte[]> predicateValues;
+      if (predicate.getType() == KuduPredicate.PredicateType.EQUALITY) {
+        predicateValues = Arrays.asList(predicate.getLower());
+      } else {
+        predicateValues = Arrays.asList(predicate.getInListValues());
+      }
+      // For each partial row built so far, create one copy per value of this
+      // column's equality or in-list predicate.
+      for (PartialRow row : rows) {
+        for (byte[] predicateValue : predicateValues) {
+          PartialRow newRow = new PartialRow(row);
+          newRow.setRaw(idx, predicateValue);
+          newRows.add(newRow);
+        }
+      }
+      rows = newRows;
+    }
+    for (PartialRow row : rows) {
+      int hash = KeyEncoder.getHashBucket(row, hashSchema);
+      hashBuckets.set(hash);
+    }
+    return hashBuckets;
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/4f0677f8/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartitionPruner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartitionPruner.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartitionPruner.java
index 7b9c51e..7aec3a0 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartitionPruner.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartitionPruner.java
@@ -35,16 +35,17 @@ public class TestPartitionPruner extends BaseKuduTest {
    * Counts the partitions touched by a scan with optional primary key bounds.
    * The table is assumed to have three INT8 columns as the primary key.
    *
-   * @param table the table to scan
-   * @param partitions the partitions of the table
+   * @param expectedTablets the expected number of tablets to satisfy the scan
+   * @param table                the table to scan
+   * @param partitions           the partitions of the table
    * @param lowerBoundPrimaryKey the optional lower bound primary key
    * @param upperBoundPrimaryKey the optional upper bound primary key
-   * @return the number of partitions touched by the scan
    */
-  private int countPartitionsPrimaryKey(KuduTable table,
-                                        List<Partition> partitions,
-                                        byte[] lowerBoundPrimaryKey,
-                                        byte[] upperBoundPrimaryKey) throws Exception {
+  private void checkPartitionsPrimaryKey(int expectedTablets,
+                                         KuduTable table,
+                                         List<Partition> partitions,
+                                         byte[] lowerBoundPrimaryKey,
+                                         byte[] upperBoundPrimaryKey) throws Exception {
     KuduScanToken.KuduScanTokenBuilder scanBuilder = syncClient.newScanTokenBuilder(table);
 
     if (lowerBoundPrimaryKey != null) {
@@ -67,58 +68,57 @@ public class TestPartitionPruner extends BaseKuduTest {
 
     int scannedPartitions = 0;
     for (Partition partition : partitions) {
-      if (!pruner.shouldPrune(partition)) scannedPartitions++;
+      if (!pruner.shouldPruneForTests(partition)) scannedPartitions++;
     }
 
     // Check that the number of ScanTokens built for the scan matches.
+    assertEquals(expectedTablets, scannedPartitions);
     assertEquals(scannedPartitions, scanBuilder.build().size());
-    return scannedPartitions;
+    assertEquals(expectedTablets == 0 ? 0 : 1, pruner.numRangesRemainingForTests());
   }
 
   /**
-   * Counts the partitions touched by a scan with predicates.
+   * Checks the number of tablets and pruner ranges generated for a scan.
    *
+   * @param expectedTablets the expected number of tablets to satisfy the scan
+   * @param expectedPrunerRanges the expected number of generated partition pruner ranges
    * @param table the table to scan
    * @param partitions the partitions of the table
    * @param predicates the predicates to apply to the scan
-   * @return the number of partitions touched by the scan
    */
-  private int countPartitions(KuduTable table,
-                              List<Partition> partitions,
-                              KuduPredicate... predicates) throws Exception {
-    KuduScanToken.KuduScanTokenBuilder scanBuilder = syncClient.newScanTokenBuilder(table);
-
-    for (KuduPredicate predicate : predicates) {
-      scanBuilder.addPredicate(predicate);
-    }
-
-    PartitionPruner pruner = PartitionPruner.create(scanBuilder);
-
-    int scannedPartitions = 0;
-    for (Partition partition : partitions) {
-      if (!pruner.shouldPrune(partition)) scannedPartitions++;
-    }
-
-    // Check that the number of ScanTokens built for the scan matches.
-    assertEquals(scannedPartitions, scanBuilder.build().size());
-    return scannedPartitions;
+  private void checkPartitions(int expectedTablets,
+                               int expectedPrunerRanges,
+                               KuduTable table,
+                               List<Partition> partitions,
+                               KuduPredicate... predicates) {
+    checkPartitions(expectedTablets,
+                    expectedPrunerRanges,
+                    table,
+                    partitions,
+                    null,
+                    null,
+                    predicates);
   }
 
   /**
-   * Counts the partitions touched by a scan with predicates and optional partition key bounds.
+   * Checks the number of tablets and pruner ranges generated for a scan with
+   * predicates and optional partition key bounds.
    *
+   * @param expectedTablets the expected number of tablets to satisfy the scan
+   * @param expectedPrunerRanges the expected number of generated partition pruner ranges
    * @param table the table to scan
    * @param partitions the partitions of the table
    * @param lowerBoundPartitionKey an optional lower bound partition key
    * @param upperBoundPartitionKey an optional upper bound partition key
    * @param predicates the predicates to apply to the scan
-   * @return the number of partitions touched by the scan
    */
-  private int countPartitions(KuduTable table,
-                              List<Partition> partitions,
-                              byte[] lowerBoundPartitionKey,
-                              byte[] upperBoundPartitionKey,
-                              KuduPredicate... predicates) throws Exception {
+  private void checkPartitions(int expectedTablets,
+                               int expectedPrunerRanges,
+                               KuduTable table,
+                               List<Partition> partitions,
+                               byte[] lowerBoundPartitionKey,
+                               byte[] upperBoundPartitionKey,
+                               KuduPredicate... predicates) {
     // Partition key bounds can't be applied to the ScanTokenBuilder.
     KuduScanner.KuduScannerBuilder scanBuilder = syncClient.newScannerBuilder(table);
 
@@ -137,10 +137,24 @@ public class TestPartitionPruner extends BaseKuduTest {
 
     int scannedPartitions = 0;
     for (Partition partition : partitions) {
-      if (!pruner.shouldPrune(partition)) scannedPartitions++;
+      if (!pruner.shouldPruneForTests(partition)) scannedPartitions++;
     }
 
-    return scannedPartitions;
+    assertEquals(expectedTablets, scannedPartitions);
+    assertEquals(expectedPrunerRanges, pruner.numRangesRemainingForTests());
+
+    // Check that the scan token builder comes up with the same amount.
+    // The scan token builder does not allow for upper/lower partition keys.
+    if (lowerBoundPartitionKey == null && upperBoundPartitionKey == null) {
+      KuduScanToken.KuduScanTokenBuilder tokenBuilder = syncClient.newScanTokenBuilder(table);
+
+      for (KuduPredicate predicate : predicates) {
+        tokenBuilder.addPredicate(predicate);
+      }
+
+      // Check that the number of ScanTokens built for the scan matches.
+      assertEquals(expectedTablets, tokenBuilder.build().size());
+    }
   }
 
   /**
@@ -194,51 +208,55 @@ public class TestPartitionPruner extends BaseKuduTest {
 
     byte min = Byte.MIN_VALUE;
 
+
     // No bounds
-    assertEquals(3, countPartitionsPrimaryKey(table, partitions, null, null));
+    checkPartitionsPrimaryKey(3, table, partitions,
+                              null, null);
 
     // PK < (-1, min, min)
-    assertEquals(1, countPartitionsPrimaryKey(table, partitions, null,
-                                              new byte[] { -1, min, min }));
+    checkPartitionsPrimaryKey(1, table, partitions,
+                              null, new byte[] { -1, min, min });
 
     // PK < (10, 10, 10)
-    assertEquals(2, countPartitionsPrimaryKey(table, partitions, null, new byte[] { 10, 10, 10 }));
+    checkPartitionsPrimaryKey(2, table, partitions,
+                              null, new byte[] { 10, 10, 10 });
 
     // PK < (100, min, min)
-    assertEquals(3, countPartitionsPrimaryKey(table, partitions, null, new byte[] { 100, min, min }));
+    checkPartitionsPrimaryKey(3, table, partitions,
+                              null, new byte[] { 100, min, min });
 
     // PK >= (-10, -10, -10)
-    assertEquals(3, countPartitionsPrimaryKey(table, partitions, new byte[] { -10, -10, -10 }, null));
+    checkPartitionsPrimaryKey(3, table, partitions,
+                              new byte[] { -10, -10, -10 }, null);
 
     // PK >= (0, 0, 0)
-    assertEquals(2, countPartitionsPrimaryKey(table, partitions, new byte[] { 0, 0, 0 }, null));
+    checkPartitionsPrimaryKey(2, table, partitions,
+                              new byte[] { 0, 0, 0 }, null);
 
     // PK >= (100, 0, 0)
-    assertEquals(1, countPartitionsPrimaryKey(table, partitions, new byte[] { 100, 0, 0 }, null));
+    checkPartitionsPrimaryKey(1, table, partitions,
+                              new byte[] { 100, 0, 0 }, null);
 
     // PK >= (-10, 0, 0)
     // PK  < (100, 0, 0)
-    assertEquals(3, countPartitionsPrimaryKey(table, partitions,
-                                              new byte[] { -10, 0, 0 },
-                                              new byte[] { 100, 0, 0 }));
+    checkPartitionsPrimaryKey(3, table, partitions,
+                              new byte[] { -10, 0, 0 }, new byte[] { 100, 0, 0 });
 
     // PK >= (0, 0, 0)
     // PK  < (10, 10, 10)
-    assertEquals(1, countPartitionsPrimaryKey(table, partitions,
-                                              new byte[] { 0, 0, 0 },
-                                              new byte[] { 10, 0, 0 }));
+    checkPartitionsPrimaryKey(1, table, partitions,
+                              new byte[] { 0, 0, 0 }, new byte[] { 10, 0, 0 });
 
     // PK >= (0, 0, 0)
     // PK  < (10, 10, 11)
-    assertEquals(1, countPartitionsPrimaryKey(table, partitions,
-                                              new byte[] { 0, 0, 0 },
-                                              new byte[] { 10, 0, 0 }));
+    checkPartitionsPrimaryKey(1, table, partitions,
+                              new byte[] { 0, 0, 0 }, new byte[] { 10, 0, 0 });
 
     // PK < (0, 0, 0)
     // PK >= (10, 10, 11)
-    assertEquals(0, countPartitionsPrimaryKey(table, partitions,
-                                              new byte[] { 10, 0, 0 },
-                                              new byte[] { 0, 0, 0 }));
+    checkPartitionsPrimaryKey(0, table, partitions,
+                              new byte[] { 10, 0, 0 }, new byte[] { 0, 0, 0 });
+
   }
 
   @Test
@@ -273,168 +291,168 @@ public class TestPartitionPruner extends BaseKuduTest {
     List<Partition> partitions = getTablePartitions(table);
 
     // No Predicates
-    assertEquals(3, countPartitions(table, partitions));
+    checkPartitions(3, 1, table, partitions);
 
     // c < -10
-    assertEquals(1, countPartitions(table, partitions,
-        KuduPredicate.newComparisonPredicate(c, ComparisonOp.LESS, -10)));
+    checkPartitions(1, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(c, ComparisonOp.LESS, -10));
 
     // c = -10
-    assertEquals(1, countPartitions(table, partitions,
-        KuduPredicate.newComparisonPredicate(c, ComparisonOp.EQUAL, -10)));
+    checkPartitions(1, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(c, ComparisonOp.EQUAL, -10));
 
     // c < 10
-    assertEquals(2, countPartitions(table, partitions,
-        KuduPredicate.newComparisonPredicate(c, ComparisonOp.LESS, 10)));
+    checkPartitions(2, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(c, ComparisonOp.LESS, 10));
 
     // c < 100
-    assertEquals(3, countPartitions(table, partitions,
-        KuduPredicate.newComparisonPredicate(c, ComparisonOp.LESS, 100)));
+    checkPartitions(3, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(c, ComparisonOp.LESS, 100));
 
     // c < MIN
-    assertEquals(0, countPartitions(table, partitions,
-                                    KuduPredicate.newComparisonPredicate(c, ComparisonOp.LESS, Byte.MIN_VALUE)));
+    checkPartitions(0, 0, table, partitions,
+                    KuduPredicate.newComparisonPredicate(c, ComparisonOp.LESS, Byte.MIN_VALUE));
     // c < MAX
-    assertEquals(3, countPartitions(table, partitions,
-                                    KuduPredicate.newComparisonPredicate(c, ComparisonOp.LESS, Byte.MAX_VALUE)));
+    checkPartitions(3, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(c, ComparisonOp.LESS, Byte.MAX_VALUE));
 
     // c >= -10
-    assertEquals(3, countPartitions(table, partitions,
-        KuduPredicate.newComparisonPredicate(c, ComparisonOp.GREATER_EQUAL, -10)));
+    checkPartitions(3, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(c, ComparisonOp.GREATER_EQUAL, -10));
 
     // c >= 0
-    assertEquals(3, countPartitions(table, partitions,
-        KuduPredicate.newComparisonPredicate(c, ComparisonOp.GREATER_EQUAL, -10)));
+    checkPartitions(3, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(c, ComparisonOp.GREATER_EQUAL, -10));
 
     // c >= 5
-    assertEquals(2, countPartitions(table, partitions,
-        KuduPredicate.newComparisonPredicate(c, ComparisonOp.GREATER_EQUAL, 5)));
+    checkPartitions(2, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(c, ComparisonOp.GREATER_EQUAL, 5));
 
     // c >= 10
-    assertEquals(2, countPartitions(table, partitions,
-        KuduPredicate.newComparisonPredicate(c, ComparisonOp.GREATER_EQUAL, 10)));
+    checkPartitions(2, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(c, ComparisonOp.GREATER_EQUAL, 10));
 
     // c >= 100
-    assertEquals(1, countPartitions(table, partitions,
-        KuduPredicate.newComparisonPredicate(c, ComparisonOp.GREATER_EQUAL, 100)));
+    checkPartitions(1, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(c, ComparisonOp.GREATER_EQUAL, 100));
 
     // c >= MIN
-    assertEquals(3, countPartitions(table, partitions,
-                                    KuduPredicate.newComparisonPredicate(c, ComparisonOp.GREATER_EQUAL, Byte.MIN_VALUE)));
+    checkPartitions(3, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(c, ComparisonOp.GREATER_EQUAL, Byte.MIN_VALUE));
     // c >= MAX
-    assertEquals(1, countPartitions(table, partitions,
-                                    KuduPredicate.newComparisonPredicate(c, ComparisonOp.GREATER_EQUAL, Byte.MAX_VALUE)));
+    checkPartitions(1, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(c, ComparisonOp.GREATER_EQUAL, Byte.MAX_VALUE));
 
     // c >= -10
     // c < 0
-    assertEquals(1, countPartitions(table, partitions,
-        KuduPredicate.newComparisonPredicate(c, ComparisonOp.GREATER_EQUAL, -10),
-        KuduPredicate.newComparisonPredicate(c, ComparisonOp.LESS, 0)));
+    checkPartitions(1, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(c, ComparisonOp.GREATER_EQUAL, -10),
+                    KuduPredicate.newComparisonPredicate(c, ComparisonOp.LESS, 0));
 
     // c >= 5
     // c < 100
-    assertEquals(2, countPartitions(table, partitions,
-        KuduPredicate.newComparisonPredicate(c, ComparisonOp.GREATER_EQUAL, 5),
-        KuduPredicate.newComparisonPredicate(c, ComparisonOp.LESS, 100)));
+    checkPartitions(2, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(c, ComparisonOp.GREATER_EQUAL, 5),
+                    KuduPredicate.newComparisonPredicate(c, ComparisonOp.LESS, 100));
 
     // b = ""
-    assertEquals(3, countPartitions(table, partitions,
-        KuduPredicate.newComparisonPredicate(b, ComparisonOp.EQUAL, "")));
+    checkPartitions(3, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(b, ComparisonOp.EQUAL, ""));
 
     // b >= "z"
-    assertEquals(3, countPartitions(table, partitions,
-        KuduPredicate.newComparisonPredicate(b, ComparisonOp.GREATER_EQUAL, "z")));
+    checkPartitions(3, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(b, ComparisonOp.GREATER_EQUAL, "z"));
 
     // b < "a"
-    assertEquals(3, countPartitions(table, partitions,
-        KuduPredicate.newComparisonPredicate(b, ComparisonOp.LESS, "a")));
+    checkPartitions(3, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(b, ComparisonOp.LESS, "a"));
 
     // b >= "m"
     // b < "z"
-    assertEquals(3, countPartitions(table, partitions,
-        KuduPredicate.newComparisonPredicate(b, ComparisonOp.GREATER_EQUAL, "m"),
-        KuduPredicate.newComparisonPredicate(b, ComparisonOp.LESS, "z")));
+    checkPartitions(3, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(b, ComparisonOp.GREATER_EQUAL, "m"),
+                    KuduPredicate.newComparisonPredicate(b, ComparisonOp.LESS, "z"));
 
     // c >= 10
     // b >= "r"
-    assertEquals(1, countPartitions(table, partitions,
-        KuduPredicate.newComparisonPredicate(c, ComparisonOp.GREATER_EQUAL, 10),
-        KuduPredicate.newComparisonPredicate(b, ComparisonOp.GREATER_EQUAL, "r")));
+    checkPartitions(1, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(c, ComparisonOp.GREATER_EQUAL, 10),
+                    KuduPredicate.newComparisonPredicate(b, ComparisonOp.GREATER_EQUAL, "r"));
 
     // c >= 10
     // b < "r"
-    assertEquals(2, countPartitions(table, partitions,
-        KuduPredicate.newComparisonPredicate(c, ComparisonOp.GREATER_EQUAL, 10),
-        KuduPredicate.newComparisonPredicate(b, ComparisonOp.LESS, "r")));
+    checkPartitions(2, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(c, ComparisonOp.GREATER_EQUAL, 10),
+                    KuduPredicate.newComparisonPredicate(b, ComparisonOp.LESS, "r"));
 
     // c = 10
     // b < "r"
-    assertEquals(1, countPartitions(table, partitions,
-        KuduPredicate.newComparisonPredicate(c, ComparisonOp.EQUAL, 10),
-        KuduPredicate.newComparisonPredicate(b, ComparisonOp.LESS, "r")));
+    checkPartitions(1, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(c, ComparisonOp.EQUAL, 10),
+                    KuduPredicate.newComparisonPredicate(b, ComparisonOp.LESS, "r"));
 
     // c < 0
     // b < "m"
-    assertEquals(1, countPartitions(table, partitions,
-        KuduPredicate.newComparisonPredicate(c, ComparisonOp.EQUAL, 0),
-        KuduPredicate.newComparisonPredicate(b, ComparisonOp.LESS, "m")));
+    checkPartitions(1, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(c, ComparisonOp.EQUAL, 0),
+                    KuduPredicate.newComparisonPredicate(b, ComparisonOp.LESS, "m"));
 
     // c < 0
     // b < "z"
-    assertEquals(1, countPartitions(table, partitions,
-        KuduPredicate.newComparisonPredicate(c, ComparisonOp.LESS, 0),
-        KuduPredicate.newComparisonPredicate(b, ComparisonOp.LESS, "z")));
+    checkPartitions(1, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(c, ComparisonOp.LESS, 0),
+                    KuduPredicate.newComparisonPredicate(b, ComparisonOp.LESS, "z"));
 
     // c = 0
     // b = "m\0"
-    assertEquals(1, countPartitions(table, partitions,
-        KuduPredicate.newComparisonPredicate(c, ComparisonOp.EQUAL, 0),
-        KuduPredicate.newComparisonPredicate(b, ComparisonOp.EQUAL, "m\0")));
+    checkPartitions(1, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(c, ComparisonOp.EQUAL, 0),
+                    KuduPredicate.newComparisonPredicate(b, ComparisonOp.EQUAL, "m\0"));
 
     // c = 0
     // b < "m"
-    assertEquals(1, countPartitions(table, partitions,
-        KuduPredicate.newComparisonPredicate(c, ComparisonOp.EQUAL, 0),
-        KuduPredicate.newComparisonPredicate(b, ComparisonOp.LESS, "m")));
+    checkPartitions(1, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(c, ComparisonOp.EQUAL, 0),
+                    KuduPredicate.newComparisonPredicate(b, ComparisonOp.LESS, "m"));
 
     // c = 0
     // b < "m\0"
-    assertEquals(2, countPartitions(table, partitions,
-        KuduPredicate.newComparisonPredicate(c, ComparisonOp.EQUAL, 0),
-        KuduPredicate.newComparisonPredicate(b, ComparisonOp.LESS, "m\0")));
+    checkPartitions(2, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(c, ComparisonOp.EQUAL, 0),
+                    KuduPredicate.newComparisonPredicate(b, ComparisonOp.LESS, "m\0"));
 
     // c = 0
     // c = 2
-    assertEquals(0, countPartitions(table, partitions,
-        KuduPredicate.newComparisonPredicate(c, ComparisonOp.EQUAL, 0),
-        KuduPredicate.newComparisonPredicate(c, ComparisonOp.EQUAL, 2)));
+    checkPartitions(0, 0, table, partitions,
+                    KuduPredicate.newComparisonPredicate(c, ComparisonOp.EQUAL, 0),
+                    KuduPredicate.newComparisonPredicate(c, ComparisonOp.EQUAL, 2));
 
     // c = MIN
-    assertEquals(1, countPartitions(table, partitions,
-                                    KuduPredicate.newComparisonPredicate(c, ComparisonOp.EQUAL, Byte.MIN_VALUE)));
+    checkPartitions(1, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(c, ComparisonOp.EQUAL, Byte.MIN_VALUE));
     // c = MAX
-    assertEquals(1, countPartitions(table, partitions,
-                                    KuduPredicate.newComparisonPredicate(c, ComparisonOp.EQUAL, Byte.MAX_VALUE)));
-
-    // a IN (1, 2)
-    assertEquals(1, countPartitions(table, partitions,
-        KuduPredicate.newInListPredicate(c, ImmutableList.of((byte) 1, (byte) 2))));
-
-    // a IN (0, 1, 2)
-    assertEquals(2, countPartitions(table, partitions,
-        KuduPredicate.newInListPredicate(c, ImmutableList.of((byte) 0, (byte) 1, (byte) 2))));
-
-    // a IN (-10, 0)
-    // B < "m"
-    assertEquals(1, countPartitions(table, partitions,
-        KuduPredicate.newInListPredicate(c, ImmutableList.of((byte) -10, (byte) 0)),
-        KuduPredicate.newComparisonPredicate(b, ComparisonOp.LESS, "m")));
-
-    // a IN (-10, 0)
-    // B < "m\0"
-    assertEquals(2, countPartitions(table, partitions,
-        KuduPredicate.newInListPredicate(c, ImmutableList.of((byte) -10, (byte) 0)),
-        KuduPredicate.newComparisonPredicate(b, ComparisonOp.LESS, "m\0")));
+    checkPartitions(1, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(c, ComparisonOp.EQUAL, Byte.MAX_VALUE));
+
+    // c IN (1, 2)
+    checkPartitions(1, 1, table, partitions,
+                    KuduPredicate.newInListPredicate(c, ImmutableList.of((byte) 1, (byte) 2)));
+
+    // c IN (0, 1, 2)
+    checkPartitions(2, 1, table, partitions,
+                    KuduPredicate.newInListPredicate(c, ImmutableList.of((byte) 0, (byte) 1, (byte) 2)));
+
+    // c IN (-10, 0)
+    // b < "m"
+    checkPartitions(1, 1, table, partitions,
+                    KuduPredicate.newInListPredicate(c, ImmutableList.of((byte) -10, (byte) 0)),
+                    KuduPredicate.newComparisonPredicate(b, ComparisonOp.LESS, "m"));
+
+    // c IN (-10, 0)
+    // b < "m\0"
+    checkPartitions(2, 1, table, partitions,
+                    KuduPredicate.newInListPredicate(c, ImmutableList.of((byte) -10, (byte) 0)),
+                    KuduPredicate.newComparisonPredicate(b, ComparisonOp.LESS, "m\0"));
   }
 
   @Test
@@ -461,49 +479,165 @@ public class TestPartitionPruner extends BaseKuduTest {
     List<Partition> partitions = getTablePartitions(table);
 
     // No Predicates
-    assertEquals(4, countPartitions(table, partitions));
+    checkPartitions(4, 1, table, partitions);
 
     // a = 0;
-    assertEquals(2, countPartitions(table, partitions,
-          KuduPredicate.newComparisonPredicate(a, ComparisonOp.EQUAL, 0)));
+    checkPartitions(2, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(a, ComparisonOp.EQUAL, 0));
 
     // a >= 0;
-    assertEquals(4, countPartitions(table, partitions,
-          KuduPredicate.newComparisonPredicate(a, ComparisonOp.GREATER_EQUAL, 0)));
+    checkPartitions(4, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(a, ComparisonOp.GREATER_EQUAL, 0));
 
     // a >= 0;
     // a < 1;
-    assertEquals(2, countPartitions(table, partitions,
-          KuduPredicate.newComparisonPredicate(a, ComparisonOp.GREATER_EQUAL, 0),
-          KuduPredicate.newComparisonPredicate(a, ComparisonOp.LESS, 1)));
+    checkPartitions(2, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(a, ComparisonOp.GREATER_EQUAL, 0),
+                    KuduPredicate.newComparisonPredicate(a, ComparisonOp.LESS, 1));
 
     // a >= 0;
     // a < 2;
-    assertEquals(4, countPartitions(table, partitions,
-          KuduPredicate.newComparisonPredicate(a, ComparisonOp.GREATER_EQUAL, 0),
-          KuduPredicate.newComparisonPredicate(a, ComparisonOp.LESS, 2)));
+    checkPartitions(4, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(a, ComparisonOp.GREATER_EQUAL, 0),
+                    KuduPredicate.newComparisonPredicate(a, ComparisonOp.LESS, 2));
 
     // b = 1;
-    assertEquals(4, countPartitions(table, partitions,
-          KuduPredicate.newComparisonPredicate(b, ComparisonOp.EQUAL, 1)));
+    checkPartitions(4, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(b, ComparisonOp.EQUAL, 1));
 
     // b = 1;
     // c = 2;
-    assertEquals(2, countPartitions(table, partitions,
-          KuduPredicate.newComparisonPredicate(b, ComparisonOp.EQUAL, 1),
-          KuduPredicate.newComparisonPredicate(c, ComparisonOp.EQUAL, 2)));
+    checkPartitions(2, 2, table, partitions,
+                    KuduPredicate.newComparisonPredicate(b, ComparisonOp.EQUAL, 1),
+                    KuduPredicate.newComparisonPredicate(c, ComparisonOp.EQUAL, 2));
 
     // a = 0;
     // b = 1;
     // c = 2;
-    assertEquals(1, countPartitions(table, partitions,
-          KuduPredicate.newComparisonPredicate(a, ComparisonOp.EQUAL, 0),
-          KuduPredicate.newComparisonPredicate(b, ComparisonOp.EQUAL, 1),
-          KuduPredicate.newComparisonPredicate(c, ComparisonOp.EQUAL, 2)));
+    checkPartitions(1, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(a, ComparisonOp.EQUAL, 0),
+                    KuduPredicate.newComparisonPredicate(b, ComparisonOp.EQUAL, 1),
+                    KuduPredicate.newComparisonPredicate(c, ComparisonOp.EQUAL, 2));
 
     // a IN (0, 10)
-    assertEquals(4, countPartitions(table, partitions,
-          KuduPredicate.newInListPredicate(c, ImmutableList.of((byte) 0, (byte) 10))));
+    checkPartitions(4, 1, table, partitions,
+                    KuduPredicate.newInListPredicate(c, ImmutableList.of((byte) 0, (byte) 10)));
+  }
+
+  @Test
+  public void testInListHashPartitionPruning() throws Exception {
+    // CREATE TABLE t
+    // (a INT8, b INT8, c INT8)
+    // PRIMARY KEY (a, b, c)
+    // PARTITION BY HASH (a) PARTITIONS 3,
+    //              HASH (b) PARTITIONS 3,
+    //              HASH (c) PARTITIONS 3;  -- 3 * 3 * 3 = 27 partitions in total
+    ColumnSchema a = new ColumnSchema.ColumnSchemaBuilder("a", Type.INT8).key(true).build();
+    ColumnSchema b = new ColumnSchema.ColumnSchemaBuilder("b", Type.INT8).key(true).build();
+    ColumnSchema c = new ColumnSchema.ColumnSchemaBuilder("c", Type.INT8).key(true).build();
+    Schema schema = new Schema(ImmutableList.of(a, b, c));
+
+    CreateTableOptions tableBuilder = new CreateTableOptions();
+    tableBuilder.setRangePartitionColumns(new ArrayList<String>());
+    tableBuilder.addHashPartitions(ImmutableList.of("a"), 3);
+    tableBuilder.addHashPartitions(ImmutableList.of("b"), 3);
+    tableBuilder.addHashPartitions(ImmutableList.of("c"), 3);
+
+    String tableName = "testInListHashPartitionPruning-" + System.currentTimeMillis();
+    syncClient.createTable(tableName, schema, tableBuilder);
+    KuduTable table = syncClient.openTable(tableName);
+    List<Partition> partitions = getTablePartitions(table);
+
+    // a in [0, 1]: 2 of the 3 'a' buckets remain -> 2 * 3 * 3 = 18 partitions.
+    checkPartitions(18, 2, table, partitions,
+        KuduPredicate.newInListPredicate(a, ImmutableList.of((byte) 0, (byte) 1)));
+
+    // a in [0, 1, 8]: the values cover all 3 'a' buckets -> nothing is pruned.
+    checkPartitions(27, 1, table, partitions,
+        KuduPredicate.newInListPredicate(a, ImmutableList.of((byte) 0, (byte) 1, (byte) 8)));
+
+    // b in [0, 1]: 2 of the 3 'b' buckets remain -> 3 * 2 * 3 = 18 partitions.
+    checkPartitions(18, 6, table, partitions,
+        KuduPredicate.newInListPredicate(b, ImmutableList.of((byte) 0, (byte) 1)));
+
+    // c in [0, 1]: 2 of the 3 'c' buckets remain -> 3 * 3 * 2 = 18 partitions.
+    checkPartitions(18, 18, table, partitions,
+        KuduPredicate.newInListPredicate(c, ImmutableList.of((byte) 0, (byte) 1)));
+
+    // b in [0, 1], c in [0, 1]: 3 * 2 * 2 = 12 partitions.
+    checkPartitions(12, 12, table, partitions,
+        KuduPredicate.newInListPredicate(b, ImmutableList.of((byte) 0, (byte) 1)),
+        KuduPredicate.newInListPredicate(c, ImmutableList.of((byte) 0, (byte) 1)));
+
+    // a in [0, 1], b in [0, 1], c in [0, 1]: 2 * 2 * 2 = 8 partitions.
+    checkPartitions(8, 8, table, partitions,
+        KuduPredicate.newInListPredicate(a, ImmutableList.of((byte) 0, (byte) 1)),
+        KuduPredicate.newInListPredicate(b, ImmutableList.of((byte) 0, (byte) 1)),
+        KuduPredicate.newInListPredicate(c, ImmutableList.of((byte) 0, (byte) 1)));
+  }
+
+  @Test
+  public void TestMultiColumnInListHashPruning() throws Exception {
+    // CREATE TABLE t
+    // (a INT8, b INT8, c INT8)
+    // PRIMARY KEY (a, b, c)
+    // PARTITION BY HASH (a) PARTITIONS 3,
+    //              HASH (b, c) PARTITIONS 3;  -- 3 * 3 = 9 partitions in total
+    ColumnSchema a = new ColumnSchema.ColumnSchemaBuilder("a", Type.INT8).key(true).build();
+    ColumnSchema b = new ColumnSchema.ColumnSchemaBuilder("b", Type.INT8).key(true).build();
+    ColumnSchema c = new ColumnSchema.ColumnSchemaBuilder("c", Type.INT8).key(true).build();
+    Schema schema = new Schema(ImmutableList.of(a, b, c));
+
+    CreateTableOptions tableBuilder = new CreateTableOptions();
+    tableBuilder.setRangePartitionColumns(new ArrayList<String>());
+    tableBuilder.addHashPartitions(ImmutableList.of("a"), 3);
+    tableBuilder.addHashPartitions(ImmutableList.of("b", "c"), 3);
+
+    String tableName = "testMultiColumnInListHashPartitionPruning-" + System.currentTimeMillis();
+    syncClient.createTable(tableName, schema, tableBuilder);
+    KuduTable table = syncClient.openTable(tableName);
+    List<Partition> partitions = getTablePartitions(table);
+
+    // a in [0, 1]: 2 of the 3 'a' buckets remain -> 2 * 3 = 6 partitions.
+    checkPartitions(6, 2, table, partitions,
+                    KuduPredicate.newInListPredicate(a, ImmutableList.of((byte) 0, (byte) 1)));
+
+    // a in [0, 1, 8]: the values cover all 3 'a' buckets -> nothing is pruned.
+    checkPartitions(9, 1, table, partitions,
+                    KuduPredicate.newInListPredicate(a, ImmutableList.of((byte) 0, (byte) 1, (byte) 8)));
+
+    // b in [0, 1]: c is unconstrained, so the (b, c) hash component is not pruned.
+    checkPartitions(9, 1, table, partitions,
+                    KuduPredicate.newInListPredicate(b, ImmutableList.of((byte) 0, (byte) 1)));
+
+    // c in [0, 1]: b is unconstrained, so the (b, c) hash component is not pruned.
+    checkPartitions(9, 1, table, partitions,
+                    KuduPredicate.newInListPredicate(c, ImmutableList.of((byte) 0, (byte) 1)));
+
+    // b in [0, 1], c in [0, 1]: the four (b, c) pairs cover every bucket:
+    // (0, 0) in bucket 2
+    // (0, 1) in bucket 2
+    // (1, 0) in bucket 1
+    // (1, 1) in bucket 0
+    checkPartitions(9, 1, table, partitions,
+                    KuduPredicate.newInListPredicate(b, ImmutableList.of((byte) 0, (byte) 1)),
+                    KuduPredicate.newInListPredicate(c, ImmutableList.of((byte) 0, (byte) 1)));
+
+    // b = 0, c in [0, 1]: both pairs fall in bucket 2 -> 3 * 1 = 3 partitions.
+    checkPartitions(3, 3, table, partitions,
+                    KuduPredicate.newComparisonPredicate(b, ComparisonOp.EQUAL,  0),
+                    KuduPredicate.newInListPredicate(c, ImmutableList.of((byte) 0, (byte) 1)));
+
+    // b = 1, c in [0, 1]: the pairs fall in buckets 1 and 0 -> 3 * 2 = 6 partitions.
+    checkPartitions(6, 6, table, partitions,
+                    KuduPredicate.newComparisonPredicate(b, ComparisonOp.EQUAL,  1),
+                    KuduPredicate.newInListPredicate(c, ImmutableList.of((byte) 0, (byte) 1)));
+
+    // a in [0, 1], b in [0, 1], c in [0, 1]: 2 'a' buckets * all 3 (b, c) buckets = 6.
+    checkPartitions(6, 2, table, partitions,
+                    KuduPredicate.newInListPredicate(a, ImmutableList.of((byte) 0, (byte) 1)),
+                    KuduPredicate.newInListPredicate(b, ImmutableList.of((byte) 0, (byte) 1)),
+                    KuduPredicate.newInListPredicate(c, ImmutableList.of((byte) 0, (byte) 1)));
   }
 
   @Test
@@ -538,101 +672,100 @@ public class TestPartitionPruner extends BaseKuduTest {
     List<Partition> partitions = getTablePartitions(table);
 
     // No Predicates
-    assertEquals(4, countPartitions(table, partitions));
+    checkPartitions(4, 1, table, partitions);
 
     // host = "a"
-    assertEquals(4, countPartitions(table, partitions,
-          KuduPredicate.newComparisonPredicate(host, ComparisonOp.EQUAL, "a")));
+    checkPartitions(4, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(host, ComparisonOp.EQUAL, "a"));
 
     // host = "a"
     // metric = "a"
-    assertEquals(2, countPartitions(table, partitions,
-          KuduPredicate.newComparisonPredicate(host, ComparisonOp.EQUAL, "a"),
-          KuduPredicate.newComparisonPredicate(metric, ComparisonOp.EQUAL, "a")));
+    checkPartitions(2, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(host, ComparisonOp.EQUAL, "a"),
+                    KuduPredicate.newComparisonPredicate(metric, ComparisonOp.EQUAL, "a"));
 
     // host = "a"
     // metric = "a"
     // timestamp >= 9;
-    assertEquals(2, countPartitions(table, partitions,
-          KuduPredicate.newComparisonPredicate(host, ComparisonOp.EQUAL, "a"),
-          KuduPredicate.newComparisonPredicate(metric, ComparisonOp.EQUAL, "a"),
-          KuduPredicate.newComparisonPredicate(timestamp, ComparisonOp.GREATER_EQUAL, 9)));
+    checkPartitions(2, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(host, ComparisonOp.EQUAL, "a"),
+                    KuduPredicate.newComparisonPredicate(metric, ComparisonOp.EQUAL, "a"),
+                    KuduPredicate.newComparisonPredicate(timestamp, ComparisonOp.GREATER_EQUAL, 9));
 
     // host = "a"
     // metric = "a"
     // timestamp >= 10;
     // timestamp < 20;
-    assertEquals(1, countPartitions(table, partitions,
-          KuduPredicate.newComparisonPredicate(host, ComparisonOp.EQUAL, "a"),
-          KuduPredicate.newComparisonPredicate(metric, ComparisonOp.EQUAL, "a"),
-          KuduPredicate.newComparisonPredicate(timestamp, ComparisonOp.GREATER_EQUAL, 10),
-          KuduPredicate.newComparisonPredicate(timestamp, ComparisonOp.LESS, 20)));
+    checkPartitions(1, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(host, ComparisonOp.EQUAL, "a"),
+                    KuduPredicate.newComparisonPredicate(metric, ComparisonOp.EQUAL, "a"),
+                    KuduPredicate.newComparisonPredicate(timestamp, ComparisonOp.GREATER_EQUAL, 10),
+                    KuduPredicate.newComparisonPredicate(timestamp, ComparisonOp.LESS, 20));
 
     // host = "a"
     // metric = "a"
     // timestamp < 10;
-    assertEquals(1, countPartitions(table, partitions,
-          KuduPredicate.newComparisonPredicate(host, ComparisonOp.EQUAL, "a"),
-          KuduPredicate.newComparisonPredicate(metric, ComparisonOp.EQUAL, "a"),
-          KuduPredicate.newComparisonPredicate(timestamp, ComparisonOp.LESS, 10)));
+    checkPartitions(1, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(host, ComparisonOp.EQUAL, "a"),
+                    KuduPredicate.newComparisonPredicate(metric, ComparisonOp.EQUAL, "a"),
+                    KuduPredicate.newComparisonPredicate(timestamp, ComparisonOp.LESS, 10));
 
     // host = "a"
     // metric = "a"
     // timestamp >= 10;
-    assertEquals(1, countPartitions(table, partitions,
-          KuduPredicate.newComparisonPredicate(host, ComparisonOp.EQUAL, "a"),
-          KuduPredicate.newComparisonPredicate(metric, ComparisonOp.EQUAL, "a"),
-          KuduPredicate.newComparisonPredicate(timestamp, ComparisonOp.GREATER_EQUAL, 10)));
+    checkPartitions(1, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(host, ComparisonOp.EQUAL, "a"),
+                    KuduPredicate.newComparisonPredicate(metric, ComparisonOp.EQUAL, "a"),
+                    KuduPredicate.newComparisonPredicate(timestamp, ComparisonOp.GREATER_EQUAL, 10));
 
     // host = "a"
     // metric = "a"
     // timestamp = 10;
-    assertEquals(1, countPartitions(table, partitions,
-          KuduPredicate.newComparisonPredicate(host, ComparisonOp.EQUAL, "a"),
-          KuduPredicate.newComparisonPredicate(metric, ComparisonOp.EQUAL, "a"),
-          KuduPredicate.newComparisonPredicate(timestamp, ComparisonOp.EQUAL, 10)));
+    checkPartitions(1, 1, table, partitions,
+                    KuduPredicate.newComparisonPredicate(host, ComparisonOp.EQUAL, "a"),
+                    KuduPredicate.newComparisonPredicate(metric, ComparisonOp.EQUAL, "a"),
+                    KuduPredicate.newComparisonPredicate(timestamp, ComparisonOp.EQUAL, 10));
+
+    byte[] hash1 = new byte[] { 0, 0, 0, 1 };
 
     // partition key < (hash=1)
-    assertEquals(2, countPartitions(table, partitions, new byte[] {}, new byte[] { 0, 0, 0, 1 }));
+    checkPartitions(2, 1, table, partitions, null, hash1);
 
     // partition key >= (hash=1)
-    assertEquals(2, countPartitions(table, partitions, new byte[] { 0, 0, 0, 1 }, new byte[] {}));
+    checkPartitions(2, 1, table, partitions, hash1, null);
 
     // timestamp = 10
     // partition key < (hash=1)
-    assertEquals(1, countPartitions(table, partitions, new byte[] {}, new byte[] { 0, 0, 0, 1 },
-          KuduPredicate.newComparisonPredicate(timestamp, ComparisonOp.EQUAL, 10)));
+    checkPartitions(1, 1, table, partitions, null, hash1,
+                    KuduPredicate.newComparisonPredicate(timestamp, ComparisonOp.EQUAL, 10));
 
     // timestamp = 10
     // partition key >= (hash=1)
-    assertEquals(1, countPartitions(table, partitions, new byte[] { 0, 0, 0, 1 }, new byte[] {},
-          KuduPredicate.newComparisonPredicate(timestamp, ComparisonOp.EQUAL, 10)));
+    checkPartitions(1, 1, table, partitions, hash1,null,
+                    KuduPredicate.newComparisonPredicate(timestamp, ComparisonOp.EQUAL, 10));
 
     // timestamp IN (0, 9)
     // host = "a"
-    // metric IN ("foo", "bar")
-    //
-    // We do not prune hash partitions based on IN list predicates (yet),
-    // so the IN list on the hash columns is really just testing that it doesn't fail.
-    assertEquals(2, countPartitions(table, partitions,
-          KuduPredicate.newInListPredicate(timestamp, ImmutableList.of(0L, 9L)),
-          KuduPredicate.newComparisonPredicate(host, ComparisonOp.EQUAL, "a"),
-          KuduPredicate.newInListPredicate(metric, ImmutableList.of("foo", "bar"))));
+    // metric IN ("foo", "baz")
+    checkPartitions(1, 1, table, partitions,
+                    KuduPredicate.newInListPredicate(timestamp, ImmutableList.of(0L, 9L)),
+                    KuduPredicate.newComparisonPredicate(host, ComparisonOp.EQUAL, "a"),
+                    KuduPredicate.newInListPredicate(metric, ImmutableList.of("foo", "baz")));
 
     // timestamp IN (10, 100)
-    assertEquals(2, countPartitions(table, partitions,
-          KuduPredicate.newInListPredicate(timestamp, ImmutableList.of(10L, 100L))));
+    checkPartitions(2, 2, table, partitions,
+                    KuduPredicate.newInListPredicate(timestamp, ImmutableList.of(10L, 100L)));
 
     // timestamp IN (9, 10)
-    assertEquals(4, countPartitions(table, partitions,
-          KuduPredicate.newInListPredicate(timestamp, ImmutableList.of(9L, 10L))));
+    checkPartitions(4, 2, table, partitions,
+                    KuduPredicate.newInListPredicate(timestamp, ImmutableList.of(9L, 10L)));
 
     // timestamp IS NOT NULL
-    assertEquals(4, countPartitions(table, partitions,
-          KuduPredicate.newIsNotNullPredicate(timestamp)));
+    checkPartitions(4, 1, table, partitions,
+                    KuduPredicate.newIsNotNullPredicate(timestamp));
 
     // timestamp IS NULL
-    assertEquals(0, countPartitions(table, partitions,
-            KuduPredicate.newIsNullPredicate(timestamp)));
+    checkPartitions(0, 0, table, partitions,
+                    KuduPredicate.newIsNullPredicate(timestamp));
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/4f0677f8/src/kudu/common/partial_row.h
----------------------------------------------------------------------
diff --git a/src/kudu/common/partial_row.h b/src/kudu/common/partial_row.h
index e0e89c9..e3b01a4 100644
--- a/src/kudu/common/partial_row.h
+++ b/src/kudu/common/partial_row.h
@@ -478,8 +478,8 @@ class KUDU_EXPORT KuduPartialRow {
   friend class TestScanSpec;
   template<typename KeyTypeWrapper> friend struct client::SliceKeysTestSetup;
   template<typename KeyTypeWrapper> friend struct client::IntKeysTestSetup;
-  FRIEND_TEST(TestPartitionPruner, TestPrimaryKeyRangePruning);
-  FRIEND_TEST(TestPartitionPruner, TestPartialPrimaryKeyRangePruning);
+  FRIEND_TEST(PartitionPrunerTest, TestPrimaryKeyRangePruning);
+  FRIEND_TEST(PartitionPrunerTest, TestPartialPrimaryKeyRangePruning);
 
   template<typename T>
   Status Set(const Slice& col_name, const typename T::cpp_type& val,

http://git-wip-us.apache.org/repos/asf/kudu/blob/4f0677f8/src/kudu/common/partition_pruner-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/partition_pruner-test.cc b/src/kudu/common/partition_pruner-test.cc
index 7da2127..7e2dac1 100644
--- a/src/kudu/common/partition_pruner-test.cc
+++ b/src/kudu/common/partition_pruner-test.cc
@@ -43,11 +43,15 @@ using std::vector;
 
 namespace kudu {
 
+class PartitionPrunerTest : public KuduTest {  // Empty fixture for the TEST_F cases below.
+};
+
 void CheckPrunedPartitions(const Schema& schema,
                            const PartitionSchema& partition_schema,
-                           const vector<Partition> partitions,
+                           const vector<Partition>& partitions,
                            const ScanSpec& spec,
-                           size_t remaining_tablets) {
+                           size_t remaining_tablets,
+                           size_t pruner_ranges) {
 
   PartitionPruner pruner;
   pruner.Init(schema, partition_schema, spec);
@@ -63,9 +67,10 @@ void CheckPrunedPartitions(const Schema& schema,
                                      return pruner.ShouldPrune(partition);
                                    });
   ASSERT_EQ(remaining_tablets, partitions.size() - pruned_partitions);
+  ASSERT_EQ(pruner_ranges, pruner.NumRangesRemainingForTests());
 }
 
-TEST(TestPartitionPruner, TestPrimaryKeyRangePruning) {
+TEST_F(PartitionPrunerTest, TestPrimaryKeyRangePruning) {
   // CREATE TABLE t
   // (a INT8, b INT8, c INT8)
   // PRIMARY KEY (a, b, c)) SPLIT ROWS [(0, 0, 0), (10, 10, 10)]
@@ -119,7 +124,9 @@ TEST(TestPartitionPruner, TestPrimaryKeyRangePruning) {
       enc_upper_bound = EncodedKey::FromContiguousRow(row);
       spec.SetExclusiveUpperBoundKey(enc_upper_bound.get());
     }
-    CheckPrunedPartitions(schema, partition_schema, partitions, spec, remaining_tablets);
+    size_t pruner_ranges = remaining_tablets == 0 ? 0 : 1;
+    CheckPrunedPartitions(schema, partition_schema, partitions, spec,
+                          remaining_tablets, pruner_ranges);
   };
 
   // No bounds
@@ -172,9 +179,15 @@ TEST(TestPartitionPruner, TestPrimaryKeyRangePruning) {
   Check(make_tuple<int8_t, int8_t, int8_t>(0, 0, 0),
         make_tuple<int8_t, int8_t, int8_t>(10, 10, 11),
         2);
+
+  // PK  < (0, 0, 0)
+  // PK >= (10, 10, 11)
+  Check(make_tuple<int8_t, int8_t, int8_t>(10, 10, 11),
+        make_tuple<int8_t, int8_t, int8_t>(0, 0, 0),
+        0);
 }
 
-TEST(TestPartitionPruner, TestPartialPrimaryKeyRangePruning) {
+TEST_F(PartitionPrunerTest, TestPartialPrimaryKeyRangePruning) {
   // CREATE TABLE t
   // (a INT8, b STRING, c STRING, PRIMARY KEY (a, b, c))
   // DISTRIBUTE BY RANGE(a, b)
@@ -232,7 +245,9 @@ TEST(TestPartitionPruner, TestPartialPrimaryKeyRangePruning) {
       enc_upper_bound = EncodedKey::FromContiguousRow(row);
       spec.SetExclusiveUpperBoundKey(enc_upper_bound.get());
     }
-    CheckPrunedPartitions(schema, partition_schema, partitions, spec, remaining_tablets);
+    size_t pruner_ranges = remaining_tablets == 0 ? 0 : 1;
+    CheckPrunedPartitions(schema, partition_schema, partitions, spec,
+                          remaining_tablets, pruner_ranges);
   };
 
   // No bounds
@@ -280,7 +295,7 @@ TEST(TestPartitionPruner, TestPartialPrimaryKeyRangePruning) {
         make_tuple<int8_t, string>(10, "m"), 1);
 }
 
-TEST(TestPartitionPruner, TestRangePruning) {
+TEST_F(PartitionPrunerTest, TestRangePruning) {
   // CREATE TABLE t
   // (a INT8, b STRING, c INT8)
   // PRIMARY KEY (a, b, c))
@@ -319,7 +334,9 @@ TEST(TestPartitionPruner, TestRangePruning) {
       spec.AddPredicate(pred);
     }
 
-    CheckPrunedPartitions(schema, partition_schema, partitions, spec, remaining_tablets);
+    size_t pruner_ranges = remaining_tablets == 0 ? 0 : 1;
+    CheckPrunedPartitions(schema, partition_schema, partitions, spec,
+                          remaining_tablets, pruner_ranges);
   };
 
   int8_t neg_ten = -10;
@@ -458,7 +475,7 @@ TEST(TestPartitionPruner, TestRangePruning) {
   Check({ ColumnPredicate::IsNotNull(schema.column(2)) }, 3);
 }
 
-TEST(TestPartitionPruner, TestHashPruning) {
+TEST_F(PartitionPrunerTest, TestHashPruning) {
   // CREATE TABLE t
   // (a INT8, b INT8, c INT8)
   // PRIMARY KEY (a, b, c)
@@ -489,14 +506,17 @@ TEST(TestPartitionPruner, TestHashPruning) {
 
   // Applies the specified predicates to a scan and checks that the expected
   // number of partitions are pruned.
-  auto Check = [&] (const vector<ColumnPredicate>& predicates, size_t remaining_tablets) {
+  auto Check = [&] (const vector<ColumnPredicate>& predicates,
+                    size_t remaining_tablets,
+                    size_t pruner_ranges) {
     ScanSpec spec;
 
     for (const auto& pred : predicates) {
       spec.AddPredicate(pred);
     }
 
-    CheckPrunedPartitions(schema, partition_schema, partitions, spec, remaining_tablets);
+    CheckPrunedPartitions(schema, partition_schema, partitions, spec,
+                          remaining_tablets, pruner_ranges);
   };
 
   int8_t zero = 0;
@@ -504,30 +524,30 @@ TEST(TestPartitionPruner, TestHashPruning) {
   int8_t two = 2;
 
   // No Bounds
-  Check({}, 4);
+  Check({}, 4, 1);
 
   // a = 0;
-  Check({ ColumnPredicate::Equality(schema.column(0), &zero) }, 2);
+  Check({ ColumnPredicate::Equality(schema.column(0), &zero) }, 2, 1);
 
   // a >= 0;
-  Check({ ColumnPredicate::Range(schema.column(0), &zero, nullptr) }, 4);
+  Check({ ColumnPredicate::Range(schema.column(0), &zero, nullptr) }, 4, 1);
 
   // a >= 0;
   // a < 1;
-  Check({ ColumnPredicate::Range(schema.column(0), &zero, &one) }, 2);
+  Check({ ColumnPredicate::Range(schema.column(0), &zero, &one) }, 2, 1);
 
   // a >= 0;
   // a < 2;
-  Check({ ColumnPredicate::Range(schema.column(0), &zero, &two) }, 4);
+  Check({ ColumnPredicate::Range(schema.column(0), &zero, &two) }, 4, 1);
 
   // b = 1;
-  Check({ ColumnPredicate::Equality(schema.column(1), &one) }, 4);
+  Check({ ColumnPredicate::Equality(schema.column(1), &one) }, 4, 1);
 
   // b = 1;
   // c = 2;
   Check({ ColumnPredicate::Equality(schema.column(1), &one),
           ColumnPredicate::Equality(schema.column(2), &two) },
-        2);
+        2, 2);
 
   // a = 0;
   // b = 1;
@@ -535,10 +555,10 @@ TEST(TestPartitionPruner, TestHashPruning) {
   Check({ ColumnPredicate::Equality(schema.column(0), &zero),
           ColumnPredicate::Equality(schema.column(1), &one),
           ColumnPredicate::Equality(schema.column(2), &two) },
-        1);
+        1, 1);
 }
 
-TEST(TestPartitionPruner, TestInListHashPruning) {
+TEST_F(PartitionPrunerTest, TestInListHashPruning) {
   // CREATE TABLE t
   // (a INT8, b INT8, c INT8)
   // PRIMARY KEY (a, b, c)
@@ -574,14 +594,17 @@ TEST(TestPartitionPruner, TestInListHashPruning) {
 
   // Applies the specified predicates to a scan and checks that the expected
   // number of partitions are pruned.
-  auto Check = [&] (const vector<ColumnPredicate>& predicates, size_t remaining_tablets) {
+  auto Check = [&] (const vector<ColumnPredicate>& predicates,
+                    size_t remaining_tablets,
+                    size_t pruner_ranges) {
     ScanSpec spec;
 
     for (const auto& pred : predicates) {
       spec.AddPredicate(pred);
     }
 
-    CheckPrunedPartitions(schema, partition_schema, partitions, spec, remaining_tablets);
+    CheckPrunedPartitions(schema, partition_schema, partitions, spec,
+                          remaining_tablets, pruner_ranges);
   };
 
   // zero, one, eight are in different buckets when bucket number is 3 and seed is 0.
@@ -595,26 +618,26 @@ TEST(TestPartitionPruner, TestInListHashPruning) {
 
   // a in [0, 1];
   a_values = { &zero, &one };
-  Check({ ColumnPredicate::InList(schema.column(0), &a_values) }, 18);
+  Check({ ColumnPredicate::InList(schema.column(0), &a_values) }, 18, 2);
 
   // a in [0, 1, 8];
   a_values = { &zero, &one, &eight };
-  Check({ ColumnPredicate::InList(schema.column(0), &a_values) }, 27);
+  Check({ ColumnPredicate::InList(schema.column(0), &a_values) }, 27, 1);
 
   // b in [0, 1]
   b_values = { &zero, &one };
-  Check({ ColumnPredicate::InList(schema.column(1), &b_values) }, 18);
+  Check({ ColumnPredicate::InList(schema.column(1), &b_values) }, 18, 6);
 
   // c in [0, 1]
   c_values = { &zero, &one };
-  Check({ ColumnPredicate::InList(schema.column(2), &c_values) }, 18);
+  Check({ ColumnPredicate::InList(schema.column(2), &c_values) }, 18, 18);
 
   // b in [0, 1], c in [0, 1]
   b_values = { &zero, &one };
   c_values = { &zero, &one };
   Check({ ColumnPredicate::InList(schema.column(1), &b_values),
           ColumnPredicate::InList(schema.column(2), &c_values) },
-        12);
+        12, 12);
 
   //a in [0, 1], b in [0, 1], c in [0, 1]
   a_values = { &zero, &one };
@@ -623,10 +646,10 @@ TEST(TestPartitionPruner, TestInListHashPruning) {
   Check({ ColumnPredicate::InList(schema.column(0), &a_values),
           ColumnPredicate::InList(schema.column(1), &b_values),
           ColumnPredicate::InList(schema.column(2), &c_values) },
-        8);
+        8, 8);
 }
 
-TEST(TestPartitionPruner, TestMultiColumnInListHashPruning) {
+TEST_F(PartitionPrunerTest, TestMultiColumnInListHashPruning) {
   // CREATE TABLE t
   // (a INT8, b INT8, c INT8)
   // PRIMARY KEY (a, b, c)
@@ -658,14 +681,17 @@ TEST(TestPartitionPruner, TestMultiColumnInListHashPruning) {
 
   // Applies the specified predicates to a scan and checks that the expected
   // number of partitions are pruned.
-  auto Check = [&] (const vector<ColumnPredicate>& predicates, size_t remaining_tablets) {
+  auto Check = [&] (const vector<ColumnPredicate>& predicates,
+                    size_t remaining_tablets,
+                    size_t pruner_ranges) {
     ScanSpec spec;
 
     for (const auto& pred : predicates) {
       spec.AddPredicate(pred);
     }
 
-    CheckPrunedPartitions(schema, partition_schema, partitions, spec, remaining_tablets);
+    CheckPrunedPartitions(schema, partition_schema, partitions, spec,
+                          remaining_tablets, pruner_ranges);
   };
 
   // zero, one, eight are in different buckets when bucket number is 3 and seed is 0.
@@ -679,19 +705,19 @@ TEST(TestPartitionPruner, TestMultiColumnInListHashPruning) {
 
   // a in [0, 1];
   a_values = { &zero, &one };
-  Check({ ColumnPredicate::InList(schema.column(0), &a_values) }, 6);
+  Check({ ColumnPredicate::InList(schema.column(0), &a_values) }, 6, 2);
 
   // a in [0, 1, 8];
   a_values = { &zero, &one, &eight };
-  Check({ ColumnPredicate::InList(schema.column(0), &a_values) }, 9);
+  Check({ ColumnPredicate::InList(schema.column(0), &a_values) }, 9, 1);
 
   // b in [0, 1]
   b_values = { &zero, &one };
-  Check({ ColumnPredicate::InList(schema.column(1), &b_values) }, 9);
+  Check({ ColumnPredicate::InList(schema.column(1), &b_values) }, 9, 1);
 
   // c in [0, 1]
   c_values = { &zero, &one };
-  Check({ ColumnPredicate::InList(schema.column(2), &c_values) }, 9);
+  Check({ ColumnPredicate::InList(schema.column(2), &c_values) }, 9, 1);
 
   // b in [0, 1], c in [0, 1]
   // (0, 0) in bucket 2
@@ -702,19 +728,19 @@ TEST(TestPartitionPruner, TestMultiColumnInListHashPruning) {
   c_values = { &zero, &one };
   Check({ ColumnPredicate::InList(schema.column(1), &b_values),
           ColumnPredicate::InList(schema.column(2), &c_values) },
-        9);
+        9, 1);
 
   // b = 0, c in [0, 1]
   c_values = { &zero, &one };
   Check({ ColumnPredicate::Equality(schema.column(1), &zero),
           ColumnPredicate::InList(schema.column(2), &c_values) },
-        3);
+        3, 3);
 
   // b = 1, c in [0, 1]
   c_values = { &zero, &one };
   Check({ ColumnPredicate::Equality(schema.column(1), &one),
           ColumnPredicate::InList(schema.column(2), &c_values) },
-        6);
+        6, 6);
 
   //a in [0, 1], b in [0, 1], c in [0, 1]
   a_values = { &zero, &one };
@@ -723,15 +749,16 @@ TEST(TestPartitionPruner, TestMultiColumnInListHashPruning) {
   Check({ ColumnPredicate::InList(schema.column(0), &a_values),
           ColumnPredicate::InList(schema.column(1), &b_values),
           ColumnPredicate::InList(schema.column(2), &c_values) },
-        6);
+        6, 2);
 }
 
-TEST(TestPartitionPruner, TestPruning) {
+TEST_F(PartitionPrunerTest, TestPruning) {
   // CREATE TABLE timeseries
   // (host STRING, metric STRING, time UNIXTIME_MICROS, value DOUBLE)
   // PRIMARY KEY (host, metric, time)
-  // DISTRIBUTE BY RANGE(time) SPLIT ROWS [(10)],
-  //               HASH(host, metric) INTO 2 BUCKETS;
+  // PARTITION BY RANGE (time) (PARTITION VALUES < 10,
+  //                            PARTITION VALUES >= 10)
+  //              HASH (host, metric) 2 PARTITIONS;
   Schema schema({ ColumnSchema("host", STRING),
                   ColumnSchema("metric", STRING),
                   ColumnSchema("time", UNIXTIME_MICROS),
@@ -763,7 +790,8 @@ TEST(TestPartitionPruner, TestPruning) {
   auto Check = [&] (const vector<ColumnPredicate>& predicates,
                     string lower_bound_partition_key,
                     string upper_bound_partition_key,
-                    size_t remaining_tablets) {
+                    size_t remaining_tablets,
+                    size_t pruner_ranges) {
     ScanSpec spec;
 
     spec.SetLowerBoundPartitionKey(lower_bound_partition_key);
@@ -772,7 +800,8 @@ TEST(TestPartitionPruner, TestPruning) {
       spec.AddPredicate(pred);
     }
 
-    CheckPrunedPartitions(schema, partition_schema, partitions, spec, remaining_tablets);
+    CheckPrunedPartitions(schema, partition_schema, partitions, spec,
+                          remaining_tablets, pruner_ranges);
   };
 
   Slice a = "a";
@@ -788,7 +817,7 @@ TEST(TestPartitionPruner, TestPruning) {
           ColumnPredicate::Equality(schema.column(1), &a),
           ColumnPredicate::Range(schema.column(2), &nine, nullptr) },
         "", "",
-        2);
+        2, 1);
 
   // host = "a"
   // metric = "a"
@@ -798,7 +827,7 @@ TEST(TestPartitionPruner, TestPruning) {
           ColumnPredicate::Equality(schema.column(1), &a),
           ColumnPredicate::Range(schema.column(2), &ten, &twenty) },
         "", "",
-        1);
+        1, 1);
 
   // host = "a"
   // metric = "a"
@@ -807,7 +836,7 @@ TEST(TestPartitionPruner, TestPruning) {
           ColumnPredicate::Equality(schema.column(1), &a),
           ColumnPredicate::Range(schema.column(2), nullptr, &ten) },
         "", "",
-        1);
+        1, 1);
 
   // host = "a"
   // metric = "a"
@@ -816,7 +845,7 @@ TEST(TestPartitionPruner, TestPruning) {
           ColumnPredicate::Equality(schema.column(1), &a),
           ColumnPredicate::Range(schema.column(2), &ten, nullptr) },
         "", "",
-        1);
+        1, 1);
 
   // host = "a"
   // metric = "a"
@@ -825,22 +854,22 @@ TEST(TestPartitionPruner, TestPruning) {
           ColumnPredicate::Equality(schema.column(1), &a),
           ColumnPredicate::Equality(schema.column(2), &ten) },
         "", "",
-        1);
+        1, 1);
 
   // partition key < (hash=1)
-  Check({}, "", string("\0\0\0\1", 4), 2);
+  Check({}, "", string("\0\0\0\1", 4), 2, 1);
 
   // partition key >= (hash=1)
-  Check({}, string("\0\0\0\1", 4), "", 2);
+  Check({}, string("\0\0\0\1", 4), "", 2, 1);
 
   // timestamp = 10
   // partition key < (hash=1)
   Check({ ColumnPredicate::Equality(schema.column(2), &ten) },
-        "", string("\0\0\0\1", 4), 1);
+        "", string("\0\0\0\1", 4), 1, 1);
 
   // timestamp = 10
   // partition key >= (hash=1)
   Check({ ColumnPredicate::Equality(schema.column(2), &ten) },
-        string("\0\0\0\1", 4), "", 1);
+        string("\0\0\0\1", 4), "", 1, 1);
 }
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/4f0677f8/src/kudu/common/partition_pruner.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/partition_pruner.cc b/src/kudu/common/partition_pruner.cc
index f3b5b6f..ac157a9 100644
--- a/src/kudu/common/partition_pruner.cc
+++ b/src/kudu/common/partition_pruner.cc
@@ -250,10 +250,10 @@ void PartitionPruner::Init(const Schema& schema,
   // examples above:
   //
   // 1) The partition keys are truncated after the final constrained component
-  //    (hash bucket components are constrained when the scan is limited to some
-  //    bucket via equality or in-list predicates on that component, while range
-  //    components are constrained if they have an upper or lower bound via
-  //    range or equality predicates on that component).
+  //    Hash bucket components are constrained when the scan is limited to a
+  //    subset of buckets via equality or in-list predicates on that component.
+  //    Range components are constrained if they have an upper or lower bound
+  //    via range or equality predicates on that component.
   //
   // 2) If the final constrained component is a hash bucket, then the
   //    corresponding bucket in the upper bound is incremented in order to make
@@ -262,8 +262,12 @@ void PartitionPruner::Init(const Schema& schema,
   // 3) The number of partition key ranges in the result is equal to the product
   //    of the number of buckets of each unconstrained hash component which come
   //    before a final constrained component. If there are no unconstrained hash
-  //    components and no in-list predicates, then the number of
-  //    partition key ranges is one.
+  //    components, then the number of resulting partition key ranges is one. Note
+  //    that the product can be a large number of ranges, and we may eventually
+  //    need to make the algorithm give up on pruning when the number of ranges
+  //    exceeds a limit. Until this becomes a problem in practice, we'll keep
+  //    always pruning, since it is precisely these highly-hash-partitioned
+  //    tables which get the most benefit from pruning.
 
   // Step 1: Build the range portion of the partition key.
   string range_lower_bound;
@@ -327,7 +331,7 @@ void PartitionPruner::Init(const Schema& schema,
                                  find_if(hash_bucket_bitsets.rbegin(),
                                          hash_bucket_bitsets.rend(),
                                          [] (const vector<bool>& x) {
-                                           return std::find(x.begin(), x.end(), true) != x.end();
+                                           return std::find(x.begin(), x.end(), false) != x.end();
                                          }));
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/4f0677f8/src/kudu/common/partition_pruner.h
----------------------------------------------------------------------
diff --git a/src/kudu/common/partition_pruner.h b/src/kudu/common/partition_pruner.h
index 6413dd5..6effe31 100644
--- a/src/kudu/common/partition_pruner.h
+++ b/src/kudu/common/partition_pruner.h
@@ -61,6 +61,11 @@ class PartitionPruner {
   // Returns true if the provided partition should be pruned.
   bool ShouldPrune(const Partition& partition) const;
 
+  // Returns the number of partition key ranges remaining in the scan.
+  size_t NumRangesRemainingForTests() const {
+    return partition_key_ranges_.size();
+  }
+
   // Returns a text description of this partition pruner suitable for debug
   // printing.
   std::string ToString(const Schema& schema, const PartitionSchema& partition_schema) const;

http://git-wip-us.apache.org/repos/asf/kudu/blob/4f0677f8/src/kudu/common/scan_spec.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/scan_spec.cc b/src/kudu/common/scan_spec.cc
index 46acc08..9251972 100644
--- a/src/kudu/common/scan_spec.cc
+++ b/src/kudu/common/scan_spec.cc
@@ -58,7 +58,7 @@ bool ScanSpec::CanShortCircuit() const {
   if (lower_bound_key_ &&
       exclusive_upper_bound_key_ &&
       lower_bound_key_->encoded_key().compare(exclusive_upper_bound_key_->encoded_key()) >= 0) {
-    return false;
+    return true;
   }
 
   return any_of(predicates_.begin(), predicates_.end(),