You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/11/09 16:22:35 UTC

svn commit: r1540328 - in /hive/trunk/ql/src: java/org/apache/hadoop/hive/ql/exec/ java/org/apache/hadoop/hive/ql/exec/vector/ java/org/apache/hadoop/hive/ql/io/ java/org/apache/hadoop/hive/ql/optimizer/ test/queries/clientpositive/ test/results/client...

Author: hashutosh
Date: Sat Nov  9 15:22:35 2013
New Revision: 1540328

URL: http://svn.apache.org/r1540328
Log:
HIVE-5657 : TopN produces incorrect results with count(distinct) (Sergey Shelukhin via Ashutosh Chauhan)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/LimitPushdownOptimizer.java
    hive/trunk/ql/src/test/queries/clientpositive/limit_pushdown.q
    hive/trunk/ql/src/test/queries/clientpositive/limit_pushdown_negative.q
    hive/trunk/ql/src/test/results/clientpositive/limit_pushdown.q.out
    hive/trunk/ql/src/test/results/clientpositive/limit_pushdown_negative.q.out
    hive/trunk/ql/src/test/results/clientpositive/vectorization_limit.q.out

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1540328&r1=1540327&r2=1540328&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Sat Nov  9 15:22:35 2013
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.plan.Ex
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.Serializer;
 import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -46,6 +47,7 @@ import org.apache.hadoop.io.BinaryCompar
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.Text;
+// import org.apache.hadoop.util.StringUtils;
 
 /**
  * Reduce Sink Operator sends output to the reduce stage.
@@ -153,8 +155,8 @@ public class ReduceSinkOperator extends 
   transient InspectableObject tempInspectableObject = new InspectableObject();
   protected transient HiveKey keyWritable = new HiveKey();
 
-  transient StructObjectInspector keyObjectInspector;
-  transient StructObjectInspector valueObjectInspector;
+  protected transient ObjectInspector keyObjectInspector;
+  protected transient ObjectInspector valueObjectInspector;
   transient ObjectInspector[] partitionObjectInspectors;
 
   protected transient Object[] cachedValues;
@@ -173,6 +175,7 @@ public class ReduceSinkOperator extends 
    * in this case, child GBY evaluates distict values with expression like KEY.col2:0.dist1
    * see {@link ExprNodeColumnEvaluator}
    */
+  // TODO: we only ever use one row of these at a time. Why do we need to cache multiple?
   protected transient Object[][] cachedKeys;
   boolean firstRow;
   protected transient Random random;
@@ -237,51 +240,41 @@ public class ReduceSinkOperator extends 
             .getOutputValueColumnNames(), rowInspector);
         partitionObjectInspectors = initEvaluators(partitionEval, rowInspector);
         int numKeys = numDistinctExprs > 0 ? numDistinctExprs : 1;
-        int keyLen = numDistinctExprs > 0 ? numDistributionKeys + 1 :
-          numDistributionKeys;
+        int keyLen = numDistinctExprs > 0 ? numDistributionKeys + 1 : numDistributionKeys;
         cachedKeys = new Object[numKeys][keyLen];
         cachedValues = new Object[valueEval.length];
       }
 
-      // Evaluate the keys
-      for (int i = 0; i < numDistributionKeys; i++) {
-        cachedKeys[0][i] = keyEval[i].evaluate(row);
-      }
+      // Determine distKeyLength (w/o distincts), and then add the first if present.
+      populateCachedDistributionKeys(row, 0);
+      HiveKey firstKey = toHiveKey(cachedKeys[0], tag, null);
+      int distKeyLength = firstKey.getDistKeyLength();
       if (numDistinctExprs > 0) {
-        // with distinct key(s)
-        for (int i = 0; i < numDistinctExprs; i++) {
-          if (i > 0) {
-            System.arraycopy(cachedKeys[0], 0, cachedKeys[i], 0, numDistributionKeys);
-          }
-          StandardUnion union = (StandardUnion) cachedKeys[i][numDistributionKeys];
-          if (union == null) {
-            cachedKeys[i][numDistributionKeys] =
-              union = new StandardUnion((byte)i, new Object[distinctColIndices.get(i).size()]);
-          }
-          Object[] distinctParameters = (Object[]) union.getObject();
-          for (int j = 0; j < distinctParameters.length; j++) {
-            distinctParameters[j] =
-              keyEval[distinctColIndices.get(i).get(j)].evaluate(row);
-          }
-          union.setTag((byte) i);
-        }
+        populateCachedDistinctKeys(row, 0);
+        firstKey = toHiveKey(cachedKeys[0], tag, distKeyLength);
       }
 
-      for (int i = 0; i < cachedKeys.length; i++) {
-        // Serialize the keys and append the tag
-        Object keyObj = keySerializer.serialize(cachedKeys[i], keyObjectInspector);
-        setKeyWritable(keyIsText ? (Text)keyObj : (BytesWritable)keyObj, tag);
-        int topNIndex = reducerHash.tryStoreKey(keyWritable);
-        if (TopNHash.EXCLUDED == topNIndex) continue;
-        int keyHashCode = computeHashCode(row);
-        BytesWritable valueWritable = getValue(row);
-        if (TopNHash.FORWARD == topNIndex) {
-          keyWritable.setHashCode(keyHashCode);
-          collect(keyWritable, valueWritable);
-          continue;
-        }
-        assert topNIndex >= 0;
-        reducerHash.storeValue(topNIndex, valueWritable, keyHashCode, false);
+      // Try to store the first key. If it's not excluded, we will proceed.
+      int firstIndex = reducerHash.tryStoreKey(firstKey);
+      if (firstIndex == TopNHash.EXCLUDE) return; // Nothing to do.
+      // Compute value and hashcode - we'd either store or forward them.
+      BytesWritable value = makeValueWritable(row);
+      int hashCode = computeHashCode(row);
+      if (firstIndex == TopNHash.FORWARD) {
+        firstKey.setHashCode(hashCode);
+        collect(firstKey, value);
+      } else {
+        assert firstIndex >= 0;
+        reducerHash.storeValue(firstIndex, value, hashCode, false);
+      }
+
+      // All other distinct keys will just be forwarded. This could be optimized...
+      for (int i = 1; i < numDistinctExprs; i++) {
+        System.arraycopy(cachedKeys[0], 0, cachedKeys[i], 0, numDistributionKeys);
+        populateCachedDistinctKeys(row, i);
+        HiveKey hiveKey = toHiveKey(cachedKeys[i], tag, distKeyLength);
+        hiveKey.setHashCode(hashCode);
+        collect(hiveKey, value);
       }
     } catch (HiveException e) {
       throw e;
@@ -290,14 +283,38 @@ public class ReduceSinkOperator extends 
     }
   }
 
