You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/11/01 19:10:27 UTC

svn commit: r1538012 - in /hive/trunk/ql/src: java/org/apache/hadoop/hive/ql/exec/ java/org/apache/hadoop/hive/ql/exec/vector/ test/queries/clientpositive/ test/results/clientpositive/

Author: hashutosh
Date: Fri Nov  1 18:10:26 2013
New Revision: 1538012

URL: http://svn.apache.org/r1538012
Log:
HIVE-5503 : TopN optimization in VectorReduceSink (Sergey Shelukhin via Ashutosh Chauhan)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
    hive/trunk/ql/src/test/queries/clientpositive/vectorization_limit.q
    hive/trunk/ql/src/test/results/clientpositive/vectorization_limit.q.out

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1538012&r1=1538011&r2=1538012&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Fri Nov  1 18:10:26 2013
@@ -42,6 +42,7 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+import org.apache.hadoop.io.BinaryComparable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.Text;
@@ -90,9 +91,8 @@ public class ReduceSinkOperator extends 
     return inputAlias;
   }
 
-  // picks topN K:V pairs from input. can be null
-  private transient TopNHash reducerHash;
-
+  // picks topN K:V pairs from input.
+  protected transient TopNHash reducerHash = new TopNHash();
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
 
@@ -136,7 +136,11 @@ public class ReduceSinkOperator extends 
           .newInstance();
       valueSerializer.initialize(null, valueTableDesc.getProperties());
 
-      reducerHash = createTopKHash();
+      int limit = conf.getTopN();
+      float memUsage = conf.getTopNMemoryUsage();
+      if (limit >= 0 && memUsage > 0) {
+        reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this);
+      }
 
       firstRow = true;
       initializeChildren(hconf);
@@ -146,26 +150,8 @@ public class ReduceSinkOperator extends 
     }
   }
 
-  private TopNHash createTopKHash() {
-    int limit = conf.getTopN();
-    float percent = conf.getTopNMemoryUsage();
-    if (limit < 0 || percent <= 0) {
-      return null;
-    }
-    if (limit == 0) {
-      return TopNHash.create0();
-    }
-    // limit * 64 : compensation of arrays for key/value/hashcodes
-    long threshold = (long) (percent * Runtime.getRuntime().maxMemory()) - limit * 64;
-    if (threshold < 0) {
-      return null;
-    }
-    return TopNHash.create(conf.isMapGroupBy(), limit, threshold, this);
-  }
-
   transient InspectableObject tempInspectableObject = new InspectableObject();
   protected transient HiveKey keyWritable = new HiveKey();
-  protected transient Writable value;
 
   transient StructObjectInspector keyObjectInspector;
   transient StructObjectInspector valueObjectInspector;
@@ -214,6 +200,7 @@ public class ReduceSinkOperator extends 
 
     if (outputColNames.size() > length) {
       // union keys
+      assert distinctColIndices != null;
       List<ObjectInspector> uois = new ArrayList<ObjectInspector>();
       for (List<Integer> distinctCols : distinctColIndices) {
         List<String> names = new ArrayList<String>();
@@ -240,6 +227,9 @@ public class ReduceSinkOperator extends 
       ObjectInspector rowInspector = inputObjInspectors[tag];
       if (firstRow) {
         firstRow = false;
+        // TODO: this is fishy - we init object inspectors based on first tag. We
+        //       should either init for each tag, or if rowInspector doesn't really
+        //       matter, then we can create this in ctor and get rid of firstRow.
         keyObjectInspector = initEvaluatorsAndReturnStruct(keyEval,
             distinctColIndices,
             conf.getOutputKeyColumnNames(), numDistributionKeys, rowInspector);
@@ -253,32 +243,6 @@ public class ReduceSinkOperator extends 
         cachedValues = new Object[valueEval.length];
       }
 
-      // Evaluate the HashCode
-      int keyHashCode = 0;
-      if (partitionEval.length == 0) {
-        // If no partition cols, just distribute the data uniformly to provide
-        // better
-        // load balance. If the requirement is to have a single reducer, we
-        // should set
-        // the number of reducers to 1.
-        // Use a constant seed to make the code deterministic.
-        if (random == null) {
-          random = new Random(12345);
-        }
-        keyHashCode = random.nextInt();
-      } else {
-        for (int i = 0; i < partitionEval.length; i++) {
-          Object o = partitionEval[i].evaluate(row);
-          keyHashCode = keyHashCode * 31
-              + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
-        }
-      }
-
-      // Evaluate the value
-      for (int i = 0; i < valueEval.length; i++) {
-        cachedValues[i] = valueEval[i].evaluate(row);
-      }
-
       // Evaluate the keys
       for (int i = 0; i < numDistributionKeys; i++) {
         cachedKeys[0][i] = keyEval[i].evaluate(row);
@@ -303,64 +267,21 @@ public class ReduceSinkOperator extends 
         }
       }
 
-      BytesWritable value = null;
-      // Serialize the keys and append the tag
       for (int i = 0; i < cachedKeys.length; i++) {
-        if (keyIsText) {
-          Text key = (Text) keySerializer.serialize(cachedKeys[i],
-              keyObjectInspector);
-          if (tag == -1) {
-            keyWritable.set(key.getBytes(), 0, key.getLength());
-          } else {
-            int keyLength = key.getLength();
-            keyWritable.setSize(keyLength + 1);
-            System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
-            keyWritable.get()[keyLength] = tagByte[0];
-          }
-        } else {
-          // Must be BytesWritable
-          BytesWritable key = (BytesWritable) keySerializer.serialize(
-              cachedKeys[i], keyObjectInspector);
-          if (tag == -1) {
-            keyWritable.set(key.getBytes(), 0, key.getLength());
-          } else {
-            int keyLength = key.getLength();
-            keyWritable.setSize(keyLength + 1);
-            System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
-            keyWritable.get()[keyLength] = tagByte[0];
-          }
-        }
-        keyWritable.setHashCode(keyHashCode);
-
-        if (reducerHash == null) {
-          if (null != out) {
-            collect(keyWritable, value = getValue(row, value));
-          }
-       } else {
-          int index = reducerHash.indexOf(keyWritable);
-          if (index == TopNHash.EXCLUDED) {
-            continue;
-          }
-          value = getValue(row, value);
-          if (index >= 0) {
-            reducerHash.set(index, value);
-          } else {
-            if (index == TopNHash.FORWARD) {
-              collect(keyWritable, value);
-            } else if (index == TopNHash.FLUSH) {
-              LOG.info("Top-N hash is flushed");
-              reducerHash.flush();
-              // we can now retry adding key/value into hash, which is flushed.
-              // but for simplicity, just forward them
-              collect(keyWritable, value);
-            } else if (index == TopNHash.DISABLE) {
-              LOG.info("Top-N hash is disabled");
-              reducerHash.flush();
-              collect(keyWritable, value);
-              reducerHash = null;
-            }
-          }
+        // Serialize the keys and append the tag
+        Object keyObj = keySerializer.serialize(cachedKeys[i], keyObjectInspector);
+        setKeyWritable(keyIsText ? (Text)keyObj : (BytesWritable)keyObj, tag);
+        int topNIndex = reducerHash.tryStoreKey(keyWritable);
+        if (TopNHash.EXCLUDED == topNIndex) continue;
+        int keyHashCode = computeHashCode(row);
+        BytesWritable valueWritable = getValue(row);
+        if (TopNHash.FORWARD == topNIndex) {
+          keyWritable.setHashCode(keyHashCode);
+          collect(keyWritable, valueWritable);
+          continue;
         }
+        assert topNIndex >= 0;
+        reducerHash.storeValue(topNIndex, valueWritable, keyHashCode, false);
       }
     } catch (HiveException e) {
       throw e;
@@ -369,10 +290,58 @@ public class ReduceSinkOperator extends 
     }
   }
 