+  private void populateCachedDistributionKeys(Object row, int index) throws HiveException {
+    for (int i = 0; i < numDistributionKeys; i++) {
+      cachedKeys[index][i] = keyEval[i].evaluate(row);
+    }
+    if (cachedKeys[0].length > numDistributionKeys) {
+      cachedKeys[index][numDistributionKeys] = null;
+    }
+  }
+
+  /**
+   * Populate distinct keys part of cachedKeys for a particular row.
+   * @param row the row
+   * @param index the cachedKeys index to write to
+   */
+  private void populateCachedDistinctKeys(Object row, int index) throws HiveException {
+    StandardUnion union;
+    cachedKeys[index][numDistributionKeys] = union = new StandardUnion(
+          (byte)index, new Object[distinctColIndices.get(index).size()]);
+    Object[] distinctParameters = (Object[]) union.getObject();
+    for (int distinctParamI = 0; distinctParamI < distinctParameters.length; distinctParamI++) {
+      distinctParameters[distinctParamI] =
+          keyEval[distinctColIndices.get(index).get(distinctParamI)].evaluate(row);
+    }
+    union.setTag((byte) index);
+  }
+
   private int computeHashCode(Object row) throws HiveException {
     // Evaluate the HashCode
     int keyHashCode = 0;
     if (partitionEval.length == 0) {
-      // If no partition cols, just distribute the data uniformly to provide
-      // better
-      // load balance. If the requirement is to have a single reducer, we
-      // should set
+      // If no partition cols, just distribute the data uniformly to provide better
+      // load balance. If the requirement is to have a single reducer, we should set
       // the number of reducers to 1.
       // Use a constant seed to make the code deterministic.
       if (random == null) {
@@ -314,15 +331,19 @@ public class ReduceSinkOperator extends 
     return keyHashCode;
   }
 
-  protected void setKeyWritable(BinaryComparable key, int tag) {
+  // Serialize the keys and append the tag
+  protected HiveKey toHiveKey(Object obj, int tag, Integer distLength) throws SerDeException {
+    BinaryComparable key = (BinaryComparable)keySerializer.serialize(obj, keyObjectInspector);
+    int keyLength = key.getLength();
     if (tag == -1) {
-      keyWritable.set(key.getBytes(), 0, key.getLength());
+      keyWritable.set(key.getBytes(), 0, keyLength);
     } else {
-      int keyLength = key.getLength();
       keyWritable.setSize(keyLength + 1);
       System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
       keyWritable.get()[keyLength] = tagByte[0];
     }
+    keyWritable.setDistKeyLength((distLength == null) ? keyLength : distLength);
+    return keyWritable;
   }
 
   public void collect(byte[] key, byte[] value, int hash) throws IOException {
@@ -331,11 +352,6 @@ public class ReduceSinkOperator extends 
     collect(keyWritable, valueWritable);
   }
 
-  protected void collect(byte[] key, Writable valueWritable, int hash) throws IOException {
-    HiveKey keyWritable = new HiveKey(key, hash);
-    collect(keyWritable, valueWritable);
-  }
-
   protected void collect(BytesWritable keyWritable, Writable valueWritable) throws IOException {
     // Since this is a terminal operator, update counters explicitly -
     // forward is not called
@@ -351,7 +367,7 @@ public class ReduceSinkOperator extends 
     }
   }
 
-  private BytesWritable getValue(Object row) throws Exception {
+  private BytesWritable makeValueWritable(Object row) throws Exception {
     // Evaluate the value
     for (int i = 0; i < valueEval.length; i++) {
       cachedValues[i] = valueEval[i].evaluate(row);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java?rev=1540328&r1=1540327&r2=1540328&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java Sat Nov  9 15:22:35 2013
@@ -31,6 +31,8 @@ import com.google.common.collect.MinMaxP
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.io.BinaryComparable;
 import org.apache.hadoop.io.BytesWritable;
@@ -51,11 +53,9 @@ public class TopNHash {
     public void collect(byte[] key, byte[] value, int hash) throws IOException;
   }
 
-  public static final int FORWARD = -1;
-  public static final int EXCLUDED = -2;
-  private static final int FLUSH = -3;
-  private static final int DISABLE = -4;
-  private static final int MAY_FORWARD = -5;
+  public static final int FORWARD = -1; // Forward the row to reducer as is.
+  public static final int EXCLUDE = -2; // Discard the row.
+  private static final int MAY_FORWARD = -3; // Vectorized - may forward the row, not sure yet.
 
   private BinaryCollector collector;
   private int topN;
@@ -67,14 +67,17 @@ public class TopNHash {
   private byte[][] keys;
   private byte[][] values;
   private int[] hashes;
+  private int[] distKeyLengths;
   private IndexStore indexes; // The heap over the keys, storing indexes in the array.
 
   private int evicted; // recently evicted index (used for next key/value)
   private int excluded; // count of excluded rows from previous flush
 
-  // temporary stuff used for vectorization
+  // temporary single-batch context used for vectorization
   private int batchNumForwards = 0; // whether current batch has any forwarded keys
   private int[] indexToBatchIndex; // mapping of index (lined up w/keys) to index in the batch
+  private int[] batchIndexToResult; // mapping of index in the batch (linear) to hash result
+  private int batchSize; // Size of the current batch.
 
   private boolean isEnabled = false;
 
@@ -82,7 +85,9 @@ public class TopNHash {
     public int compare(Integer o1, Integer o2) {
       byte[] key1 = keys[o1];
       byte[] key2 = keys[o2];
-      return WritableComparator.compareBytes(key1, 0, key1.length, key2, 0, key2.length);
+      int length1 = distKeyLengths[o1];
+      int length2 = distKeyLengths[o2];
+      return WritableComparator.compareBytes(key1, 0, length1, key2, 0, length2);
     }
   };
 
@@ -107,6 +112,7 @@ public class TopNHash {
     this.keys = new byte[topN + 1][];
     this.values = new byte[topN + 1][];
     this.hashes = new int[topN + 1];
+    this.distKeyLengths = new int[topN + 1];
     this.evicted = topN;
     this.isEnabled = true;
   }
@@ -118,12 +124,12 @@ public class TopNHash {
    *         TopNHash.EXCLUDED if the row should be discarded;
    *         any other number if the row is to be stored; the index should be passed to storeValue.
    */
-  public int tryStoreKey(BytesWritable key) throws HiveException, IOException {
+  public int tryStoreKey(HiveKey key) throws HiveException, IOException {
     if (!isEnabled) {
       return FORWARD; // short-circuit quickly - forward all rows
     }
     if (topN == 0) {
-      return EXCLUDED; // short-circuit quickly - eat all rows
+      return EXCLUDE; // short-circuit quickly - eat all rows
     }
     int index = insertKeyIntoHeap(key);
     if (index >= 0) {
@@ -132,21 +138,8 @@ public class TopNHash {
     }
     // IndexStore is trying to tell us something.
     switch (index) {
-      case DISABLE: {
-        LOG.info("Top-N hash is disabled");
-        flushInternal();
-        isEnabled = false;
-        return FORWARD;
-      }
-      case FLUSH: {
-        LOG.info("Top-N hash is flushed");
-        flushInternal();
-        // we can now retry adding key/value into hash, which is flushed.
-        // but for simplicity, just forward them
-        return FORWARD;
-      }
       case FORWARD:  return FORWARD;
-      case EXCLUDED: return EXCLUDED; // skip the row.
+      case EXCLUDE: return EXCLUDE; // skip the row.
       default: {
         assert false;
         throw new HiveException("Invalid result trying to store the key: " + index);
@@ -157,15 +150,16 @@ public class TopNHash {
 
   /**
    * Perform basic checks and initialize TopNHash for the new vectorized row batch.
+   * @param size batch size
    * @return TopNHash.FORWARD if all rows should be forwarded w/o trying to call TopN;
    *         TopNHash.EXCLUDED if all rows should be discarded w/o trying to call TopN;
    *         any other result means the batch has been started.
    */
-  public int startVectorizedBatch() throws IOException, HiveException {
+  public int startVectorizedBatch(int size) throws IOException, HiveException {
     if (!isEnabled) {
       return FORWARD; // short-circuit quickly - forward all rows
     } else if (topN == 0) {
-      return EXCLUDED; // short-circuit quickly - eat all rows
+      return EXCLUDE; // short-circuit quickly - eat all rows
     }
     // Flush here if the memory usage is too high. After that, we have the entire
     // batch already in memory anyway so we will bypass the memory checks.
@@ -179,8 +173,13 @@ public class TopNHash {
         return FORWARD; // Hash is ineffective, disable.
       }
     }
+    // Started ok; initialize context for new batch.
+    batchSize = size;
+    if (batchIndexToResult == null || batchIndexToResult.length < batchSize) {
+      batchIndexToResult = new int[Math.max(batchSize, VectorizedRowBatch.DEFAULT_SIZE)];
+    }
     if (indexToBatchIndex == null) {
-      indexToBatchIndex = new int[topN + 1]; // for current batch, contains key index in the batch
+      indexToBatchIndex = new int[topN + 1];
     }
     Arrays.fill(indexToBatchIndex, -1);
     batchNumForwards = 0;
@@ -191,33 +190,28 @@ public class TopNHash {
    * Try to put the key from the current vectorized batch into the heap.
    * @param key the key.
    * @param batchIndex The index of the key in the vectorized batch (sequential, not .selected).
-   * @param results The results; the number of elements equivalent to vrg.size, by kindex.
-   *   The result should be the same across the calls for the batch; in then end, for each k-index:
-   *     - TopNHash.EXCLUDED - discard the row.
-   *     - positive index - store the row using storeValue, same as tryStoreRow.
-   *     - negative index - forward the row. getVectorizedKeyToForward called w/this index will
-   *        return the key to use so it doesn't have to be rebuilt.
    */
-  public void tryStoreVectorizedKey(BytesWritable key, int batchIndex, int[] results)
-          throws HiveException, IOException {
+  public void tryStoreVectorizedKey(HiveKey key, int batchIndex)
+      throws HiveException, IOException {
     // Assumption - batchIndex is increasing; startVectorizedBatch was called
     int size = indexes.size();
     int index = size < topN ? size : evicted;
     keys[index] = Arrays.copyOf(key.getBytes(), key.getLength());
+    distKeyLengths[index] = key.getDistKeyLength();
     Integer collisionIndex = indexes.store(index);
     if (null != collisionIndex) {
       // forward conditional on the survival of the corresponding key currently in indexes.
       ++batchNumForwards;
-      results[batchIndex] = MAY_FORWARD - collisionIndex;
+      batchIndexToResult[batchIndex] = MAY_FORWARD - collisionIndex;
       return;
     }
     indexToBatchIndex[index] = batchIndex;
-    results[batchIndex] = index;
+    batchIndexToResult[batchIndex] = index;
     if (size != topN) return;
     evicted = indexes.removeBiggest();  // remove the biggest key
     if (index == evicted) {
       excluded++;
-      results[batchIndex] = EXCLUDED;
+      batchIndexToResult[batchIndex] = EXCLUDE;
       indexToBatchIndex[index] = -1;
       return; // input key is bigger than any of keys in hash
     }
@@ -225,36 +219,54 @@ public class TopNHash {
     int evictedBatchIndex = indexToBatchIndex[evicted];
     if (evictedBatchIndex >= 0) {
       // reset the result for the evicted index
-      results[evictedBatchIndex] = EXCLUDED;
+      batchIndexToResult[evictedBatchIndex] = EXCLUDE;
       indexToBatchIndex[evicted] = -1;
     }
-    // Also evict all results grouped with this index; cannot be current key or before it.
-    if (batchNumForwards > 0) {
-      int evictedForward = (MAY_FORWARD - evicted);
-      boolean forwardRemoved = false;
-      for (int i = evictedBatchIndex + 1; i < batchIndex; ++i) {
-        if (results[i] == evictedForward) {
-          results[i] = EXCLUDED;
-          forwardRemoved = true;
-        }
-      }
-      if (forwardRemoved) {
+    // Evict all results grouped with this index; it cannot be any key further in the batch.
+    // If we evict a key from this batch, the keys grouped with it cannot be earlier than that key.
+    // If we evict a key that is not from this batch, initial i = (-1) + 1 = 0, as intended.
+    int evictedForward = (MAY_FORWARD - evicted);
+    for (int i = evictedBatchIndex + 1; i < batchIndex && (batchNumForwards > 0); ++i) {
+      if (batchIndexToResult[i] == evictedForward) {
+        batchIndexToResult[i] = EXCLUDE;
         --batchNumForwards;
       }
     }
   }
 
   /**
+   * Get vectorized batch result for particular index.
+   * @param batchIndex index of the key in the batch.
+   * @return the result, same as from {@link #tryStoreKey(HiveKey)}
+   */
+  public int getVectorizedBatchResult(int batchIndex) {
+    int result = batchIndexToResult[batchIndex];
+    return (result <= MAY_FORWARD) ? FORWARD : result;
+  }
+
+  /**
    * After vectorized batch is processed, can return the key that caused a particular row
    * to be forwarded. Because the row could only be marked to forward because it has
    * the same key with some row already in the heap (for GBY), we can use that key from the
    * heap to emit the forwarded row.
-   * @param index Negative index from the vectorized result. See tryStoreVectorizedKey.
-   * @return The key corresponding to the row.
+   * @param batchIndex index of the key in the batch.
+   * @return The key corresponding to the index.
    */
-  public byte[] getVectorizedKeyToForward(int index) {
-    assert index <= MAY_FORWARD;
-    return keys[MAY_FORWARD - index];
+  public HiveKey getVectorizedKeyToForward(int batchIndex) {
+    int index = MAY_FORWARD - batchIndexToResult[batchIndex];
+    HiveKey hk = new HiveKey();
+    hk.set(keys[index], 0, keys[index].length);
+    hk.setDistKeyLength(distKeyLengths[index]);
+    return hk;
+  }
+
+  /**
+   * After vectorized batch is processed, can return distribution keys length of a key.
+   * @param batchIndex index of the key in the batch.
+   * @return The distribution length corresponding to the key.
+   */
+  public int getVectorizedKeyDistLength(int batchIndex) {
+    return distKeyLengths[batchIndexToResult[batchIndex]];
   }
 
   /**
@@ -289,16 +301,22 @@ public class TopNHash {
    * <p/>
    * -1 for FORWARD   : should be forwarded to output collector (for GBY)
    * -2 for EXCLUDED  : not in top-k. ignore it
-   * -3 for FLUSH     : memory is not enough. flush values (keep keys only)
-   * -4 for DISABLE   : hash is not effective. flush and disable it
    */
-  private int insertKeyIntoHeap(BinaryComparable key) {
+  private int insertKeyIntoHeap(HiveKey key) throws IOException, HiveException {
     if (usage > threshold) {
-      return excluded == 0 ? DISABLE : FLUSH;
+      flushInternal();
+      if (excluded == 0) {
+        LOG.info("Top-N hash is disabled");
+        isEnabled = false;
+      }
+      // we can now retry adding key/value into hash, which is flushed.
+      // but for simplicity, just forward them
+      return FORWARD;
     }
     int size = indexes.size();
     int index = size < topN ? size : evicted;
     keys[index] = Arrays.copyOf(key.getBytes(), key.getLength());
+    distKeyLengths[index] = key.getDistKeyLength();
     if (null != indexes.store(index)) {
       // it's only for GBY which should forward all values associated with the key in the range
       // of limit. new value should be attatched with the key but in current implementation,
@@ -310,7 +328,7 @@ public class TopNHash {
       evicted = indexes.removeBiggest();  // remove the biggest key
       if (index == evicted) {
         excluded++;
-        return EXCLUDED;          // input key is bigger than any of keys in hash
+        return EXCLUDE;          // input key is bigger than any of keys in hash
       }
       removed(evicted);
     }
@@ -326,6 +344,7 @@ public class TopNHash {
       values[index] = null;
     }
     hashes[index] = -1;
+    distKeyLengths[index] = -1;
   }
 
   private void flushInternal() throws IOException, HiveException {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java?rev=1540328&r1=1540327&r2=1540328&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java Sat Nov  9 15:22:35 2013
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.exec.To
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
+import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
@@ -39,9 +40,11 @@ import org.apache.hadoop.hive.serde2.Ser
 import org.apache.hadoop.hive.serde2.Serializer;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+// import org.apache.hadoop.util.StringUtils;
 
 public class VectorReduceSinkOperator extends ReduceSinkOperator {
 
@@ -87,12 +90,6 @@ public class VectorReduceSinkOperator ex
    */
   private transient VectorExpressionWriter[] partitionWriters;
 
-  private transient ObjectInspector keyObjectInspector;
-  private transient ObjectInspector valueObjectInspector;
-  private transient int [] keyHashCode = new int [VectorizedRowBatch.DEFAULT_SIZE];
-
-  private transient int[] hashResult; // the pre-created array for reducerHash results
-
   public VectorReduceSinkOperator(VectorizationContext vContext, OperatorDesc conf)
       throws HiveException {
     this();
@@ -110,7 +107,6 @@ public class VectorReduceSinkOperator ex
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     try {
-
       numDistributionKeys = conf.getNumDistributionKeys();
       distinctColIndices = conf.getDistinctColumnIndices();
       numDistinctExprs = distinctColIndices.size();
@@ -183,7 +179,7 @@ public class VectorReduceSinkOperator ex
         numDistributionKeys;
       cachedKeys = new Object[numKeys][keyLen];
       cachedValues = new Object[valueEval.length];
-      
+
       int tag = conf.getTag();
       tagByte[0] = (byte) tag;
       LOG.info("Using tag = " + tag);
@@ -209,81 +205,84 @@ public class VectorReduceSinkOperator ex
         partitionEval.length));
 
     try {
-
-      for (int i = 0; i < partitionEval.length; i++) {
-        partitionEval[i].evaluate(vrg);
-      }
-
-      // run the vector evaluations
-      for (int i = 0; i < valueEval.length; i++) {
-         valueEval[i].evaluate(vrg);
-      }
       // Evaluate the keys
       for (int i = 0; i < keyEval.length; i++) {
         keyEval[i].evaluate(vrg);
       }
 
-      Object[] distributionKeys = new Object[numDistributionKeys];
-
       // Determine which rows we need to emit based on topN optimization
-      int startResult = reducerHash.startVectorizedBatch();
-      if (startResult == TopNHash.EXCLUDED) {
+      int startResult = reducerHash.startVectorizedBatch(vrg.size);
+      if (startResult == TopNHash.EXCLUDE) {
         return; // TopN wants us to exclude all rows.
       }
-      boolean useTopN = startResult != TopNHash.FORWARD;
-      if (useTopN && (hashResult == null || hashResult.length < vrg.size)) {
-        hashResult = new int[Math.max(vrg.size, VectorizedRowBatch.DEFAULT_SIZE)];
+      // TODO: can we do this later/only for the keys that are needed? E.g. update vrg.selected.
+      for (int i = 0; i < partitionEval.length; i++) {
+        partitionEval[i].evaluate(vrg);
+      }
+      // run the vector evaluations
+      for (int i = 0; i < valueEval.length; i++) {
+         valueEval[i].evaluate(vrg);
       }
 
-      for (int j = 0 ; j < vrg.size; ++j) {
-        int rowIndex = j;
+      boolean useTopN = startResult != TopNHash.FORWARD;
+      // Go thru the batch once. If we are not using TopN, we will forward all things and be done.
+      // If we are using topN, we will make the first key for each row and store/forward it.
+      // Values, hashes and additional distinct rows will be handled in the 2nd pass in that case.
+      for (int batchIndex = 0 ; batchIndex < vrg.size; ++batchIndex) {
+        int rowIndex = batchIndex;
         if (vrg.selectedInUse) {
-          rowIndex = vrg.selected[j];
+          rowIndex = vrg.selected[batchIndex];
         }
-        // First, evaluate the key - the way things stand we'd need it regardless.
-        for (int i = 0; i < keyEval.length; i++) {
-          int batchColumn = keyEval[i].getOutputColumn();
-          ColumnVector vectorColumn = vrg.cols[batchColumn];
-          distributionKeys[i] = keyWriters[i].writeValue(vectorColumn, rowIndex);
+        // First, make distrib key components for this row and determine distKeyLength.
+        populatedCachedDistributionKeys(vrg, rowIndex, 0);
+        HiveKey firstKey = toHiveKey(cachedKeys[0], tag, null);
+        int distKeyLength = firstKey.getDistKeyLength();
+        // Add first distinct expression, if any.
+        if (numDistinctExprs > 0) {
+          populateCachedDistinctKeys(vrg, rowIndex, 0);
+          firstKey = toHiveKey(cachedKeys[0], tag, distKeyLength);
         }
-        // no distinct key
-        System.arraycopy(distributionKeys, 0, cachedKeys[0], 0, numDistributionKeys);
-        // TopN is not supported for multi-distinct currently. If we have more cachedKeys
-        // than one for every input key horrible things will happen (OOB error on array likely).
-        assert !useTopN || cachedKeys.length <= 1;
-        for (int i = 0; i < cachedKeys.length; i++) {
-          // Serialize the keys and append the tag.
-          Object keyObj = keySerializer.serialize(cachedKeys[i], keyObjectInspector);
-          setKeyWritable(keyIsText ? (Text)keyObj : (BytesWritable)keyObj, tag);
-          if (useTopN) {
-            reducerHash.tryStoreVectorizedKey(keyWritable, j, hashResult);
-          } else {
-            // No TopN, just forward the key
-            keyWritable.setHashCode(computeHashCode(vrg, rowIndex));
-            collect(keyWritable, makeValueWritable(vrg, rowIndex));
-           }
+
+        if (useTopN) {
+          reducerHash.tryStoreVectorizedKey(firstKey, batchIndex);
+        } else {
+        // No TopN, just forward the first key and all others.
+          int hashCode = computeHashCode(vrg, rowIndex);
+          firstKey.setHashCode(hashCode);
+          BytesWritable value = makeValueWritable(vrg, rowIndex);
+          collect(firstKey, value);
+          forwardExtraDistinctRows(vrg, rowIndex, hashCode, value, distKeyLength, tag, 0);
         }
       }
 
       if (!useTopN) return; // All done.
 
       // If we use topN, we have called tryStore on every key now. We can process the results.
-      for (int j = 0 ; j < vrg.size; ++j) {
-        int index = hashResult[j];
-        if (index == TopNHash.EXCLUDED) continue;
-        int rowIndex = j;
+      for (int batchIndex = 0 ; batchIndex < vrg.size; ++batchIndex) {
+        int result = reducerHash.getVectorizedBatchResult(batchIndex);
+        if (result == TopNHash.EXCLUDE) continue;
+        int rowIndex = batchIndex;
         if (vrg.selectedInUse) {
-          rowIndex = vrg.selected[j];
+          rowIndex = vrg.selected[batchIndex];
         }
-        // Compute everything now - we'd either store it, or forward it.
+        // Compute value and hashcode - we'd either store or forward them.
         int hashCode = computeHashCode(vrg, rowIndex);
         BytesWritable value = makeValueWritable(vrg, rowIndex);
-        if (index < 0) {
-          // Kinda hacky; see getVectorizedKeyToForward javadoc.
-          byte[] key = reducerHash.getVectorizedKeyToForward(index);
-          collect(key, value, hashCode);
+        int distKeyLength = -1;
+        if (result == TopNHash.FORWARD) {
+          HiveKey firstKey = reducerHash.getVectorizedKeyToForward(batchIndex);
+          firstKey.setHashCode(hashCode);
+          distKeyLength = firstKey.getDistKeyLength();
+          collect(firstKey, value);
         } else {
-          reducerHash.storeValue(index, value, hashCode, true);
+          reducerHash.storeValue(result, value, hashCode, true);
+          distKeyLength = reducerHash.getVectorizedKeyDistLength(batchIndex);
+        }
+        // Now forward the other rows if there's multi-distinct (but see TODO in forward...).
+        // Unfortunately, that means we will have to rebuild the cachedKeys. Start at 1.
+        if (numDistinctExprs > 1) {
+          populatedCachedDistributionKeys(vrg, rowIndex, 1);
+          forwardExtraDistinctRows(vrg, rowIndex, hashCode, value, distKeyLength, tag, 1);
         }
       }
     } catch (SerDeException e) {
@@ -293,6 +292,74 @@ public class VectorReduceSinkOperator ex
     }
   }
 
+  /**
+   * This function creates and forwards all the additional KVs for the multi-distinct case,
+   * after the first (0th) KV pertaining to the row has already been stored or forwarded.
+   * @param vrg the batch
+   * @param rowIndex the row index in the batch
+   * @param hashCode the partitioning hash code to use; same as for the first KV
+   * @param value the value to use; same as for the first KV
+   * @param distKeyLength the distribution key length of the first key; TODO probably extraneous
+   * @param tag the tag
+   * @param baseIndex the index in cachedKeys where the pre-evaluated distribution keys are stored
+   */
+  private void forwardExtraDistinctRows(VectorizedRowBatch vrg, int rowIndex,int hashCode,
+      BytesWritable value, int distKeyLength, int tag, int baseIndex)
+          throws HiveException, SerDeException, IOException {
+    // TODO: We don't have to forward extra distinct rows immediately (same in non-vector) if
+    //       the first key has already been stored. Keys differ by only a few bytes
+    //       for different distincts, and the value/etc. are all the same.
+    //       We could store deltas to re-gen extra rows when flushing TopN.
+    for (int i = 1; i < numDistinctExprs; i++) {
+      if (i != baseIndex) {
+        System.arraycopy(cachedKeys[baseIndex], 0, cachedKeys[i], 0, numDistributionKeys);
+      }
+      populateCachedDistinctKeys(vrg, rowIndex, i);
+      HiveKey hiveKey = toHiveKey(cachedKeys[i], tag, distKeyLength);
+      hiveKey.setHashCode(hashCode);
+      collect(hiveKey, value);
+    }
+  }
+
+  /**
+   * Populate distribution keys part of cachedKeys for a particular row from the batch.
+   * @param vrg the batch
+   * @param rowIndex the row index in the batch
+   * @param index the cachedKeys index to write to
+   */
+  private void populatedCachedDistributionKeys(
+      VectorizedRowBatch vrg, int rowIndex, int index) throws HiveException {
+    for (int i = 0; i < numDistributionKeys; i++) {
+      int batchColumn = keyEval[i].getOutputColumn();
+      ColumnVector vectorColumn = vrg.cols[batchColumn];
+      cachedKeys[index][i] = keyWriters[i].writeValue(vectorColumn, rowIndex);
+    }
+    if (cachedKeys[index].length > numDistributionKeys) {
+      cachedKeys[index][numDistributionKeys] = null;
+    }
+  }
+
+  /**
+   * Populate distinct keys part of cachedKeys for a particular row from the batch.
+   * @param vrg the batch
+   * @param rowIndex the row index in the batch
+   * @param index the cachedKeys index to write to
+   */
+  private void populateCachedDistinctKeys(
+      VectorizedRowBatch vrg, int rowIndex, int index) throws HiveException {
+    StandardUnion union;
+    cachedKeys[index][numDistributionKeys] = union = new StandardUnion(
+        (byte)index, new Object[distinctColIndices.get(index).size()]);
+    Object[] distinctParameters = (Object[]) union.getObject();
+    for (int distinctParamI = 0; distinctParamI < distinctParameters.length; distinctParamI++) {
+      int distinctColIndex = distinctColIndices.get(index).get(distinctParamI);
+      int batchColumn = keyEval[distinctColIndex].getOutputColumn();
+      distinctParameters[distinctParamI] =
+          keyWriters[distinctColIndex].writeValue(vrg.cols[batchColumn], rowIndex);
+    }
+    union.setTag((byte) index);
+  }
+
   private BytesWritable makeValueWritable(VectorizedRowBatch vrg, int rowIndex)
       throws HiveException, SerDeException {
     for (int i = 0; i < valueEval.length; i++) {
@@ -308,10 +375,8 @@ public class VectorReduceSinkOperator ex
     // Evaluate the HashCode
     int keyHashCode = 0;
     if (partitionEval.length == 0) {
-      // If no partition cols, just distribute the data uniformly to provide
-      // better
-      // load balance. If the requirement is to have a single reducer, we
-      // should set
+      // If no partition cols, just distribute the data uniformly to provide better
+      // load balance. If the requirement is to have a single reducer, we should set
       // the number of reducers to 1.
       // Use a constant seed to make the code deterministic.
       if (random == null) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java?rev=1540328&r1=1540327&r2=1540328&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java Sat Nov  9 15:22:35 2013
@@ -29,7 +29,10 @@ public class HiveKey extends BytesWritab
 
   private static final int LENGTH_BYTES = 4;
 
-  boolean hashCodeValid;
+  private int hashCode;
+  private boolean hashCodeValid;
+
+  private transient int distKeyLength;
 
   public HiveKey() {
     hashCodeValid = false;
@@ -37,15 +40,13 @@ public class HiveKey extends BytesWritab
 
   public HiveKey(byte[] bytes, int hashcode) {
     super(bytes);
-    myHashCode = hashcode;
+    hashCode = hashcode;
     hashCodeValid = true;
   }
 
-  protected int myHashCode;
-
   public void setHashCode(int myHashCode) {
     hashCodeValid = true;
-    this.myHashCode = myHashCode;
+    hashCode = myHashCode;
   }
 
   @Override
@@ -54,7 +55,15 @@ public class HiveKey extends BytesWritab
       throw new RuntimeException("Cannot get hashCode() from deserialized "
           + HiveKey.class);
     }
-    return myHashCode;
+    return hashCode;
+  }
+
+  public void setDistKeyLength(int distKeyLength) {
+    this.distKeyLength = distKeyLength;
+  }
+
+  public int getDistKeyLength() {
+    return distKeyLength;
   }
 
   /** A Comparator optimized for HiveKey. */

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/LimitPushdownOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/LimitPushdownOptimizer.java?rev=1540328&r1=1540327&r2=1540328&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/LimitPushdownOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/LimitPushdownOptimizer.java Sat Nov  9 15:22:35 2013
@@ -122,11 +122,6 @@ public class LimitPushdownOptimizer impl
         }
       }
       if (rs != null) {
-        List<List<Integer>> distincts = rs.getConf().getDistinctColumnIndices();
-        if (distincts != null && distincts.size() > 1) {
-          // multi distinct case. can not sure that it's safe just by multiplying limit value
-          return false;
-        }
         LimitOperator limit = (LimitOperator) nd;
         rs.getConf().setTopN(limit.getConf().getLimit());
         rs.getConf().setTopNMemoryUsage(((LimitPushdownContext) procCtx).threshold);

Modified: hive/trunk/ql/src/test/queries/clientpositive/limit_pushdown.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/limit_pushdown.q?rev=1540328&r1=1540327&r2=1540328&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/limit_pushdown.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/limit_pushdown.q Sat Nov  9 15:22:35 2013
@@ -22,12 +22,17 @@ select value,avg(key + 1) from src group
 
 -- distincts
 explain
-select distinct(key) from src limit 20;
-select distinct(key) from src limit 20;
+select distinct(cdouble) from alltypesorc limit 20;
+select distinct(cdouble) from alltypesorc limit 20;
 
 explain
-select key, count(distinct(key)) from src group by key limit 20;
-select key, count(distinct(key)) from src group by key limit 20;
+select ctinyint, count(distinct(cdouble)) from alltypesorc group by ctinyint limit 20;
+select ctinyint, count(distinct(cdouble)) from alltypesorc group by ctinyint limit 20;
+
+-- multi distinct
+explain
+select ctinyint, count(distinct(cstring1)), count(distinct(cstring2)) from alltypesorc group by ctinyint limit 20;
+select ctinyint, count(distinct(cstring1)), count(distinct(cstring2)) from alltypesorc group by ctinyint limit 20;
 
 -- limit zero
 explain

Modified: hive/trunk/ql/src/test/queries/clientpositive/limit_pushdown_negative.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/limit_pushdown_negative.q?rev=1540328&r1=1540327&r2=1540328&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/limit_pushdown_negative.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/limit_pushdown_negative.q Sat Nov  9 15:22:35 2013
@@ -16,7 +16,3 @@ CREATE TABLE dest_3(key STRING, c1 INT);
 EXPLAIN FROM src
 INSERT OVERWRITE TABLE dest_2 SELECT value, sum(key) GROUP BY value
 INSERT OVERWRITE TABLE dest_3 SELECT value, sum(key) GROUP BY value limit 20;
-
--- nagative, multi distinct
-explain
-select count(distinct key)+count(distinct value) from src limit 20;

Modified: hive/trunk/ql/src/test/results/clientpositive/limit_pushdown.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/limit_pushdown.q.out?rev=1540328&r1=1540327&r2=1540328&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/limit_pushdown.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/limit_pushdown.q.out Sat Nov  9 15:22:35 2013
@@ -392,14 +392,14 @@ val_129	130.0
 val_131	132.0
 PREHOOK: query: -- distincts
 explain
-select distinct(key) from src limit 20
+select distinct(cdouble) from alltypesorc limit 20
 PREHOOK: type: QUERY
 POSTHOOK: query: -- distincts
 explain
-select distinct(key) from src limit 20
+select distinct(cdouble) from alltypesorc limit 20
 POSTHOOK: type: QUERY
 ABSTRACT SYNTAX TREE:
-  (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME src))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECTDI (TOK_SELEXPR (TOK_TABLE_OR_COL key))) (TOK_LIMIT 20)))
+  (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME alltypesorc))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECTDI (TOK_SELEXPR (TOK_TABLE_OR_COL cdouble))) (TOK_LIMIT 20)))
 
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
@@ -409,29 +409,29 @@ STAGE PLANS:
   Stage: Stage-1
     Map Reduce
       Alias -> Map Operator Tree:
-        src 
+        alltypesorc 
           TableScan
-            alias: src
+            alias: alltypesorc
             Select Operator
               expressions:
-                    expr: key
-                    type: string
-              outputColumnNames: key
+                    expr: cdouble
+                    type: double
+              outputColumnNames: cdouble
               Group By Operator
                 bucketGroup: false
                 keys:
-                      expr: key
-                      type: string
+                      expr: cdouble
+                      type: double
                 mode: hash
                 outputColumnNames: _col0
                 Reduce Output Operator
                   key expressions:
                         expr: _col0
-                        type: string
+                        type: double
                   sort order: +
                   Map-reduce partition columns:
                         expr: _col0
-                        type: string
+                        type: double
                   tag: -1
                   TopN: 20
                   TopN Hash Memory Usage: 0.3
@@ -440,13 +440,13 @@ STAGE PLANS:
           bucketGroup: false
           keys:
                 expr: KEY._col0
-                type: string
+                type: double
           mode: mergepartial
           outputColumnNames: _col0
           Select Operator
             expressions:
                   expr: _col0
-                  type: string
+                  type: double
             outputColumnNames: _col0
             Limit
               File Output Operator
@@ -462,42 +462,42 @@ STAGE PLANS:
       limit: 20
 
 
-PREHOOK: query: select distinct(key) from src limit 20
+PREHOOK: query: select distinct(cdouble) from alltypesorc limit 20
 PREHOOK: type: QUERY
-PREHOOK: Input: default@src
+PREHOOK: Input: default@alltypesorc
 #### A masked pattern was here ####
-POSTHOOK: query: select distinct(key) from src limit 20
+POSTHOOK: query: select distinct(cdouble) from alltypesorc limit 20
 POSTHOOK: type: QUERY
-POSTHOOK: Input: default@src
+POSTHOOK: Input: default@alltypesorc
 #### A masked pattern was here ####
-0
-10
-100
-103
-104
-105
-11
-111
-113
-114
-116
-118
-119
-12
-120
-125
-126
-128
-129
-131
+NULL
+-16379.0
+-16373.0
+-16372.0
+-16369.0
+-16355.0
+-16339.0
+-16324.0
+-16311.0
+-16310.0
+-16309.0
+-16307.0
+-16306.0
+-16305.0
+-16300.0
+-16296.0
+-16280.0
+-16277.0
+-16274.0
+-16269.0
 PREHOOK: query: explain
-select key, count(distinct(key)) from src group by key limit 20
+select ctinyint, count(distinct(cdouble)) from alltypesorc group by ctinyint limit 20
 PREHOOK: type: QUERY
 POSTHOOK: query: explain
-select key, count(distinct(key)) from src group by key limit 20
+select ctinyint, count(distinct(cdouble)) from alltypesorc group by ctinyint limit 20
 POSTHOOK: type: QUERY
 ABSTRACT SYNTAX TREE:
-  (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME src))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL key)) (TOK_SELEXPR (TOK_FUNCTIONDI count (TOK_TABLE_OR_COL key)))) (TOK_GROUPBY (TOK_TABLE_OR_COL key)) (TOK_LIMIT 20)))
+  (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME alltypesorc))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL ctinyint)) (TOK_SELEXPR (TOK_FUNCTIONDI count (TOK_TABLE_OR_COL cdouble)))) (TOK_GROUPBY (TOK_TABLE_OR_COL ctinyint)) (TOK_LIMIT 20)))
 
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
@@ -507,36 +507,42 @@ STAGE PLANS:
   Stage: Stage-1
     Map Reduce
       Alias -> Map Operator Tree:
-        src 
+        alltypesorc 
           TableScan
-            alias: src
+            alias: alltypesorc
             Select Operator
               expressions:
-                    expr: key
-                    type: string
-              outputColumnNames: key
+                    expr: ctinyint
+                    type: tinyint
+                    expr: cdouble
+                    type: double
+              outputColumnNames: ctinyint, cdouble
               Group By Operator
                 aggregations:
-                      expr: count(DISTINCT key)
+                      expr: count(DISTINCT cdouble)
                 bucketGroup: false
                 keys:
-                      expr: key
-                      type: string
+                      expr: ctinyint
+                      type: tinyint
+                      expr: cdouble
+                      type: double
                 mode: hash
-                outputColumnNames: _col0, _col1
+                outputColumnNames: _col0, _col1, _col2
                 Reduce Output Operator
                   key expressions:
                         expr: _col0
-                        type: string
+                        type: tinyint
+                        expr: _col1
+                        type: double
                   sort order: ++
                   Map-reduce partition columns:
                         expr: _col0
-                        type: string
+                        type: tinyint
                   tag: -1
                   TopN: 20
                   TopN Hash Memory Usage: 0.3
                   value expressions:
-                        expr: _col1
+                        expr: _col2
                         type: bigint
       Reduce Operator Tree:
         Group By Operator
@@ -545,13 +551,13 @@ STAGE PLANS:
           bucketGroup: false
           keys:
                 expr: KEY._col0
-                type: string
+                type: tinyint
           mode: mergepartial
           outputColumnNames: _col0, _col1
           Select Operator
             expressions:
                   expr: _col0
-                  type: string
+                  type: tinyint
                   expr: _col1
                   type: bigint
             outputColumnNames: _col0, _col1
@@ -569,34 +575,161 @@ STAGE PLANS:
       limit: 20
 
 
-PREHOOK: query: select key, count(distinct(key)) from src group by key limit 20
+PREHOOK: query: select ctinyint, count(distinct(cdouble)) from alltypesorc group by ctinyint limit 20
 PREHOOK: type: QUERY
-PREHOOK: Input: default@src
+PREHOOK: Input: default@alltypesorc
 #### A masked pattern was here ####
-POSTHOOK: query: select key, count(distinct(key)) from src group by key limit 20
+POSTHOOK: query: select ctinyint, count(distinct(cdouble)) from alltypesorc group by ctinyint limit 20
 POSTHOOK: type: QUERY
-POSTHOOK: Input: default@src
+POSTHOOK: Input: default@alltypesorc
+#### A masked pattern was here ####
+NULL	2932
+-64	24
+-63	19
+-62	27
+-61	25
+-60	27
+-59	31
+-58	23
+-57	35
+-56	36
+-55	29
+-54	26
+-53	22
+-52	33
+-51	21
+-50	30
+-49	26
+-48	29
+-47	22
+-46	24
+PREHOOK: query: -- multi distinct
+explain
+select ctinyint, count(distinct(cstring1)), count(distinct(cstring2)) from alltypesorc group by ctinyint limit 20
+PREHOOK: type: QUERY
+POSTHOOK: query: -- multi distinct
+explain
+select ctinyint, count(distinct(cstring1)), count(distinct(cstring2)) from alltypesorc group by ctinyint limit 20
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME alltypesorc))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL ctinyint)) (TOK_SELEXPR (TOK_FUNCTIONDI count (TOK_TABLE_OR_COL cstring1))) (TOK_SELEXPR (TOK_FUNCTIONDI count (TOK_TABLE_OR_COL cstring2)))) (TOK_GROUPBY (TOK_TABLE_OR_COL ctinyint)) (TOK_LIMIT 20)))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        alltypesorc 
+          TableScan
+            alias: alltypesorc
+            Select Operator
+              expressions:
+                    expr: ctinyint
+                    type: tinyint
+                    expr: cstring1
+                    type: string
+                    expr: cstring2
+                    type: string
+              outputColumnNames: ctinyint, cstring1, cstring2
+              Group By Operator
+                aggregations:
+                      expr: count(DISTINCT cstring1)
+                      expr: count(DISTINCT cstring2)
+                bucketGroup: false
+                keys:
+                      expr: ctinyint
+                      type: tinyint
+                      expr: cstring1
+                      type: string
+                      expr: cstring2
+                      type: string
+                mode: hash
+                outputColumnNames: _col0, _col1, _col2, _col3, _col4
+                Reduce Output Operator
+                  key expressions:
+                        expr: _col0
+                        type: tinyint
+                        expr: _col1
+                        type: string
+                        expr: _col2
+                        type: string
+                  sort order: +++
+                  Map-reduce partition columns:
+                        expr: _col0
+                        type: tinyint
+                  tag: -1
+                  TopN: 20
+                  TopN Hash Memory Usage: 0.3
+                  value expressions:
+                        expr: _col3
+                        type: bigint
+                        expr: _col4
+                        type: bigint
+      Reduce Operator Tree:
+        Group By Operator
+          aggregations:
+                expr: count(DISTINCT KEY._col1:0._col0)
+                expr: count(DISTINCT KEY._col1:1._col0)
+          bucketGroup: false
+          keys:
+                expr: KEY._col0
+                type: tinyint
+          mode: mergepartial
+          outputColumnNames: _col0, _col1, _col2
+          Select Operator
+            expressions:
+                  expr: _col0
+                  type: tinyint
+                  expr: _col1
+                  type: bigint
+                  expr: _col2
+                  type: bigint
+            outputColumnNames: _col0, _col1, _col2
+            Limit
+              File Output Operator
+                compressed: false
+                GlobalTableId: 0
+                table:
+                    input format: org.apache.hadoop.mapred.TextInputFormat
+                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: 20
+
+
+PREHOOK: query: select ctinyint, count(distinct(cstring1)), count(distinct(cstring2)) from alltypesorc group by ctinyint limit 20
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypesorc
+#### A masked pattern was here ####
+POSTHOOK: query: select ctinyint, count(distinct(cstring1)), count(distinct(cstring2)) from alltypesorc group by ctinyint limit 20
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypesorc
 #### A masked pattern was here ####
-0	1
-10	1
-100	1
-103	1
-104	1
-105	1
-11	1
-111	1
-113	1
-114	1
-116	1
-118	1
-119	1
-12	1
-120	1
-125	1
-126	1
-128	1
-129	1
-131	1
+NULL	3065	3
+-64	3	13
+-63	3	16
+-62	3	23
+-61	3	25
+-60	3	25
+-59	3	27
+-58	3	24
+-57	3	23
+-56	3	22
+-55	3	21
+-54	3	21
+-53	3	17
+-52	3	21
+-51	1012	1045
+-50	3	25
+-49	3	24
+-48	3	27
+-47	3	23
+-46	3	19
 PREHOOK: query: -- limit zero
 explain
 select key,value from src order by key limit 0

Modified: hive/trunk/ql/src/test/results/clientpositive/limit_pushdown_negative.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/limit_pushdown_negative.q.out?rev=1540328&r1=1540327&r2=1540328&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/limit_pushdown_negative.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/limit_pushdown_negative.q.out Sat Nov  9 15:22:35 2013
@@ -428,84 +428,3 @@ STAGE PLANS:
     Stats-Aggr Operator
 
 
-PREHOOK: query: -- nagative, multi distinct
-explain
-select count(distinct key)+count(distinct value) from src limit 20
-PREHOOK: type: QUERY
-POSTHOOK: query: -- nagative, multi distinct
-explain
-select count(distinct key)+count(distinct value) from src limit 20
-POSTHOOK: type: QUERY
-ABSTRACT SYNTAX TREE:
-  (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME src))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (+ (TOK_FUNCTIONDI count (TOK_TABLE_OR_COL key)) (TOK_FUNCTIONDI count (TOK_TABLE_OR_COL value))))) (TOK_LIMIT 20)))
-
-STAGE DEPENDENCIES:
-  Stage-1 is a root stage
-  Stage-0 is a root stage
-
-STAGE PLANS:
-  Stage: Stage-1
-    Map Reduce
-      Alias -> Map Operator Tree:
-        src 
-          TableScan
-            alias: src
-            Select Operator
-              expressions:
-                    expr: key
-                    type: string
-                    expr: value
-                    type: string
-              outputColumnNames: key, value
-              Group By Operator
-                aggregations:
-                      expr: count(DISTINCT key)
-                      expr: count(DISTINCT value)
-                bucketGroup: false
-                keys:
-                      expr: key
-                      type: string
-                      expr: value
-                      type: string
-                mode: hash
-                outputColumnNames: _col0, _col1, _col2, _col3
-                Reduce Output Operator
-                  key expressions:
-                        expr: _col0
-                        type: string
-                        expr: _col1
-                        type: string
-                  sort order: ++
-                  tag: -1
-                  value expressions:
-                        expr: _col2
-                        type: bigint
-                        expr: _col3
-                        type: bigint
-      Reduce Operator Tree:
-        Group By Operator
-          aggregations:
-                expr: count(DISTINCT KEY._col0:0._col0)
-                expr: count(DISTINCT KEY._col0:1._col0)
-          bucketGroup: false
-          mode: mergepartial
-          outputColumnNames: _col0, _col1
-          Select Operator
-            expressions:
-                  expr: (_col0 + _col1)
-                  type: bigint
-            outputColumnNames: _col0
-            Limit
-              File Output Operator
-                compressed: false
-                GlobalTableId: 0
-                table:
-                    input format: org.apache.hadoop.mapred.TextInputFormat
-                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-
-  Stage: Stage-0
-    Fetch Operator
-      limit: 20
-
-

Modified: hive/trunk/ql/src/test/results/clientpositive/vectorization_limit.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/vectorization_limit.q.out?rev=1540328&r1=1540327&r2=1540328&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/vectorization_limit.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/vectorization_limit.q.out Sat Nov  9 15:22:35 2013
@@ -473,7 +473,26 @@ POSTHOOK: query: select ctinyint, count(
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@alltypesorc
 #### A masked pattern was here ####
-NULL	19
+NULL	2932
+-64	24
+-63	19
+-62	27
+-61	25
+-60	27
+-59	31
+-58	23
+-57	35
+-56	36
+-55	29
+-54	26
+-53	22
+-52	33
+-51	21
+-50	30
+-49	26
+-48	29
+-47	22
+-46	24
 PREHOOK: query: -- limit zero
 explain
 select ctinyint,cdouble from alltypesorc order by ctinyint limit 0