-  public void collect(BytesWritable key, BytesWritable value) throws IOException {
+  private int computeHashCode(Object row) throws HiveException {
+    // Evaluate the HashCode
+    int keyHashCode = 0;
+    if (partitionEval.length == 0) {
+      // If no partition cols, just distribute the data uniformly to provide
+      // better
+      // load balance. If the requirement is to have a single reducer, we
+      // should set
+      // the number of reducers to 1.
+      // Use a constant seed to make the code deterministic.
+      if (random == null) {
+        random = new Random(12345);
+      }
+      keyHashCode = random.nextInt();
+    } else {
+      for (int i = 0; i < partitionEval.length; i++) {
+        Object o = partitionEval[i].evaluate(row);
+        keyHashCode = keyHashCode * 31
+            + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
+      }
+    }
+    return keyHashCode;
+  }
+
+  protected void setKeyWritable(BinaryComparable key, int tag) {
+    if (tag == -1) {
+      keyWritable.set(key.getBytes(), 0, key.getLength());
+    } else {
+      int keyLength = key.getLength();
+      keyWritable.setSize(keyLength + 1);
+      System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
+      keyWritable.get()[keyLength] = tagByte[0];
+    }
+  }
+
+  public void collect(byte[] key, byte[] value, int hash) throws IOException {
+    HiveKey keyWritable = new HiveKey(key, hash);
+    BytesWritable valueWritable = new BytesWritable(value);
+    collect(keyWritable, valueWritable);
+  }
+
+  protected void collect(byte[] key, Writable valueWritable, int hash) throws IOException {
+    HiveKey keyWritable = new HiveKey(key, hash);
+    collect(keyWritable, valueWritable);
+  }
+
+  protected void collect(BytesWritable keyWritable, Writable valueWritable) throws IOException {
     // Since this is a terminal operator, update counters explicitly -
     // forward is not called
-    out.collect(key, value);
+    if (null != out) {
+      out.collect(keyWritable, valueWritable);
+    }
     if (++outputRows % 1000 == 0) {
       if (counterNameToEnum != null) {
         incrCounter(numOutputRowsCntr, outputRows);
@@ -382,11 +351,7 @@ public class ReduceSinkOperator extends 
     }
   }
 
-  // evaluate value lazily
-  private BytesWritable getValue(Object row, BytesWritable value) throws Exception {
-    if (value != null) {
-      return value;
-    }
+  private BytesWritable getValue(Object row) throws Exception {
     // Evaluate the value
     for (int i = 0; i < valueEval.length; i++) {
       cachedValues[i] = valueEval[i].evaluate(row);
@@ -397,16 +362,9 @@ public class ReduceSinkOperator extends 
 
   @Override
   protected void closeOp(boolean abort) throws HiveException {
-    if (!abort && reducerHash != null) {
-      try {
-        reducerHash.flush();
-      } catch (IOException e) {
-        throw new HiveException(e);
-      } finally {
-        reducerHash = null;
-      }
+    if (!abort) {
+      reducerHash.flush();
     }
-    reducerHash = null;
     super.closeOp(abort);
   }
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java?rev=1538012&r1=1538011&r2=1538012&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java Fri Nov  1 18:10:26 2013
@@ -20,49 +20,65 @@ package org.apache.hadoop.hive.ql.exec;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
 import java.util.SortedSet;
+import java.util.TreeMap;
 import java.util.TreeSet;
 
 import com.google.common.collect.MinMaxPriorityQueue;
-import org.apache.hadoop.hive.ql.io.HiveKey;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.io.BinaryComparable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.OutputCollector;
 
 /**
  * Stores binary key/value in sorted manner to get top-n key/value
+ * TODO: rename to TopNHeap?
  */
-abstract class TopNHash {
+public class TopNHash {
+  public static Log LOG = LogFactory.getLog(TopNHash.class);
 
   /**
    * For interaction between operator and top-n hash.
    * Currently only used to forward key/values stored in hash.
    */
-  public static interface BinaryCollector extends OutputCollector<BytesWritable, BytesWritable> {
+  public static interface BinaryCollector {
+    public void collect(byte[] key, byte[] value, int hash) throws IOException;
   }
 
-  protected static final int FORWARD = -1;
-  protected static final int EXCLUDED = -2;
-  protected static final int FLUSH = -3;
-  protected static final int DISABLE = -4;
+  public static final int FORWARD = -1;
+  public static final int EXCLUDED = -2;
+  private static final int FLUSH = -3;
+  private static final int DISABLE = -4;
+  private static final int MAY_FORWARD = -5;
+
+  private BinaryCollector collector;
+  private int topN;
 
-  protected final int topN;
-  protected final BinaryCollector collector;
+  private long threshold;   // max heap size
+  private long usage;
 
-  protected final long threshold;   // max heap size
-  protected long usage;             // heap usage (not exact)
+  // binary keys, values and hashCodes of rows, lined up by index
+  private byte[][] keys;
+  private byte[][] values;
+  private int[] hashes;
+  private IndexStore indexes; // The heap over the keys, storing indexes in the array.
 
-  // binary keys, binary values and hashcodes of keys, lined up by index
-  protected final byte[][] keys;
-  protected final byte[][] values;
-  protected final int[] hashes;
+  private int evicted; // recently evicted index (used for next key/value)
+  private int excluded; // count of excluded rows from previous flush
 
-  protected int evicted;    // recetly evicted index (the biggest one. used for next key/value)
-  protected int excluded;   // count of excluded rows from previous flush
+  // temporary stuff used for vectorization
+  private int batchNumForwards = 0; // whether current batch has any forwarded keys
+  private int[] indexToBatchIndex; // mapping of index (lined up w/keys) to index in the batch
 
-  protected final Comparator<Integer> C = new Comparator<Integer>() {
+  private boolean isEnabled = false;
+
+  private final Comparator<Integer> C = new Comparator<Integer>() {
     public int compare(Integer o1, Integer o2) {
       byte[] key1 = keys[o1];
       byte[] key2 = keys[o2];
@@ -70,29 +86,201 @@ abstract class TopNHash {
     }
   };
 
-  public static TopNHash create0() {
-    return new HashForLimit0();
-  }
-
-  public static TopNHash create(boolean grouped, int topN, long threshold,
-      BinaryCollector collector) {
+  public void initialize(
+      int topN, float memUsage, boolean isMapGroupBy, BinaryCollector collector) {
+    assert topN >= 0 && memUsage > 0;
+    assert !this.isEnabled;
+    this.isEnabled = false;
+    this.topN = topN;
+    this.collector = collector;
     if (topN == 0) {
-      return new HashForLimit0();
+      isEnabled = true;
+      return; // topN == 0 will cause a short-circuit, don't need any initialization
     }
-    if (grouped) {
-      return new HashForGroup(topN, threshold, collector);
-    }
-    return new HashForRow(topN, threshold, collector);
-  }
 
-  TopNHash(int topN, long threshold, BinaryCollector collector) {
-    this.topN = topN;
-    this.threshold = threshold;
-    this.collector = collector;
+    // limit * 64 : compensation of arrays for key/value/hashcodes
+    this.threshold = (long) (memUsage * Runtime.getRuntime().maxMemory()) - topN * 64;
+    if (threshold < 0) {
+      return;
+    }
+    this.indexes = isMapGroupBy ? new HashForGroup() : new HashForRow();
     this.keys = new byte[topN + 1][];
     this.values = new byte[topN + 1][];
     this.hashes = new int[topN + 1];
     this.evicted = topN;
+    this.isEnabled = true;
+  }
+
+  /**
+   * Try store the non-vectorized key.
+   * @param key Serialized key.
+   * @return TopNHash.FORWARD if the row should be forwarded;
+   *         TopNHash.EXCLUDED if the row should be discarded;
+   *         any other number if the row is to be stored; the index should be passed to storeValue.
+   */
+  public int tryStoreKey(BytesWritable key) throws HiveException, IOException {
+    if (!isEnabled) {
+      return FORWARD; // short-circuit quickly - forward all rows
+    }
+    if (topN == 0) {
+      return EXCLUDED; // short-circuit quickly - eat all rows
+    }
+    int index = insertKeyIntoHeap(key);
+    if (index >= 0) {
+      usage += key.getLength();
+      return index;
+    }
+    // IndexStore is trying to tell us something.
+    switch (index) {
+      case DISABLE: {
+        LOG.info("Top-N hash is disabled");
+        flushInternal();
+        isEnabled = false;
+        return FORWARD;
+      }
+      case FLUSH: {
+        LOG.info("Top-N hash is flushed");
+        flushInternal();
+        // we can now retry adding key/value into hash, which is flushed.
+        // but for simplicity, just forward them
+        return FORWARD;
+      }
+      case FORWARD:  return FORWARD;
+      case EXCLUDED: return EXCLUDED; // skip the row.
+      default: {
+        assert false;
+        throw new HiveException("Invalid result trying to store the key: " + index);
+      }
+    }
+  }
+
+
+  /**
+   * Perform basic checks and initialize TopNHash for the new vectorized row batch.
+   * @return TopNHash.FORWARD if all rows should be forwarded w/o trying to call TopN;
+   *         TopNHash.EXCLUDED if all rows should be discarded w/o trying to call TopN;
+   *         any other result means the batch has been started.
+   */
+  public int startVectorizedBatch() throws IOException, HiveException {
+    if (!isEnabled) {
+      return FORWARD; // short-circuit quickly - forward all rows
+    } else if (topN == 0) {
+      return EXCLUDED; // short-circuit quickly - eat all rows
+    }
+    // Flush here if the memory usage is too high. After that, we have the entire
+    // batch already in memory anyway so we will bypass the memory checks.
+    if (usage > threshold) {
+      int excluded = this.excluded;
+      LOG.info("Top-N hash is flushing rows");
+      flushInternal();
+      if (excluded == 0) {
+        LOG.info("Top-N hash has been disabled");
+        isEnabled = false;
+        return FORWARD; // Hash is ineffective, disable.
+      }
+    }
+    if (indexToBatchIndex == null) {
+      indexToBatchIndex = new int[topN + 1]; // for current batch, contains key index in the batch
+    }
+    Arrays.fill(indexToBatchIndex, -1);
+    batchNumForwards = 0;
+    return 0;
+  }
+
+  /**
+   * Try to put the key from the current vectorized batch into the heap.
+   * @param key the key.
+   * @param batchIndex The index of the key in the vectorized batch (sequential, not .selected).
+   * @param results The results; the number of elements equivalent to vrg.size, by kindex.
+   *   The result should be the same across the calls for the batch; in the end, for each k-index:
+   *     - TopNHash.EXCLUDED - discard the row.
+   *     - non-negative index - store the row using storeValue, same as tryStoreKey.
+   *     - negative index - forward the row. getVectorizedKeyToForward called w/this index will
+   *        return the key to use so it doesn't have to be rebuilt.
+   */
+  public void tryStoreVectorizedKey(BytesWritable key, int batchIndex, int[] results)
+          throws HiveException, IOException {
+    // Assumption - batchIndex is increasing; startVectorizedBatch was called
+    int size = indexes.size();
+    int index = size < topN ? size : evicted;
+    keys[index] = Arrays.copyOf(key.getBytes(), key.getLength());
+    Integer collisionIndex = indexes.store(index);
+    if (null != collisionIndex) {
+      // forward conditional on the survival of the corresponding key currently in indexes.
+      ++batchNumForwards;
+      results[batchIndex] = MAY_FORWARD - collisionIndex;
+      return;
+    }
+    indexToBatchIndex[index] = batchIndex;
+    results[batchIndex] = index;
+    if (size != topN) return;
+    evicted = indexes.removeBiggest();  // remove the biggest key
+    if (index == evicted) {
+      excluded++;
+      results[batchIndex] = EXCLUDED;
+      indexToBatchIndex[index] = -1;
+      return; // input key is bigger than any of keys in hash
+    }
+    removed(evicted);
+    int evictedBatchIndex = indexToBatchIndex[evicted];
+    if (evictedBatchIndex >= 0) {
+      // reset the result for the evicted index
+      results[evictedBatchIndex] = EXCLUDED;
+      indexToBatchIndex[evicted] = -1;
+    }
+    // Also evict all results grouped with this index; cannot be current key or before it.
+    if (batchNumForwards > 0) {
+      int evictedForward = (MAY_FORWARD - evicted);
+      boolean forwardRemoved = false;
+      for (int i = evictedBatchIndex + 1; i < batchIndex; ++i) {
+        if (results[i] == evictedForward) {
+          results[i] = EXCLUDED;
+          forwardRemoved = true;
+        }
+      }
+      if (forwardRemoved) {
+        --batchNumForwards;
+      }
+    }
+  }
+
+  /**
+   * After vectorized batch is processed, can return the key that caused a particular row
+   * to be forwarded. Because the row could only be marked to forward because it has
+   * the same key with some row already in the heap (for GBY), we can use that key from the
+   * heap to emit the forwarded row.
+   * @param index Negative index from the vectorized result. See tryStoreVectorizedKey.
+   * @return The key corresponding to the row.
+   */
+  public byte[] getVectorizedKeyToForward(int index) {
+    assert index <= MAY_FORWARD;
+    return keys[MAY_FORWARD - index];
+  }
+
+  /**
+   * Stores the value for the key in the heap.
+   * @param index The index, either from tryStoreKey or from tryStoreVectorizedKey result.
+   * @param value The value to store.
+   * @param keyHash The key hash to store.
+   * @param vectorized Whether the result is coming from a vectorized batch.
+   */
+  public void storeValue(int index, BytesWritable value, int keyHash, boolean vectorized) {
+    values[index] = Arrays.copyOf(value.getBytes(), value.getLength());
+    hashes[index] = keyHash;
+    // Vectorized doesn't adjust usage for the keys while processing the batch
+    usage += values[index].length + (vectorized ? keys[index].length : 0);
+  }
+
+  /**
+   * Flushes all the rows cached in the heap.
+   */
+  public void flush() throws HiveException {
+    if (!isEnabled || (topN == 0)) return;
+    try {
+      flushInternal();
+    } catch (IOException ex) {
+      throw new HiveException(ex);
+    }
   }
 
   /**
@@ -104,15 +292,14 @@ abstract class TopNHash {
    * -3 for FLUSH     : memory is not enough. flush values (keep keys only)
    * -4 for DISABLE   : hash is not effective. flush and disable it
    */
-  public int indexOf(HiveKey key) {
-    int size = size();
+  private int insertKeyIntoHeap(BinaryComparable key) {
     if (usage > threshold) {
       return excluded == 0 ? DISABLE : FLUSH;
     }
+    int size = indexes.size();
     int index = size < topN ? size : evicted;
     keys[index] = Arrays.copyOf(key.getBytes(), key.getLength());
-    hashes[index] = key.hashCode();
-    if (!store(index)) {
+    if (null != indexes.store(index)) {
       // it's only for GBY which should forward all values associated with the key in the range
       // of limit. new value should be attatched with the key but in current implementation,
       // only one values is allowed. with map-aggreagtion which is true by default,
@@ -120,7 +307,7 @@ abstract class TopNHash {
       return FORWARD;
     }
     if (size == topN) {
-      evicted = removeBiggest();  // remove the biggest key
+      evicted = indexes.removeBiggest();  // remove the biggest key
       if (index == evicted) {
         excluded++;
         return EXCLUDED;          // input key is bigger than any of keys in hash
@@ -130,130 +317,93 @@ abstract class TopNHash {
     return index;
   }
 
-  protected abstract int size();
-
-  protected abstract boolean store(int index);
-
-  protected abstract int removeBiggest();
-
-  protected abstract Iterable<Integer> indexes();
-
   // key/value of the index is removed. retrieve memory usage
-  public void removed(int index) {
+  private void removed(int index) {
     usage -= keys[index].length;
     keys[index] = null;
     if (values[index] != null) {
-      // value can be null if hash is flushed, which only keeps keys for limiting rows
       usage -= values[index].length;
       values[index] = null;
     }
     hashes[index] = -1;
   }
 
-  public void set(int index, BytesWritable value) {
-    values[index] = Arrays.copyOf(value.getBytes(), value.getLength());
-    usage += keys[index].length + values[index].length;
-  }
-
-  public void flush() throws IOException {
-    for (int index : indexes()) {
-      flush(index);
+  private void flushInternal() throws IOException, HiveException {
+    for (int index : indexes.indexes()) {
+      if (index != evicted && values[index] != null) {
+        collector.collect(keys[index], values[index], hashes[index]);
+        usage -= values[index].length;
+        values[index] = null;
+        hashes[index] = -1;
+      }
     }
     excluded = 0;
   }
 
-  protected void flush(int index) throws IOException {
-    if (index != evicted && values[index] != null) {
-      // BytesWritable copies array for set method. So just creats new one
-      HiveKey keyWritable = new HiveKey(keys[index], hashes[index]);
-      BytesWritable valueWritable = new BytesWritable(values[index]);
-      collector.collect(keyWritable, valueWritable);
-      usage -= values[index].length;
-      values[index] = null;
-    }
-  }
-}
-
-/**
- * for order by, same keys are counted (For 1-2-2-3-4, limit 3 is 1-2-2)
- * MinMaxPriorityQueue is used because it alows duplication and fast access to biggest one
- */
-class HashForRow extends TopNHash {
-
-  private final MinMaxPriorityQueue<Integer> indexes;
-
-  HashForRow(int topN, long threshold, BinaryCollector collector) {
-    super(topN, threshold, collector);
-    this.indexes = MinMaxPriorityQueue.orderedBy(C).create();
-  }
-
-  protected int size() {
-    return indexes.size();
-  }
-
-  // returns true always
-  protected boolean store(int index) {
-    return indexes.add(index);
+  private interface IndexStore {
+    int size();
+    /**
+     * @return the index which caused the item to be rejected; or null if accepted
+     */
+    Integer store(int index);
+    int removeBiggest();
+    Iterable<Integer> indexes();
   }
 
-  protected int removeBiggest() {
-    return indexes.removeLast();
-  }
-
-  protected Iterable<Integer> indexes() {
-    Integer[] array = indexes.toArray(new Integer[indexes.size()]);
-    Arrays.sort(array, 0, array.length, C);
-    return Arrays.asList(array);
-  }
-}
-
-/**
- * for group by, same keys are not counted (For 1-2-2-3-4, limit 3 is 1-2-(2)-3)
- * simple TreeMap is used because group by does not need keep duplicated keys
- */
-class HashForGroup extends TopNHash {
+  /**
+   * for order by, same keys are counted (For 1-2-2-3-4, limit 3 is 1-2-2)
+   * MinMaxPriorityQueue is used because it allows duplication and fast access to biggest one
+   */
+  private class HashForRow implements IndexStore {
+    private final MinMaxPriorityQueue<Integer> indexes = MinMaxPriorityQueue.orderedBy(C).create();
 
-  private final SortedSet<Integer> indexes;
+    public int size() {
+      return indexes.size();
+    }
 
-  HashForGroup(int topN, long threshold, BinaryCollector collector) {
-    super(topN, threshold, collector);
-    this.indexes = new TreeSet<Integer>(C);
-  }
+    // returns null always
+    public Integer store(int index) {
+      boolean result = indexes.add(index);
+      assert result;
+      return null;
+    }
 
-  protected int size() {
-    return indexes.size();
-  }
+    public int removeBiggest() {
+      return indexes.removeLast();
+    }
 
-  // returns false if index already exists in map
-  protected boolean store(int index) {
-    return indexes.add(index);
+    public Iterable<Integer> indexes() {
+      Integer[] array = indexes.toArray(new Integer[indexes.size()]);
+      Arrays.sort(array, 0, array.length, C);
+      return Arrays.asList(array);
+    }
   }
 
-  protected int removeBiggest() {
-    Integer last = indexes.last();
-    indexes.remove(last);
-    return last;
-  }
+  /**
+   * for group by, same keys are not counted (For 1-2-2-3-4, limit 3 is 1-2-(2)-3)
+   * simple TreeMap is used because group by does not need to keep duplicated keys
+   */
+  private class HashForGroup implements IndexStore {
+    // TreeSet anyway uses TreeMap; so use plain TreeMap to be able to get value in collisions.
+    private final TreeMap<Integer, Integer> indexes = new TreeMap<Integer, Integer>(C);
 
-  protected Iterable<Integer> indexes() {
-    return indexes;
-  }
-}
+    public int size() {
+      return indexes.size();
+    }
 
-class HashForLimit0 extends TopNHash {
+    // returns the index already stored for an equal key (collision), or null if newly added
+    public Integer store(int index) {
+      return indexes.put(index, index);
+    }
 
-  HashForLimit0() {
-    super(0, 0, null);
-  }
+    public int removeBiggest() {
+      Integer last = indexes.lastKey();
+      indexes.remove(last);
+      return last;
+    }
 
-  @Override
-  public int indexOf(HiveKey key) {
-    return EXCLUDED;
+    public Iterable<Integer> indexes() {
+      return indexes.keySet();
+    }
   }
-
-  protected int size() { return 0; }
-  protected boolean store(int index) { return false; }
-  protected int removeBiggest() { return 0; }
-  protected Iterable<Integer> indexes() { return Collections.emptyList(); }
 }
-

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java?rev=1538012&r1=1538011&r2=1538012&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java Fri Nov  1 18:10:26 2013
@@ -19,12 +19,14 @@
 package org.apache.hadoop.hive.ql.exec.vector;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TopNHash;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
@@ -39,6 +41,7 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 
 public class VectorReduceSinkOperator extends ReduceSinkOperator {
 
@@ -51,42 +54,44 @@ public class VectorReduceSinkOperator ex
    * The evaluators for the key columns. Key columns decide the sort order on
    * the reducer side. Key columns are passed to the reducer in the "key".
    */
-  protected VectorExpression[] keyEval;
+  private VectorExpression[] keyEval;
 
   /**
    * The key value writers. These know how to write the necessary writable type
    * based on key column metadata, from the primitive vector type.
    */
-  protected transient VectorExpressionWriter[] keyWriters;
+  private transient VectorExpressionWriter[] keyWriters;
 
   /**
    * The evaluators for the value columns. Value columns are passed to reducer
    * in the "value".
    */
-  protected VectorExpression[] valueEval;
+  private VectorExpression[] valueEval;
 
   /**
    * The output value writers. These know how to write the necessary writable type
    * based on value column metadata, from the primitive vector type.
    */
-  protected transient VectorExpressionWriter[] valueWriters;
+  private transient VectorExpressionWriter[] valueWriters;
 
   /**
    * The evaluators for the partition columns (CLUSTER BY or DISTRIBUTE BY in
    * Hive language). Partition columns decide the reducer that the current row
    * goes to. Partition columns are not passed to reducer.
    */
-  protected VectorExpression[] partitionEval;
+  private VectorExpression[] partitionEval;
 
   /**
    * The partition value writers. These know how to write the necessary writable type
    * based on partition column metadata, from the primitive vector type.
    */
-  protected transient VectorExpressionWriter[] partitionWriters;
+  private transient VectorExpressionWriter[] partitionWriters;
 
-  transient ObjectInspector keyObjectInspector;
-  transient ObjectInspector valueObjectInspector;
-  transient int [] keyHashCode = new int [VectorizedRowBatch.DEFAULT_SIZE];
+  private transient ObjectInspector keyObjectInspector;
+  private transient ObjectInspector valueObjectInspector;
+  private transient int [] keyHashCode = new int [VectorizedRowBatch.DEFAULT_SIZE];
+
+  private transient int[] hashResult; // the pre-created array for reducerHash results
 
   public VectorReduceSinkOperator(VectorizationContext vContext, OperatorDesc conf)
       throws HiveException {
@@ -183,6 +188,11 @@ public class VectorReduceSinkOperator ex
       tagByte[0] = (byte) tag;
       LOG.info("Using tag = " + tag);
 
+      int limit = conf.getTopN();
+      float memUsage = conf.getTopNMemoryUsage();
+      if (limit >= 0 && memUsage > 0) {
+        reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this);
+      }
     } catch(Exception e) {
       throw new HiveException(e);
     }
@@ -215,21 +225,22 @@ public class VectorReduceSinkOperator ex
 
       Object[] distributionKeys = new Object[numDistributionKeys];
 
-      // Emit a (k,v) pair for each row in the batch
-      //
+      // Determine which rows we need to emit based on topN optimization
+      int startResult = reducerHash.startVectorizedBatch();
+      if (startResult == TopNHash.EXCLUDED) {
+        return; // TopN wants us to exclude all rows.
+      }
+      boolean useTopN = startResult != TopNHash.FORWARD;
+      if (useTopN && (hashResult == null || hashResult.length < vrg.size)) {
+        hashResult = new int[Math.max(vrg.size, VectorizedRowBatch.DEFAULT_SIZE)];
+      }
+
       for (int j = 0 ; j < vrg.size; ++j) {
         int rowIndex = j;
         if (vrg.selectedInUse) {
           rowIndex = vrg.selected[j];
         }
-        for (int i = 0; i < valueEval.length; i++) {
-          int batchColumn = valueEval[i].getOutputColumn();
-          ColumnVector vectorColumn = vrg.cols[batchColumn];
-          cachedValues[i] = valueWriters[i].writeValue(vectorColumn, rowIndex);
-        }
-        // Serialize the value
-        value = valueSerializer.serialize(cachedValues, valueObjectInspector);
-
+        // First, evaluate the key - the way things stand we'd need it regardless.
         for (int i = 0; i < keyEval.length; i++) {
           int batchColumn = keyEval[i].getOutputColumn();
           ColumnVector vectorColumn = vrg.cols[batchColumn];
@@ -237,69 +248,42 @@ public class VectorReduceSinkOperator ex
         }
         // no distinct key
         System.arraycopy(distributionKeys, 0, cachedKeys[0], 0, numDistributionKeys);
-        // Serialize the keys and append the tag
+        // TopN is not supported for multi-distinct currently. If we have more than one
+        // cachedKeys entry per input key, horrible things will happen (array OOB error likely).
+        assert !useTopN || cachedKeys.length <= 1;
         for (int i = 0; i < cachedKeys.length; i++) {
-          if (keyIsText) {
-            Text key = (Text) keySerializer.serialize(cachedKeys[i],
-                keyObjectInspector);
-            if (tag == -1) {
-              keyWritable.set(key.getBytes(), 0, key.getLength());
-            } else {
-              int keyLength = key.getLength();
-              keyWritable.setSize(keyLength + 1);
-              System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
-              keyWritable.get()[keyLength] = tagByte[0];
-            }
-          } else {
-            // Must be BytesWritable
-            BytesWritable key = (BytesWritable) keySerializer.serialize(
-                cachedKeys[i], keyObjectInspector);
-            if (tag == -1) {
-              keyWritable.set(key.getBytes(), 0, key.getLength());
-            } else {
-              int keyLength = key.getLength();
-              keyWritable.setSize(keyLength + 1);
-              System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
-              keyWritable.get()[keyLength] = tagByte[0];
-            }
-          }
-          // Evaluate the HashCode
-          int keyHashCode = 0;
-          if (partitionEval.length == 0) {
-            // If no partition cols, just distribute the data uniformly to provide
-            // better
-            // load balance. If the requirement is to have a single reducer, we
-            // should set
-            // the number of reducers to 1.
-            // Use a constant seed to make the code deterministic.
-            if (random == null) {
-              random = new Random(12345);
-            }
-            keyHashCode = random.nextInt();
+          // Serialize the keys and append the tag.
+          Object keyObj = keySerializer.serialize(cachedKeys[i], keyObjectInspector);
+          setKeyWritable(keyIsText ? (Text)keyObj : (BytesWritable)keyObj, tag);
+          if (useTopN) {
+            reducerHash.tryStoreVectorizedKey(keyWritable, j, hashResult);
           } else {
-            for (int p = 0; p < partitionEval.length; p++) {
-              ColumnVector columnVector = vrg.cols[partitionEval[p].getOutputColumn()];
-              Object partitionValue = partitionWriters[p].writeValue(columnVector, rowIndex);
-              keyHashCode = keyHashCode
-                  * 31
-                  + ObjectInspectorUtils.hashCode(
-                      partitionValue,
-                      partitionWriters[p].getObjectInspector());
-            }
-          }
-          keyWritable.setHashCode(keyHashCode);
-          if (out != null) {
-            out.collect(keyWritable, value);
-            // Since this is a terminal operator, update counters explicitly -
-            // forward is not called
-            if (counterNameToEnum != null) {
-              ++outputRows;
-              if (outputRows % 1000 == 0) {
-                incrCounter(numOutputRowsCntr, outputRows);
-                outputRows = 0;
-              }
-            }
-          }
+            // No TopN, just forward the key
+            keyWritable.setHashCode(computeHashCode(vrg, rowIndex));
+            collect(keyWritable, makeValueWritable(vrg, rowIndex));
+           }
+        }
+      }
+
+      if (!useTopN) return; // All done.
+
+      // If we use topN, we have called tryStore on every key now. We can process the results.
+      for (int j = 0 ; j < vrg.size; ++j) {
+        int index = hashResult[j];
+        if (index == TopNHash.EXCLUDED) continue;
+        int rowIndex = j;
+        if (vrg.selectedInUse) {
+          rowIndex = vrg.selected[j];
+        }
+        // Compute everything now - we'd either store it, or forward it.
+        int hashCode = computeHashCode(vrg, rowIndex);
+        BytesWritable value = makeValueWritable(vrg, rowIndex);
+        if (index < 0) {
+          // Kinda hacky; see getVectorizedKeyToForward javadoc.
+          byte[] key = reducerHash.getVectorizedKeyToForward(index);
+          collect(key, value, hashCode);
+        } else {
+          reducerHash.storeValue(index, value, hashCode, true);
         }
       }
     } catch (SerDeException e) {
@@ -309,6 +293,45 @@ public class VectorReduceSinkOperator ex
     }
   }
 
+  private BytesWritable makeValueWritable(VectorizedRowBatch vrg, int rowIndex)
+      throws HiveException, SerDeException {
+    for (int i = 0; i < valueEval.length; i++) {
+      int batchColumn = valueEval[i].getOutputColumn();
+      ColumnVector vectorColumn = vrg.cols[batchColumn];
+      cachedValues[i] = valueWriters[i].writeValue(vectorColumn, rowIndex);
+    }
+    // Serialize the value
+    return (BytesWritable)valueSerializer.serialize(cachedValues, valueObjectInspector);
+  }
+
+  private int computeHashCode(VectorizedRowBatch vrg, int rowIndex) throws HiveException {
+    // Evaluate the HashCode
+    int keyHashCode = 0;
+    if (partitionEval.length == 0) {
+      // If no partition cols, just distribute the data uniformly to provide
+      // better
+      // load balance. If the requirement is to have a single reducer, we
+      // should set
+      // the number of reducers to 1.
+      // Use a constant seed to make the code deterministic.
+      if (random == null) {
+        random = new Random(12345);
+      }
+      keyHashCode = random.nextInt();
+    } else {
+      for (int p = 0; p < partitionEval.length; p++) {
+        ColumnVector columnVector = vrg.cols[partitionEval[p].getOutputColumn()];
+        Object partitionValue = partitionWriters[p].writeValue(columnVector, rowIndex);
+        keyHashCode = keyHashCode
+            * 31
+            + ObjectInspectorUtils.hashCode(
+                partitionValue,
+                partitionWriters[p].getObjectInspector());
+      }
+    }
+    return keyHashCode;
+  }
+
   static public String getOperatorName() {
     return "RS";
   }

Modified: hive/trunk/ql/src/test/queries/clientpositive/vectorization_limit.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/vectorization_limit.q?rev=1538012&r1=1538011&r2=1538012&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/vectorization_limit.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/vectorization_limit.q Fri Nov  1 18:10:26 2013
@@ -1,3 +1,37 @@
 SET hive.vectorized.execution.enabled=true;
 explain SELECT cbigint, cdouble FROM alltypesorc WHERE cbigint < cdouble and cint > 0 limit 7;
 SELECT cbigint, cdouble FROM alltypesorc WHERE cbigint < cdouble and cint > 0 limit 7;
+
+set hive.optimize.reducededuplication.min.reducer=1;
+set hive.limit.pushdown.memory.usage=0.3f;
+
+-- HIVE-3562 Some limit can be pushed down to map stage - c/p parts from limit_pushdown
+
+explain
+select ctinyint,cdouble,csmallint from alltypesorc where ctinyint is not null order by ctinyint,cdouble limit 20;
+select ctinyint,cdouble,csmallint from alltypesorc where ctinyint is not null order by ctinyint,cdouble limit 20;
+
+-- deduped RS
+explain
+select ctinyint,avg(cdouble + 1) from alltypesorc group by ctinyint order by ctinyint limit 20;
+select ctinyint,avg(cdouble + 1) from alltypesorc group by ctinyint order by ctinyint limit 20;
+
+-- distincts
+explain
+select distinct(ctinyint) from alltypesorc limit 20;
+select distinct(ctinyint) from alltypesorc limit 20;
+
+explain
+select ctinyint, count(distinct(cdouble)) from alltypesorc group by ctinyint limit 20;
+select ctinyint, count(distinct(cdouble)) from alltypesorc group by ctinyint limit 20;
+
+-- limit zero
+explain
+select ctinyint,cdouble from alltypesorc order by ctinyint limit 0;
+select ctinyint,cdouble from alltypesorc order by ctinyint limit 0;
+
+-- 2MR (applied to last RS)
+explain
+select cdouble, sum(ctinyint) as sum from alltypesorc where ctinyint is not null group by cdouble order by sum, cdouble limit 20;
+select cdouble, sum(ctinyint) as sum from alltypesorc where ctinyint is not null group by cdouble order by sum, cdouble limit 20;
+

Modified: hive/trunk/ql/src/test/results/clientpositive/vectorization_limit.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/vectorization_limit.q.out?rev=1538012&r1=1538011&r2=1538012&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/vectorization_limit.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/vectorization_limit.q.out Fri Nov  1 18:10:26 2013
@@ -62,3 +62,628 @@ POSTHOOK: Input: default@alltypesorc
 -1887561756	-8881.0
 -1887561756	-2281.0
 -1887561756	9531.0
+PREHOOK: query: -- HIVE-3562 Some limit can be pushed down to map stage - c/p parts from limit_pushdown
+
+explain
+select ctinyint,cdouble,csmallint from alltypesorc where ctinyint is not null order by ctinyint,cdouble limit 20
+PREHOOK: type: QUERY
+POSTHOOK: query: -- HIVE-3562 Some limit can be pushed down to map stage - c/p parts from limit_pushdown
+
+explain
+select ctinyint,cdouble,csmallint from alltypesorc where ctinyint is not null order by ctinyint,cdouble limit 20
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME alltypesorc))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL ctinyint)) (TOK_SELEXPR (TOK_TABLE_OR_COL cdouble)) (TOK_SELEXPR (TOK_TABLE_OR_COL csmallint))) (TOK_WHERE (TOK_FUNCTION TOK_ISNOTNULL (TOK_TABLE_OR_COL ctinyint))) (TOK_ORDERBY (TOK_TABSORTCOLNAMEASC (TOK_TABLE_OR_COL ctinyint)) (TOK_TABSORTCOLNAMEASC (TOK_TABLE_OR_COL cdouble))) (TOK_LIMIT 20)))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        alltypesorc 
+          TableScan
+            alias: alltypesorc
+            Filter Operator
+              predicate:
+                  expr: ctinyint is not null
+                  type: boolean
+              Vectorized execution: true
+              Select Operator
+                expressions:
+                      expr: ctinyint
+                      type: tinyint
+                      expr: cdouble
+                      type: double
+                      expr: csmallint
+                      type: smallint
+                outputColumnNames: _col0, _col1, _col2
+                Vectorized execution: true
+                Reduce Output Operator
+                  key expressions:
+                        expr: _col0
+                        type: tinyint
+                        expr: _col1
+                        type: double
+                  sort order: ++
+                  tag: -1
+                  TopN: 20
+                  TopN Hash Memory Usage: 0.3
+                  value expressions:
+                        expr: _col0
+                        type: tinyint
+                        expr: _col1
+                        type: double
+                        expr: _col2
+                        type: smallint
+                  Vectorized execution: true
+      Reduce Operator Tree:
+        Extract
+          Limit
+            File Output Operator
+              compressed: false
+              GlobalTableId: 0
+              table:
+                  input format: org.apache.hadoop.mapred.TextInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: 20
+
+
+PREHOOK: query: select ctinyint,cdouble,csmallint from alltypesorc where ctinyint is not null order by ctinyint,cdouble limit 20
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypesorc
+#### A masked pattern was here ####
+POSTHOOK: query: select ctinyint,cdouble,csmallint from alltypesorc where ctinyint is not null order by ctinyint,cdouble limit 20
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypesorc
+#### A masked pattern was here ####
+-64	-15920.0	-15920
+-64	-10462.0	-10462
+-64	-9842.0	-9842
+-64	-8080.0	-8080
+-64	-7196.0	-7196
+-64	-7196.0	-7196
+-64	-7196.0	-7196
+-64	-7196.0	-7196
+-64	-7196.0	-7196
+-64	-7196.0	-7196
+-64	-7196.0	-7196
+-64	-6907.0	-6907
+-64	-4803.0	-4803
+-64	-4040.0	-4040
+-64	-4018.0	-4018
+-64	-3586.0	-3586
+-64	-3097.0	-3097
+-64	-2919.0	-2919
+-64	-1600.0	-1600
+-64	-200.0	-200
+PREHOOK: query: -- deduped RS
+explain
+select ctinyint,avg(cdouble + 1) from alltypesorc group by ctinyint order by ctinyint limit 20
+PREHOOK: type: QUERY
+POSTHOOK: query: -- deduped RS
+explain
+select ctinyint,avg(cdouble + 1) from alltypesorc group by ctinyint order by ctinyint limit 20
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME alltypesorc))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL ctinyint)) (TOK_SELEXPR (TOK_FUNCTION avg (+ (TOK_TABLE_OR_COL cdouble) 1)))) (TOK_GROUPBY (TOK_TABLE_OR_COL ctinyint)) (TOK_ORDERBY (TOK_TABSORTCOLNAMEASC (TOK_TABLE_OR_COL ctinyint))) (TOK_LIMIT 20)))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        alltypesorc 
+          TableScan
+            alias: alltypesorc
+            Select Operator
+              expressions:
+                    expr: ctinyint
+                    type: tinyint
+                    expr: cdouble
+                    type: double
+              outputColumnNames: ctinyint, cdouble
+              Vectorized execution: true
+              Group By Operator
+                aggregations:
+                      expr: avg((cdouble + 1))
+                bucketGroup: false
+                keys:
+                      expr: ctinyint
+                      type: tinyint
+                mode: hash
+                outputColumnNames: _col0, _col1
+                Vectorized execution: true
+                Reduce Output Operator
+                  key expressions:
+                        expr: _col0
+                        type: tinyint
+                  sort order: +
+                  Map-reduce partition columns:
+                        expr: _col0
+                        type: tinyint
+                  tag: -1
+                  TopN: 20
+                  TopN Hash Memory Usage: 0.3
+                  value expressions:
+                        expr: _col1
+                        type: struct<count:bigint,sum:double>
+      Reduce Operator Tree:
+        Group By Operator
+          aggregations:
+                expr: avg(VALUE._col0)
+          bucketGroup: false
+          keys:
+                expr: KEY._col0
+                type: tinyint
+          mode: mergepartial
+          outputColumnNames: _col0, _col1
+          Select Operator
+            expressions:
+                  expr: _col0
+                  type: tinyint
+                  expr: _col1
+                  type: double
+            outputColumnNames: _col0, _col1
+            Limit
+              File Output Operator
+                compressed: false
+                GlobalTableId: 0
+                table:
+                    input format: org.apache.hadoop.mapred.TextInputFormat
+                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: 20
+
+
+PREHOOK: query: select ctinyint,avg(cdouble + 1) from alltypesorc group by ctinyint order by ctinyint limit 20
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypesorc
+#### A masked pattern was here ####
+POSTHOOK: query: select ctinyint,avg(cdouble + 1) from alltypesorc group by ctinyint order by ctinyint limit 20
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypesorc
+#### A masked pattern was here ####
+NULL	9370.0945309795
+-64	373.52941176470586
+-63	2178.7272727272725
+-62	245.69387755102042
+-61	914.3404255319149
+-60	1071.82
+-59	318.27272727272725
+-58	3483.2444444444445
+-57	1867.0535714285713
+-56	2595.818181818182
+-55	2385.595744680851
+-54	2712.7272727272725
+-53	-532.7567567567568
+-52	2810.705882352941
+-51	-96.46341463414635
+-50	-960.0192307692307
+-49	768.7659574468086
+-48	1672.909090909091
+-47	-574.6428571428571
+-46	3033.55
+PREHOOK: query: -- distincts
+explain
+select distinct(ctinyint) from alltypesorc limit 20
+PREHOOK: type: QUERY
+POSTHOOK: query: -- distincts
+explain
+select distinct(ctinyint) from alltypesorc limit 20
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME alltypesorc))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECTDI (TOK_SELEXPR (TOK_TABLE_OR_COL ctinyint))) (TOK_LIMIT 20)))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        alltypesorc 
+          TableScan
+            alias: alltypesorc
+            Select Operator
+              expressions:
+                    expr: ctinyint
+                    type: tinyint
+              outputColumnNames: ctinyint
+              Vectorized execution: true
+              Group By Operator
+                bucketGroup: false
+                keys:
+                      expr: ctinyint
+                      type: tinyint
+                mode: hash
+                outputColumnNames: _col0
+                Vectorized execution: true
+                Reduce Output Operator
+                  key expressions:
+                        expr: _col0
+                        type: tinyint
+                  sort order: +
+                  Map-reduce partition columns:
+                        expr: _col0
+                        type: tinyint
+                  tag: -1
+                  TopN: 20
+                  TopN Hash Memory Usage: 0.3
+      Reduce Operator Tree:
+        Group By Operator
+          bucketGroup: false
+          keys:
+                expr: KEY._col0
+                type: tinyint
+          mode: mergepartial
+          outputColumnNames: _col0
+          Select Operator
+            expressions:
+                  expr: _col0
+                  type: tinyint
+            outputColumnNames: _col0
+            Limit
+              File Output Operator
+                compressed: false
+                GlobalTableId: 0
+                table:
+                    input format: org.apache.hadoop.mapred.TextInputFormat
+                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: 20
+
+
+PREHOOK: query: select distinct(ctinyint) from alltypesorc limit 20
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypesorc
+#### A masked pattern was here ####
+POSTHOOK: query: select distinct(ctinyint) from alltypesorc limit 20
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypesorc
+#### A masked pattern was here ####
+NULL
+-64
+-63
+-62
+-61
+-60
+-59
+-58
+-57
+-56
+-55
+-54
+-53
+-52
+-51
+-50
+-49
+-48
+-47
+-46
+PREHOOK: query: explain
+select ctinyint, count(distinct(cdouble)) from alltypesorc group by ctinyint limit 20
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select ctinyint, count(distinct(cdouble)) from alltypesorc group by ctinyint limit 20
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME alltypesorc))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL ctinyint)) (TOK_SELEXPR (TOK_FUNCTIONDI count (TOK_TABLE_OR_COL cdouble)))) (TOK_GROUPBY (TOK_TABLE_OR_COL ctinyint)) (TOK_LIMIT 20)))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        alltypesorc 
+          TableScan
+            alias: alltypesorc
+            Select Operator
+              expressions:
+                    expr: ctinyint
+                    type: tinyint
+                    expr: cdouble
+                    type: double
+              outputColumnNames: ctinyint, cdouble
+              Vectorized execution: true
+              Group By Operator
+                aggregations:
+                      expr: count(DISTINCT cdouble)
+                bucketGroup: false
+                keys:
+                      expr: ctinyint
+                      type: tinyint
+                      expr: cdouble
+                      type: double
+                mode: hash
+                outputColumnNames: _col0, _col1, _col2
+                Vectorized execution: true
+                Reduce Output Operator
+                  key expressions:
+                        expr: _col0
+                        type: tinyint
+                        expr: _col1
+                        type: double
+                  sort order: ++
+                  Map-reduce partition columns:
+                        expr: _col0
+                        type: tinyint
+                  tag: -1
+                  TopN: 20
+                  TopN Hash Memory Usage: 0.3
+                  value expressions:
+                        expr: _col2
+                        type: bigint
+      Reduce Operator Tree:
+        Group By Operator
+          aggregations:
+                expr: count(DISTINCT KEY._col1:0._col0)
+          bucketGroup: false
+          keys:
+                expr: KEY._col0
+                type: tinyint
+          mode: mergepartial
+          outputColumnNames: _col0, _col1
+          Select Operator
+            expressions:
+                  expr: _col0
+                  type: tinyint
+                  expr: _col1
+                  type: bigint
+            outputColumnNames: _col0, _col1
+            Limit
+              File Output Operator
+                compressed: false
+                GlobalTableId: 0
+                table:
+                    input format: org.apache.hadoop.mapred.TextInputFormat
+                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: 20
+
+
+PREHOOK: query: select ctinyint, count(distinct(cdouble)) from alltypesorc group by ctinyint limit 20
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypesorc
+#### A masked pattern was here ####
+POSTHOOK: query: select ctinyint, count(distinct(cdouble)) from alltypesorc group by ctinyint limit 20
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypesorc
+#### A masked pattern was here ####
+NULL	19
+PREHOOK: query: -- limit zero
+explain
+select ctinyint,cdouble from alltypesorc order by ctinyint limit 0
+PREHOOK: type: QUERY
+POSTHOOK: query: -- limit zero
+explain
+select ctinyint,cdouble from alltypesorc order by ctinyint limit 0
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME alltypesorc))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL ctinyint)) (TOK_SELEXPR (TOK_TABLE_OR_COL cdouble))) (TOK_ORDERBY (TOK_TABSORTCOLNAMEASC (TOK_TABLE_OR_COL ctinyint))) (TOK_LIMIT 0)))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        alltypesorc 
+          TableScan
+            alias: alltypesorc
+            Select Operator
+              expressions:
+                    expr: ctinyint
+                    type: tinyint
+                    expr: cdouble
+                    type: double
+              outputColumnNames: _col0, _col1
+              Vectorized execution: true
+              Reduce Output Operator
+                key expressions:
+                      expr: _col0
+                      type: tinyint
+                sort order: +
+                tag: -1
+                value expressions:
+                      expr: _col0
+                      type: tinyint
+                      expr: _col1
+                      type: double
+                Vectorized execution: true
+      Reduce Operator Tree:
+        Extract
+          Limit
+            File Output Operator
+              compressed: false
+              GlobalTableId: 0
+              table:
+                  input format: org.apache.hadoop.mapred.TextInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: 0
+
+
+PREHOOK: query: select ctinyint,cdouble from alltypesorc order by ctinyint limit 0
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypesorc
+#### A masked pattern was here ####
+POSTHOOK: query: select ctinyint,cdouble from alltypesorc order by ctinyint limit 0
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypesorc
+#### A masked pattern was here ####
+PREHOOK: query: -- 2MR (applied to last RS)
+explain
+select cdouble, sum(ctinyint) as sum from alltypesorc where ctinyint is not null group by cdouble order by sum, cdouble limit 20
+PREHOOK: type: QUERY
+POSTHOOK: query: -- 2MR (applied to last RS)
+explain
+select cdouble, sum(ctinyint) as sum from alltypesorc where ctinyint is not null group by cdouble order by sum, cdouble limit 20
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME alltypesorc))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL cdouble)) (TOK_SELEXPR (TOK_FUNCTION sum (TOK_TABLE_OR_COL ctinyint)) sum)) (TOK_WHERE (TOK_FUNCTION TOK_ISNOTNULL (TOK_TABLE_OR_COL ctinyint))) (TOK_GROUPBY (TOK_TABLE_OR_COL cdouble)) (TOK_ORDERBY (TOK_TABSORTCOLNAMEASC (TOK_TABLE_OR_COL sum)) (TOK_TABSORTCOLNAMEASC (TOK_TABLE_OR_COL cdouble))) (TOK_LIMIT 20)))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-2 depends on stages: Stage-1
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        alltypesorc 
+          TableScan
+            alias: alltypesorc
+            Filter Operator
+              predicate:
+                  expr: ctinyint is not null
+                  type: boolean
+              Vectorized execution: true
+              Select Operator
+                expressions:
+                      expr: cdouble
+                      type: double
+                      expr: ctinyint
+                      type: tinyint
+                outputColumnNames: cdouble, ctinyint
+                Vectorized execution: true
+                Group By Operator
+                  aggregations:
+                        expr: sum(ctinyint)
+                  bucketGroup: false
+                  keys:
+                        expr: cdouble
+                        type: double
+                  mode: hash
+                  outputColumnNames: _col0, _col1
+                  Vectorized execution: true
+                  Reduce Output Operator
+                    key expressions:
+                          expr: _col0
+                          type: double
+                    sort order: +
+                    Map-reduce partition columns:
+                          expr: _col0
+                          type: double
+                    tag: -1
+                    value expressions:
+                          expr: _col1
+                          type: bigint
+      Reduce Operator Tree:
+        Group By Operator
+          aggregations:
+                expr: sum(VALUE._col0)
+          bucketGroup: false
+          keys:
+                expr: KEY._col0
+                type: double
+          mode: mergepartial
+          outputColumnNames: _col0, _col1
+          Select Operator
+            expressions:
+                  expr: _col0
+                  type: double
+                  expr: _col1
+                  type: bigint
+            outputColumnNames: _col0, _col1
+            File Output Operator
+              compressed: false
+              GlobalTableId: 0
+              table:
+                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+
+  Stage: Stage-2
+    Map Reduce
+      Alias -> Map Operator Tree:
+#### A masked pattern was here ####
+          TableScan
+            Reduce Output Operator
+              key expressions:
+                    expr: _col1
+                    type: bigint
+                    expr: _col0
+                    type: double
+              sort order: ++
+              tag: -1
+              TopN: 20
+              TopN Hash Memory Usage: 0.3
+              value expressions:
+                    expr: _col0
+                    type: double
+                    expr: _col1
+                    type: bigint
+      Reduce Operator Tree:
+        Extract
+          Limit
+            File Output Operator
+              compressed: false
+              GlobalTableId: 0
+              table:
+                  input format: org.apache.hadoop.mapred.TextInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: 20
+
+
+PREHOOK: query: select cdouble, sum(ctinyint) as sum from alltypesorc where ctinyint is not null group by cdouble order by sum, cdouble limit 20
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypesorc
+#### A masked pattern was here ####
+POSTHOOK: query: select cdouble, sum(ctinyint) as sum from alltypesorc where ctinyint is not null group by cdouble order by sum, cdouble limit 20
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypesorc
+#### A masked pattern was here ####
+NULL	-32768
+-7196.0	-2009
+15601.0	-1733
+4811.0	-115
+-11322.0	-101
+-1121.0	-89
+7705.0	-88
+3520.0	-86
+-8118.0	-80
+5241.0	-80
+-11492.0	-78
+9452.0	-76
+557.0	-75
+10496.0	-67
+-15920.0	-64
+-10462.0	-64
+-9842.0	-64
+-8080.0	-64
+-6907.0	-64
+-4803.0	-64