Posted to commits@hive.apache.org by ha...@apache.org on 2013/11/01 19:10:27 UTC
svn commit: r1538012 - in /hive/trunk/ql/src:
java/org/apache/hadoop/hive/ql/exec/
java/org/apache/hadoop/hive/ql/exec/vector/ test/queries/clientpositive/
test/results/clientpositive/
Author: hashutosh
Date: Fri Nov 1 18:10:26 2013
New Revision: 1538012
URL: http://svn.apache.org/r1538012
Log:
HIVE-5503 : TopN optimization in VectorReduceSink (Sergey Shelukhin via Ashutosh Chauhan)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
hive/trunk/ql/src/test/queries/clientpositive/vectorization_limit.q
hive/trunk/ql/src/test/results/clientpositive/vectorization_limit.q.out
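
HIVE-5503 pushes the top-N bookkeeping for limit pushdown into a shared TopNHash used by both the row-mode and vectorized ReduceSink operators: the map side keeps only the N smallest serialized keys in a bounded sorted structure and drops everything else before the shuffle. A minimal standalone sketch of that core idea, using plain java.util types rather than the Hive classes (all names below are illustrative, not part of the commit):

import java.util.TreeMap;

/**
 * Map-side top-N sketch: a row survives only if its key is among the
 * N smallest seen so far. Hive stores serialized key/value bytes; plain
 * strings stand in for them here.
 */
public class TopNSketch {
  private final int topN;
  private final TreeMap<String, String> heap = new TreeMap<String, String>();

  public TopNSketch(int topN) { this.topN = topN; }

  /** Returns true if the row is currently within the top N. */
  public boolean offer(String key, String value) {
    heap.put(key, value); // duplicate keys overwrite here; Hive's row/group stores differ on this
    if (heap.size() > topN) {
      // evict the biggest key; if that is the key just offered, the row is excluded
      if (heap.pollLastEntry().getKey().equals(key)) {
        return false;
      }
    }
    return true;
  }

  /** Emit the survivors, e.g. when the operator closes. */
  public void flush() {
    for (java.util.Map.Entry<String, String> e : heap.entrySet()) {
      System.out.println(e.getKey() + " -> " + e.getValue());
    }
  }

  public static void main(String[] args) {
    TopNSketch t = new TopNSketch(3);
    for (String k : new String[] {"d", "b", "e", "a", "c"}) {
      t.offer(k, "val-" + k);
    }
    t.flush(); // prints a, b, c - the three smallest keys
  }
}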
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1538012&r1=1538011&r2=1538012&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Fri Nov 1 18:10:26 2013
@@ -42,6 +42,7 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+import org.apache.hadoop.io.BinaryComparable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.Text;
@@ -90,9 +91,8 @@ public class ReduceSinkOperator extends
return inputAlias;
}
- // picks topN K:V pairs from input. can be null
- private transient TopNHash reducerHash;
-
+ // picks topN K:V pairs from input.
+ protected transient TopNHash reducerHash = new TopNHash();
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
@@ -136,7 +136,11 @@ public class ReduceSinkOperator extends
.newInstance();
valueSerializer.initialize(null, valueTableDesc.getProperties());
- reducerHash = createTopKHash();
+ int limit = conf.getTopN();
+ float memUsage = conf.getTopNMemoryUsage();
+ if (limit >= 0 && memUsage > 0) {
+ reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this);
+ }
firstRow = true;
initializeChildren(hconf);
@@ -146,26 +150,8 @@ public class ReduceSinkOperator extends
}
}
- private TopNHash createTopKHash() {
- int limit = conf.getTopN();
- float percent = conf.getTopNMemoryUsage();
- if (limit < 0 || percent <= 0) {
- return null;
- }
- if (limit == 0) {
- return TopNHash.create0();
- }
- // limit * 64 : compensation of arrays for key/value/hashcodes
- long threshold = (long) (percent * Runtime.getRuntime().maxMemory()) - limit * 64;
- if (threshold < 0) {
- return null;
- }
- return TopNHash.create(conf.isMapGroupBy(), limit, threshold, this);
- }
-
transient InspectableObject tempInspectableObject = new InspectableObject();
protected transient HiveKey keyWritable = new HiveKey();
- protected transient Writable value;
transient StructObjectInspector keyObjectInspector;
transient StructObjectInspector valueObjectInspector;
@@ -214,6 +200,7 @@ public class ReduceSinkOperator extends
if (outputColNames.size() > length) {
// union keys
+ assert distinctColIndices != null;
List<ObjectInspector> uois = new ArrayList<ObjectInspector>();
for (List<Integer> distinctCols : distinctColIndices) {
List<String> names = new ArrayList<String>();
@@ -240,6 +227,9 @@ public class ReduceSinkOperator extends
ObjectInspector rowInspector = inputObjInspectors[tag];
if (firstRow) {
firstRow = false;
+ // TODO: this is fishy - we init object inspectors based on first tag. We
+ // should either init for each tag, or if rowInspector doesn't really
+ // matter, then we can create this in ctor and get rid of firstRow.
keyObjectInspector = initEvaluatorsAndReturnStruct(keyEval,
distinctColIndices,
conf.getOutputKeyColumnNames(), numDistributionKeys, rowInspector);
@@ -253,32 +243,6 @@ public class ReduceSinkOperator extends
cachedValues = new Object[valueEval.length];
}
- // Evaluate the HashCode
- int keyHashCode = 0;
- if (partitionEval.length == 0) {
- // If no partition cols, just distribute the data uniformly to provide
- // better
- // load balance. If the requirement is to have a single reducer, we
- // should set
- // the number of reducers to 1.
- // Use a constant seed to make the code deterministic.
- if (random == null) {
- random = new Random(12345);
- }
- keyHashCode = random.nextInt();
- } else {
- for (int i = 0; i < partitionEval.length; i++) {
- Object o = partitionEval[i].evaluate(row);
- keyHashCode = keyHashCode * 31
- + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
- }
- }
-
- // Evaluate the value
- for (int i = 0; i < valueEval.length; i++) {
- cachedValues[i] = valueEval[i].evaluate(row);
- }
-
// Evaluate the keys
for (int i = 0; i < numDistributionKeys; i++) {
cachedKeys[0][i] = keyEval[i].evaluate(row);
@@ -303,64 +267,21 @@ public class ReduceSinkOperator extends
}
}
- BytesWritable value = null;
- // Serialize the keys and append the tag
for (int i = 0; i < cachedKeys.length; i++) {
- if (keyIsText) {
- Text key = (Text) keySerializer.serialize(cachedKeys[i],
- keyObjectInspector);
- if (tag == -1) {
- keyWritable.set(key.getBytes(), 0, key.getLength());
- } else {
- int keyLength = key.getLength();
- keyWritable.setSize(keyLength + 1);
- System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
- keyWritable.get()[keyLength] = tagByte[0];
- }
- } else {
- // Must be BytesWritable
- BytesWritable key = (BytesWritable) keySerializer.serialize(
- cachedKeys[i], keyObjectInspector);
- if (tag == -1) {
- keyWritable.set(key.getBytes(), 0, key.getLength());
- } else {
- int keyLength = key.getLength();
- keyWritable.setSize(keyLength + 1);
- System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
- keyWritable.get()[keyLength] = tagByte[0];
- }
- }
- keyWritable.setHashCode(keyHashCode);
-
- if (reducerHash == null) {
- if (null != out) {
- collect(keyWritable, value = getValue(row, value));
- }
- } else {
- int index = reducerHash.indexOf(keyWritable);
- if (index == TopNHash.EXCLUDED) {
- continue;
- }
- value = getValue(row, value);
- if (index >= 0) {
- reducerHash.set(index, value);
- } else {
- if (index == TopNHash.FORWARD) {
- collect(keyWritable, value);
- } else if (index == TopNHash.FLUSH) {
- LOG.info("Top-N hash is flushed");
- reducerHash.flush();
- // we can now retry adding key/value into hash, which is flushed.
- // but for simplicity, just forward them
- collect(keyWritable, value);
- } else if (index == TopNHash.DISABLE) {
- LOG.info("Top-N hash is disabled");
- reducerHash.flush();
- collect(keyWritable, value);
- reducerHash = null;
- }
- }
+ // Serialize the keys and append the tag
+ Object keyObj = keySerializer.serialize(cachedKeys[i], keyObjectInspector);
+ setKeyWritable(keyIsText ? (Text)keyObj : (BytesWritable)keyObj, tag);
+ int topNIndex = reducerHash.tryStoreKey(keyWritable);
+ if (TopNHash.EXCLUDED == topNIndex) continue;
+ int keyHashCode = computeHashCode(row);
+ BytesWritable valueWritable = getValue(row);
+ if (TopNHash.FORWARD == topNIndex) {
+ keyWritable.setHashCode(keyHashCode);
+ collect(keyWritable, valueWritable);
+ continue;
}
+ assert topNIndex >= 0;
+ reducerHash.storeValue(topNIndex, valueWritable, keyHashCode, false);
}
} catch (HiveException e) {
throw e;
@@ -369,10 +290,58 @@ public class ReduceSinkOperator extends
}
}
- public void collect(BytesWritable key, BytesWritable value) throws IOException {
+ private int computeHashCode(Object row) throws HiveException {
+ // Evaluate the HashCode
+ int keyHashCode = 0;
+ if (partitionEval.length == 0) {
+ // If no partition cols, just distribute the data uniformly to provide
+ // better
+ // load balance. If the requirement is to have a single reducer, we
+ // should set
+ // the number of reducers to 1.
+ // Use a constant seed to make the code deterministic.
+ if (random == null) {
+ random = new Random(12345);
+ }
+ keyHashCode = random.nextInt();
+ } else {
+ for (int i = 0; i < partitionEval.length; i++) {
+ Object o = partitionEval[i].evaluate(row);
+ keyHashCode = keyHashCode * 31
+ + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
+ }
+ }
+ return keyHashCode;
+ }
+
+ protected void setKeyWritable(BinaryComparable key, int tag) {
+ if (tag == -1) {
+ keyWritable.set(key.getBytes(), 0, key.getLength());
+ } else {
+ int keyLength = key.getLength();
+ keyWritable.setSize(keyLength + 1);
+ System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
+ keyWritable.get()[keyLength] = tagByte[0];
+ }
+ }
+
+ public void collect(byte[] key, byte[] value, int hash) throws IOException {
+ HiveKey keyWritable = new HiveKey(key, hash);
+ BytesWritable valueWritable = new BytesWritable(value);
+ collect(keyWritable, valueWritable);
+ }
+
+ protected void collect(byte[] key, Writable valueWritable, int hash) throws IOException {
+ HiveKey keyWritable = new HiveKey(key, hash);
+ collect(keyWritable, valueWritable);
+ }
+
+ protected void collect(BytesWritable keyWritable, Writable valueWritable) throws IOException {
// Since this is a terminal operator, update counters explicitly -
// forward is not called
- out.collect(key, value);
+ if (null != out) {
+ out.collect(keyWritable, valueWritable);
+ }
if (++outputRows % 1000 == 0) {
if (counterNameToEnum != null) {
incrCounter(numOutputRowsCntr, outputRows);
@@ -382,11 +351,7 @@ public class ReduceSinkOperator extends
}
}
- // evaluate value lazily
- private BytesWritable getValue(Object row, BytesWritable value) throws Exception {
- if (value != null) {
- return value;
- }
+ private BytesWritable getValue(Object row) throws Exception {
// Evaluate the value
for (int i = 0; i < valueEval.length; i++) {
cachedValues[i] = valueEval[i].evaluate(row);
@@ -397,16 +362,9 @@ public class ReduceSinkOperator extends
@Override
protected void closeOp(boolean abort) throws HiveException {
- if (!abort && reducerHash != null) {
- try {
- reducerHash.flush();
- } catch (IOException e) {
- throw new HiveException(e);
- } finally {
- reducerHash = null;
- }
+ if (!abort) {
+ reducerHash.flush();
}
- reducerHash = null;
super.closeOp(abort);
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java?rev=1538012&r1=1538011&r2=1538012&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java Fri Nov 1 18:10:26 2013
@@ -20,49 +20,65 @@ package org.apache.hadoop.hive.ql.exec;
import java.io.IOException;
import java.util.Arrays;
-import java.util.Collections;
import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
import java.util.SortedSet;
+import java.util.TreeMap;
import java.util.TreeSet;
import com.google.common.collect.MinMaxPriorityQueue;
-import org.apache.hadoop.hive.ql.io.HiveKey;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.io.BinaryComparable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.OutputCollector;
/**
* Stores binary key/value in sorted manner to get top-n key/value
+ * TODO: rename to TopNHeap?
*/
-abstract class TopNHash {
+public class TopNHash {
+ public static Log LOG = LogFactory.getLog(TopNHash.class);
/**
* For interaction between operator and top-n hash.
* Currently only used to forward key/values stored in hash.
*/
- public static interface BinaryCollector extends OutputCollector<BytesWritable, BytesWritable> {
+ public static interface BinaryCollector {
+ public void collect(byte[] key, byte[] value, int hash) throws IOException;
}
- protected static final int FORWARD = -1;
- protected static final int EXCLUDED = -2;
- protected static final int FLUSH = -3;
- protected static final int DISABLE = -4;
+ public static final int FORWARD = -1;
+ public static final int EXCLUDED = -2;
+ private static final int FLUSH = -3;
+ private static final int DISABLE = -4;
+ private static final int MAY_FORWARD = -5;
+
+ private BinaryCollector collector;
+ private int topN;
- protected final int topN;
- protected final BinaryCollector collector;
+ private long threshold; // max heap size
+ private long usage;
- protected final long threshold; // max heap size
- protected long usage; // heap usage (not exact)
+ // binary keys, values and hashCodes of rows, lined up by index
+ private byte[][] keys;
+ private byte[][] values;
+ private int[] hashes;
+ private IndexStore indexes; // The heap over the keys, storing indexes in the array.
- // binary keys, binary values and hashcodes of keys, lined up by index
- protected final byte[][] keys;
- protected final byte[][] values;
- protected final int[] hashes;
+ private int evicted; // recently evicted index (used for next key/value)
+ private int excluded; // count of excluded rows from previous flush
- protected int evicted; // recetly evicted index (the biggest one. used for next key/value)
- protected int excluded; // count of excluded rows from previous flush
+ // temporary stuff used for vectorization
+ private int batchNumForwards = 0; // whether current batch has any forwarded keys
+ private int[] indexToBatchIndex; // mapping of index (lined up w/keys) to index in the batch
- protected final Comparator<Integer> C = new Comparator<Integer>() {
+ private boolean isEnabled = false;
+
+ private final Comparator<Integer> C = new Comparator<Integer>() {
public int compare(Integer o1, Integer o2) {
byte[] key1 = keys[o1];
byte[] key2 = keys[o2];
@@ -70,29 +86,201 @@ abstract class TopNHash {
}
};
- public static TopNHash create0() {
- return new HashForLimit0();
- }
-
- public static TopNHash create(boolean grouped, int topN, long threshold,
- BinaryCollector collector) {
+ public void initialize(
+ int topN, float memUsage, boolean isMapGroupBy, BinaryCollector collector) {
+ assert topN >= 0 && memUsage > 0;
+ assert !this.isEnabled;
+ this.isEnabled = false;
+ this.topN = topN;
+ this.collector = collector;
if (topN == 0) {
- return new HashForLimit0();
+ isEnabled = true;
+ return; // topN == 0 will cause a short-circuit, don't need any initialization
}
- if (grouped) {
- return new HashForGroup(topN, threshold, collector);
- }
- return new HashForRow(topN, threshold, collector);
- }
- TopNHash(int topN, long threshold, BinaryCollector collector) {
- this.topN = topN;
- this.threshold = threshold;
- this.collector = collector;
+ // limit * 64 : compensation for the key/value/hashcode arrays
+ this.threshold = (long) (memUsage * Runtime.getRuntime().maxMemory()) - topN * 64;
+ if (threshold < 0) {
+ return;
+ }
+ this.indexes = isMapGroupBy ? new HashForGroup() : new HashForRow();
this.keys = new byte[topN + 1][];
this.values = new byte[topN + 1][];
this.hashes = new int[topN + 1];
this.evicted = topN;
+ this.isEnabled = true;
+ }
+
+ /**
+ * Try to store the non-vectorized key.
+ * @param key Serialized key.
+ * @return TopNHash.FORWARD if the row should be forwarded;
+ * TopNHash.EXCLUDED if the row should be discarded;
+ * any other number if the row is to be stored; the index should be passed to storeValue.
+ */
+ public int tryStoreKey(BytesWritable key) throws HiveException, IOException {
+ if (!isEnabled) {
+ return FORWARD; // short-circuit quickly - forward all rows
+ }
+ if (topN == 0) {
+ return EXCLUDED; // short-circuit quickly - eat all rows
+ }
+ int index = insertKeyIntoHeap(key);
+ if (index >= 0) {
+ usage += key.getLength();
+ return index;
+ }
+ // IndexStore is trying to tell us something.
+ switch (index) {
+ case DISABLE: {
+ LOG.info("Top-N hash is disabled");
+ flushInternal();
+ isEnabled = false;
+ return FORWARD;
+ }
+ case FLUSH: {
+ LOG.info("Top-N hash is flushed");
+ flushInternal();
+ // we can now retry adding key/value into hash, which is flushed.
+ // but for simplicity, just forward them
+ return FORWARD;
+ }
+ case FORWARD: return FORWARD;
+ case EXCLUDED: return EXCLUDED; // skip the row.
+ default: {
+ assert false;
+ throw new HiveException("Invalid result trying to store the key: " + index);
+ }
+ }
+ }
+
+
+ /**
+ * Perform basic checks and initialize TopNHash for the new vectorized row batch.
+ * @return TopNHash.FORWARD if all rows should be forwarded w/o trying to call TopN;
+ * TopNHash.EXCLUDED if all rows should be discarded w/o trying to call TopN;
+ * any other result means the batch has been started.
+ */
+ public int startVectorizedBatch() throws IOException, HiveException {
+ if (!isEnabled) {
+ return FORWARD; // short-circuit quickly - forward all rows
+ } else if (topN == 0) {
+ return EXCLUDED; // short-circuit quickly - eat all rows
+ }
+ // Flush here if the memory usage is too high. After that, we have the entire
+ // batch already in memory anyway so we will bypass the memory checks.
+ if (usage > threshold) {
+ int excluded = this.excluded;
+ LOG.info("Top-N hash is flushing rows");
+ flushInternal();
+ if (excluded == 0) {
+ LOG.info("Top-N hash has been disabled");
+ isEnabled = false;
+ return FORWARD; // Hash is ineffective, disable.
+ }
+ }
+ if (indexToBatchIndex == null) {
+ indexToBatchIndex = new int[topN + 1]; // for current batch, contains key index in the batch
+ }
+ Arrays.fill(indexToBatchIndex, -1);
+ batchNumForwards = 0;
+ return 0;
+ }
+
+ /**
+ * Try to put the key from the current vectorized batch into the heap.
+ * @param key the key.
+ * @param batchIndex The index of the key in the vectorized batch (sequential, not .selected).
+ * @param results The results array; one element per batch row (vrg.size), indexed by batch index.
+ * The result should be the same across the calls for the batch; in the end, for each batch index:
+ * - TopNHash.EXCLUDED - discard the row.
+ * - positive index - store the row using storeValue, same as tryStoreRow.
+ * - negative index - forward the row. getVectorizedKeyToForward called w/this index will
+ * return the key to use so it doesn't have to be rebuilt.
+ */
+ public void tryStoreVectorizedKey(BytesWritable key, int batchIndex, int[] results)
+ throws HiveException, IOException {
+ // Assumption - batchIndex is increasing; startVectorizedBatch was called
+ int size = indexes.size();
+ int index = size < topN ? size : evicted;
+ keys[index] = Arrays.copyOf(key.getBytes(), key.getLength());
+ Integer collisionIndex = indexes.store(index);
+ if (null != collisionIndex) {
+ // forward conditional on the survival of the corresponding key currently in indexes.
+ ++batchNumForwards;
+ results[batchIndex] = MAY_FORWARD - collisionIndex;
+ return;
+ }
+ indexToBatchIndex[index] = batchIndex;
+ results[batchIndex] = index;
+ if (size != topN) return;
+ evicted = indexes.removeBiggest(); // remove the biggest key
+ if (index == evicted) {
+ excluded++;
+ results[batchIndex] = EXCLUDED;
+ indexToBatchIndex[index] = -1;
+ return; // input key is bigger than any of keys in hash
+ }
+ removed(evicted);
+ int evictedBatchIndex = indexToBatchIndex[evicted];
+ if (evictedBatchIndex >= 0) {
+ // reset the result for the evicted index
+ results[evictedBatchIndex] = EXCLUDED;
+ indexToBatchIndex[evicted] = -1;
+ }
+ // Also evict all results grouped with this index; cannot be current key or before it.
+ if (batchNumForwards > 0) {
+ int evictedForward = (MAY_FORWARD - evicted);
+ boolean forwardRemoved = false;
+ for (int i = evictedBatchIndex + 1; i < batchIndex; ++i) {
+ if (results[i] == evictedForward) {
+ results[i] = EXCLUDED;
+ forwardRemoved = true;
+ }
+ }
+ if (forwardRemoved) {
+ --batchNumForwards;
+ }
+ }
+ }
+
+ /**
+ * After a vectorized batch is processed, returns the key that caused a particular row
+ * to be forwarded. Because a row can only be marked for forwarding when it has
+ * the same key as some row already in the heap (for GBY), we can use that key from
+ * the heap to emit the forwarded row.
+ * @param index Negative index from the vectorized result. See tryStoreVectorizedKey.
+ * @return The key corresponding to the row.
+ */
+ public byte[] getVectorizedKeyToForward(int index) {
+ assert index <= MAY_FORWARD;
+ return keys[MAY_FORWARD - index];
+ }
+
+ /**
+ * Stores the value for the key in the heap.
+ * @param index The index, either from tryStoreKey or from tryStoreVectorizedKey result.
+ * @param value The value to store.
+ * @param keyHash The key hash to store.
+ * @param vectorized Whether the result is coming from a vectorized batch.
+ */
+ public void storeValue(int index, BytesWritable value, int keyHash, boolean vectorized) {
+ values[index] = Arrays.copyOf(value.getBytes(), value.getLength());
+ hashes[index] = keyHash;
+ // Vectorized doesn't adjust usage for the keys while processing the batch
+ usage += values[index].length + (vectorized ? keys[index].length : 0);
+ }
+
+ /**
+ * Flushes all the rows cached in the heap.
+ */
+ public void flush() throws HiveException {
+ if (!isEnabled || (topN == 0)) return;
+ try {
+ flushInternal();
+ } catch (IOException ex) {
+ throw new HiveException(ex);
+ }
}
/**
@@ -104,15 +292,14 @@ abstract class TopNHash {
* -3 for FLUSH : memory is not enough. flush values (keep keys only)
* -4 for DISABLE : hash is not effective. flush and disable it
*/
- public int indexOf(HiveKey key) {
- int size = size();
+ private int insertKeyIntoHeap(BinaryComparable key) {
if (usage > threshold) {
return excluded == 0 ? DISABLE : FLUSH;
}
+ int size = indexes.size();
int index = size < topN ? size : evicted;
keys[index] = Arrays.copyOf(key.getBytes(), key.getLength());
- hashes[index] = key.hashCode();
- if (!store(index)) {
+ if (null != indexes.store(index)) {
// it's only for GBY which should forward all values associated with the key in the range
// of limit. new value should be attatched with the key but in current implementation,
// only one values is allowed. with map-aggreagtion which is true by default,
@@ -120,7 +307,7 @@ abstract class TopNHash {
return FORWARD;
}
if (size == topN) {
- evicted = removeBiggest(); // remove the biggest key
+ evicted = indexes.removeBiggest(); // remove the biggest key
if (index == evicted) {
excluded++;
return EXCLUDED; // input key is bigger than any of keys in hash
@@ -130,130 +317,93 @@ abstract class TopNHash {
return index;
}
- protected abstract int size();
-
- protected abstract boolean store(int index);
-
- protected abstract int removeBiggest();
-
- protected abstract Iterable<Integer> indexes();
-
// key/value of the index is removed. retrieve memory usage
- public void removed(int index) {
+ private void removed(int index) {
usage -= keys[index].length;
keys[index] = null;
if (values[index] != null) {
- // value can be null if hash is flushed, which only keeps keys for limiting rows
usage -= values[index].length;
values[index] = null;
}
hashes[index] = -1;
}
- public void set(int index, BytesWritable value) {
- values[index] = Arrays.copyOf(value.getBytes(), value.getLength());
- usage += keys[index].length + values[index].length;
- }
-
- public void flush() throws IOException {
- for (int index : indexes()) {
- flush(index);
+ private void flushInternal() throws IOException, HiveException {
+ for (int index : indexes.indexes()) {
+ if (index != evicted && values[index] != null) {
+ collector.collect(keys[index], values[index], hashes[index]);
+ usage -= values[index].length;
+ values[index] = null;
+ hashes[index] = -1;
+ }
}
excluded = 0;
}
- protected void flush(int index) throws IOException {
- if (index != evicted && values[index] != null) {
- // BytesWritable copies array for set method. So just creats new one
- HiveKey keyWritable = new HiveKey(keys[index], hashes[index]);
- BytesWritable valueWritable = new BytesWritable(values[index]);
- collector.collect(keyWritable, valueWritable);
- usage -= values[index].length;
- values[index] = null;
- }
- }
-}
-
-/**
- * for order by, same keys are counted (For 1-2-2-3-4, limit 3 is 1-2-2)
- * MinMaxPriorityQueue is used because it alows duplication and fast access to biggest one
- */
-class HashForRow extends TopNHash {
-
- private final MinMaxPriorityQueue<Integer> indexes;
-
- HashForRow(int topN, long threshold, BinaryCollector collector) {
- super(topN, threshold, collector);
- this.indexes = MinMaxPriorityQueue.orderedBy(C).create();
- }
-
- protected int size() {
- return indexes.size();
- }
-
- // returns true always
- protected boolean store(int index) {
- return indexes.add(index);
+ private interface IndexStore {
+ int size();
+ /**
+ * @return the index which caused the item to be rejected; or null if accepted
+ */
+ Integer store(int index);
+ int removeBiggest();
+ Iterable<Integer> indexes();
}
- protected int removeBiggest() {
- return indexes.removeLast();
- }
-
- protected Iterable<Integer> indexes() {
- Integer[] array = indexes.toArray(new Integer[indexes.size()]);
- Arrays.sort(array, 0, array.length, C);
- return Arrays.asList(array);
- }
-}
-
-/**
- * for group by, same keys are not counted (For 1-2-2-3-4, limit 3 is 1-2-(2)-3)
- * simple TreeMap is used because group by does not need keep duplicated keys
- */
-class HashForGroup extends TopNHash {
+ /**
+ * for order by, same keys are counted (For 1-2-2-3-4, limit 3 is 1-2-2)
+ * MinMaxPriorityQueue is used because it allows duplication and fast access to the biggest one
+ */
+ private class HashForRow implements IndexStore {
+ private final MinMaxPriorityQueue<Integer> indexes = MinMaxPriorityQueue.orderedBy(C).create();
- private final SortedSet<Integer> indexes;
+ public int size() {
+ return indexes.size();
+ }
- HashForGroup(int topN, long threshold, BinaryCollector collector) {
- super(topN, threshold, collector);
- this.indexes = new TreeSet<Integer>(C);
- }
+ // returns null always
+ public Integer store(int index) {
+ boolean result = indexes.add(index);
+ assert result;
+ return null;
+ }
- protected int size() {
- return indexes.size();
- }
+ public int removeBiggest() {
+ return indexes.removeLast();
+ }
- // returns false if index already exists in map
- protected boolean store(int index) {
- return indexes.add(index);
+ public Iterable<Integer> indexes() {
+ Integer[] array = indexes.toArray(new Integer[indexes.size()]);
+ Arrays.sort(array, 0, array.length, C);
+ return Arrays.asList(array);
+ }
}
- protected int removeBiggest() {
- Integer last = indexes.last();
- indexes.remove(last);
- return last;
- }
+ /**
+ * for group by, same keys are not counted (For 1-2-2-3-4, limit 3 is 1-2-(2)-3)
+ * a simple TreeMap is used because group by does not need to keep duplicated keys
+ */
+ private class HashForGroup implements IndexStore {
+ // TreeSet anyway uses TreeMap; so use plain TreeMap to be able to get value in collisions.
+ private final TreeMap<Integer, Integer> indexes = new TreeMap<Integer, Integer>(C);
- protected Iterable<Integer> indexes() {
- return indexes;
- }
-}
+ public int size() {
+ return indexes.size();
+ }
-class HashForLimit0 extends TopNHash {
+ // returns false if index already exists in map
+ public Integer store(int index) {
+ return indexes.put(index, index);
+ }
- HashForLimit0() {
- super(0, 0, null);
- }
+ public int removeBiggest() {
+ Integer last = indexes.lastKey();
+ indexes.remove(last);
+ return last;
+ }
- @Override
- public int indexOf(HiveKey key) {
- return EXCLUDED;
+ public Iterable<Integer> indexes() {
+ return indexes.keySet();
+ }
}
-
- protected int size() { return 0; }
- protected boolean store(int index) { return false; }
- protected int removeBiggest() { return 0; }
- protected Iterable<Integer> indexes() { return Collections.emptyList(); }
}
-
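
The two IndexStore implementations above differ only in how duplicate keys are treated, which is what the 1-2-2-3-4 comments describe: HashForRow (ORDER BY) counts duplicates against the limit, while HashForGroup (GROUP BY) stores each key once and forwards collisions. A standalone sketch of the two semantics, with a reversed PriorityQueue and a TreeSet standing in for Guava's MinMaxPriorityQueue and the TreeMap:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;
import java.util.TreeSet;

public class IndexStoreSemantics {
  public static void main(String[] args) {
    int limit = 3;
    int[] keys = {1, 2, 2, 3, 4};

    // HashForRow style (ORDER BY): duplicates count. The reversed queue
    // gives fast access to the biggest element, which eviction needs.
    PriorityQueue<Integer> rowStore =
        new PriorityQueue<Integer>(limit + 1, Collections.<Integer>reverseOrder());
    for (int k : keys) {
      rowStore.add(k);
      if (rowStore.size() > limit) {
        rowStore.poll(); // evict the biggest key
      }
    }
    List<Integer> kept = new ArrayList<Integer>(rowStore);
    Collections.sort(kept);
    System.out.println("ORDER BY keeps " + kept);       // [1, 2, 2]

    // HashForGroup style (GROUP BY): each key stored once; a collision
    // means the row is forwarded rather than stored.
    TreeSet<Integer> groupStore = new TreeSet<Integer>();
    for (int k : keys) {
      if (!groupStore.add(k)) {
        System.out.println("forward duplicate key " + k); // the (2)
        continue;
      }
      if (groupStore.size() > limit) {
        groupStore.pollLast(); // evict the biggest key
      }
    }
    System.out.println("GROUP BY keeps " + groupStore); // [1, 2, 3]
  }
}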
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java?rev=1538012&r1=1538011&r2=1538012&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java Fri Nov 1 18:10:26 2013
@@ -19,12 +19,14 @@
package org.apache.hadoop.hive.ql.exec.vector;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TopNHash;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
@@ -39,6 +41,7 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
public class VectorReduceSinkOperator extends ReduceSinkOperator {
@@ -51,42 +54,44 @@ public class VectorReduceSinkOperator ex
* The evaluators for the key columns. Key columns decide the sort order on
* the reducer side. Key columns are passed to the reducer in the "key".
*/
- protected VectorExpression[] keyEval;
+ private VectorExpression[] keyEval;
/**
* The key value writers. These know how to write the necessary writable type
* based on key column metadata, from the primitive vector type.
*/
- protected transient VectorExpressionWriter[] keyWriters;
+ private transient VectorExpressionWriter[] keyWriters;
/**
* The evaluators for the value columns. Value columns are passed to reducer
* in the "value".
*/
- protected VectorExpression[] valueEval;
+ private VectorExpression[] valueEval;
/**
* The output value writers. These know how to write the necessary writable type
* based on value column metadata, from the primitive vector type.
*/
- protected transient VectorExpressionWriter[] valueWriters;
+ private transient VectorExpressionWriter[] valueWriters;
/**
* The evaluators for the partition columns (CLUSTER BY or DISTRIBUTE BY in
* Hive language). Partition columns decide the reducer that the current row
* goes to. Partition columns are not passed to reducer.
*/
- protected VectorExpression[] partitionEval;
+ private VectorExpression[] partitionEval;
/**
* The partition value writers. These know how to write the necessary writable type
* based on partition column metadata, from the primitive vector type.
*/
- protected transient VectorExpressionWriter[] partitionWriters;
+ private transient VectorExpressionWriter[] partitionWriters;
- transient ObjectInspector keyObjectInspector;
- transient ObjectInspector valueObjectInspector;
- transient int [] keyHashCode = new int [VectorizedRowBatch.DEFAULT_SIZE];
+ private transient ObjectInspector keyObjectInspector;
+ private transient ObjectInspector valueObjectInspector;
+ private transient int [] keyHashCode = new int [VectorizedRowBatch.DEFAULT_SIZE];
+
+ private transient int[] hashResult; // the pre-created array for reducerHash results
public VectorReduceSinkOperator(VectorizationContext vContext, OperatorDesc conf)
throws HiveException {
@@ -183,6 +188,11 @@ public class VectorReduceSinkOperator ex
tagByte[0] = (byte) tag;
LOG.info("Using tag = " + tag);
+ int limit = conf.getTopN();
+ float memUsage = conf.getTopNMemoryUsage();
+ if (limit >= 0 && memUsage > 0) {
+ reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this);
+ }
} catch(Exception e) {
throw new HiveException(e);
}
@@ -215,21 +225,22 @@ public class VectorReduceSinkOperator ex
Object[] distributionKeys = new Object[numDistributionKeys];
- // Emit a (k,v) pair for each row in the batch
- //
+ // Determine which rows we need to emit based on topN optimization
+ int startResult = reducerHash.startVectorizedBatch();
+ if (startResult == TopNHash.EXCLUDED) {
+ return; // TopN wants us to exclude all rows.
+ }
+ boolean useTopN = startResult != TopNHash.FORWARD;
+ if (useTopN && (hashResult == null || hashResult.length < vrg.size)) {
+ hashResult = new int[Math.max(vrg.size, VectorizedRowBatch.DEFAULT_SIZE)];
+ }
+
for (int j = 0 ; j < vrg.size; ++j) {
int rowIndex = j;
if (vrg.selectedInUse) {
rowIndex = vrg.selected[j];
}
- for (int i = 0; i < valueEval.length; i++) {
- int batchColumn = valueEval[i].getOutputColumn();
- ColumnVector vectorColumn = vrg.cols[batchColumn];
- cachedValues[i] = valueWriters[i].writeValue(vectorColumn, rowIndex);
- }
- // Serialize the value
- value = valueSerializer.serialize(cachedValues, valueObjectInspector);
-
+ // First, evaluate the key - the way things stand we'd need it regardless.
for (int i = 0; i < keyEval.length; i++) {
int batchColumn = keyEval[i].getOutputColumn();
ColumnVector vectorColumn = vrg.cols[batchColumn];
@@ -237,69 +248,42 @@ public class VectorReduceSinkOperator ex
}
// no distinct key
System.arraycopy(distributionKeys, 0, cachedKeys[0], 0, numDistributionKeys);
- // Serialize the keys and append the tag
+ // TopN is not supported for multi-distinct currently. If there is more than one
+ // cachedKeys entry per input key, horrible things will happen (likely an OOB array error).
+ assert !useTopN || cachedKeys.length <= 1;
for (int i = 0; i < cachedKeys.length; i++) {
- if (keyIsText) {
- Text key = (Text) keySerializer.serialize(cachedKeys[i],
- keyObjectInspector);
- if (tag == -1) {
- keyWritable.set(key.getBytes(), 0, key.getLength());
- } else {
- int keyLength = key.getLength();
- keyWritable.setSize(keyLength + 1);
- System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
- keyWritable.get()[keyLength] = tagByte[0];
- }
- } else {
- // Must be BytesWritable
- BytesWritable key = (BytesWritable) keySerializer.serialize(
- cachedKeys[i], keyObjectInspector);
- if (tag == -1) {
- keyWritable.set(key.getBytes(), 0, key.getLength());
- } else {
- int keyLength = key.getLength();
- keyWritable.setSize(keyLength + 1);
- System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
- keyWritable.get()[keyLength] = tagByte[0];
- }
- }
- // Evaluate the HashCode
- int keyHashCode = 0;
- if (partitionEval.length == 0) {
- // If no partition cols, just distribute the data uniformly to provide
- // better
- // load balance. If the requirement is to have a single reducer, we
- // should set
- // the number of reducers to 1.
- // Use a constant seed to make the code deterministic.
- if (random == null) {
- random = new Random(12345);
- }
- keyHashCode = random.nextInt();
+ // Serialize the keys and append the tag.
+ Object keyObj = keySerializer.serialize(cachedKeys[i], keyObjectInspector);
+ setKeyWritable(keyIsText ? (Text)keyObj : (BytesWritable)keyObj, tag);
+ if (useTopN) {
+ reducerHash.tryStoreVectorizedKey(keyWritable, j, hashResult);
} else {
- for (int p = 0; p < partitionEval.length; p++) {
- ColumnVector columnVector = vrg.cols[partitionEval[p].getOutputColumn()];
- Object partitionValue = partitionWriters[p].writeValue(columnVector, rowIndex);
- keyHashCode = keyHashCode
- * 31
- + ObjectInspectorUtils.hashCode(
- partitionValue,
- partitionWriters[p].getObjectInspector());
- }
- }
- keyWritable.setHashCode(keyHashCode);
- if (out != null) {
- out.collect(keyWritable, value);
- // Since this is a terminal operator, update counters explicitly -
- // forward is not called
- if (counterNameToEnum != null) {
- ++outputRows;
- if (outputRows % 1000 == 0) {
- incrCounter(numOutputRowsCntr, outputRows);
- outputRows = 0;
- }
- }
- }
+ // No TopN, just forward the key
+ keyWritable.setHashCode(computeHashCode(vrg, rowIndex));
+ collect(keyWritable, makeValueWritable(vrg, rowIndex));
+ }
+ }
+ }
+
+ if (!useTopN) return; // All done.
+
+ // If we use topN, we have called tryStore on every key now. We can process the results.
+ for (int j = 0 ; j < vrg.size; ++j) {
+ int index = hashResult[j];
+ if (index == TopNHash.EXCLUDED) continue;
+ int rowIndex = j;
+ if (vrg.selectedInUse) {
+ rowIndex = vrg.selected[j];
+ }
+ // Compute everything now - we'd either store it, or forward it.
+ int hashCode = computeHashCode(vrg, rowIndex);
+ BytesWritable value = makeValueWritable(vrg, rowIndex);
+ if (index < 0) {
+ // Kinda hacky; see getVectorizedKeyToForward javadoc.
+ byte[] key = reducerHash.getVectorizedKeyToForward(index);
+ collect(key, value, hashCode);
+ } else {
+ reducerHash.storeValue(index, value, hashCode, true);
}
}
} catch (SerDeException e) {
@@ -309,6 +293,45 @@ public class VectorReduceSinkOperator ex
}
}
+ private BytesWritable makeValueWritable(VectorizedRowBatch vrg, int rowIndex)
+ throws HiveException, SerDeException {
+ for (int i = 0; i < valueEval.length; i++) {
+ int batchColumn = valueEval[i].getOutputColumn();
+ ColumnVector vectorColumn = vrg.cols[batchColumn];
+ cachedValues[i] = valueWriters[i].writeValue(vectorColumn, rowIndex);
+ }
+ // Serialize the value
+ return (BytesWritable)valueSerializer.serialize(cachedValues, valueObjectInspector);
+ }
+
+ private int computeHashCode(VectorizedRowBatch vrg, int rowIndex) throws HiveException {
+ // Evaluate the HashCode
+ int keyHashCode = 0;
+ if (partitionEval.length == 0) {
+ // If no partition cols, just distribute the data uniformly to provide
+ // better
+ // load balance. If the requirement is to have a single reducer, we
+ // should set
+ // the number of reducers to 1.
+ // Use a constant seed to make the code deterministic.
+ if (random == null) {
+ random = new Random(12345);
+ }
+ keyHashCode = random.nextInt();
+ } else {
+ for (int p = 0; p < partitionEval.length; p++) {
+ ColumnVector columnVector = vrg.cols[partitionEval[p].getOutputColumn()];
+ Object partitionValue = partitionWriters[p].writeValue(columnVector, rowIndex);
+ keyHashCode = keyHashCode
+ * 31
+ + ObjectInspectorUtils.hashCode(
+ partitionValue,
+ partitionWriters[p].getObjectInspector());
+ }
+ }
+ return keyHashCode;
+ }
+
static public String getOperatorName() {
return "RS";
}
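
The vectorized path above splits batch processing into two passes: pass one serializes each key and calls tryStoreVectorizedKey(), which records a per-row verdict in hashResult; pass two computes hash codes and values only for the rows that survived. A simplified standalone sketch of that protocol (keys assumed distinct within the batch; the MAY_FORWARD collision path and the flush/disable checks of startVectorizedBatch() are omitted):

import java.util.Arrays;
import java.util.TreeMap;

public class VectorizedTopNSketch {
  static final int EXCLUDED = -2;

  public static void main(String[] args) {
    int topN = 2;
    String[] batchKeys = {"d", "a", "c", "b"};
    int[] results = new int[batchKeys.length]; // plays the role of hashResult

    // pass 1: offer every key; results[j] becomes a slot index or EXCLUDED,
    // and a later eviction can retroactively exclude an earlier row
    TreeMap<String, Integer> slots = new TreeMap<String, Integer>();
    int[] slotToBatchIndex = new int[topN + 1]; // like indexToBatchIndex
    Arrays.fill(slotToBatchIndex, -1);
    int spare = topN;
    for (int j = 0; j < batchKeys.length; j++) {
      int slot = slots.size() < topN ? slots.size() : spare;
      slots.put(batchKeys[j], slot);
      slotToBatchIndex[slot] = j;
      results[j] = slot;
      if (slots.size() > topN) {
        int evicted = slots.pollLastEntry().getValue();
        spare = evicted;
        int evictedRow = slotToBatchIndex[evicted];
        if (evictedRow >= 0) {
          results[evictedRow] = EXCLUDED; // undo the earlier verdict
        }
        slotToBatchIndex[evicted] = -1;
      }
    }

    // pass 2: only surviving rows pay for value serialization and hashing
    for (int j = 0; j < batchKeys.length; j++) {
      if (results[j] == EXCLUDED) {
        continue;
      }
      System.out.println("store row " + j + " (key " + batchKeys[j]
          + ") in slot " + results[j]);
    }
    // output: rows for keys a and b survive; d and c were evicted
  }
}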
Modified: hive/trunk/ql/src/test/queries/clientpositive/vectorization_limit.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/vectorization_limit.q?rev=1538012&r1=1538011&r2=1538012&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/vectorization_limit.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/vectorization_limit.q Fri Nov 1 18:10:26 2013
@@ -1,3 +1,37 @@
SET hive.vectorized.execution.enabled=true;
explain SELECT cbigint, cdouble FROM alltypesorc WHERE cbigint < cdouble and cint > 0 limit 7;
SELECT cbigint, cdouble FROM alltypesorc WHERE cbigint < cdouble and cint > 0 limit 7;
+
+set hive.optimize.reducededuplication.min.reducer=1;
+set hive.limit.pushdown.memory.usage=0.3f;
+
+-- HIVE-3562 Some limit can be pushed down to map stage - c/p parts from limit_pushdown
+
+explain
+select ctinyint,cdouble,csmallint from alltypesorc where ctinyint is not null order by ctinyint,cdouble limit 20;
+select ctinyint,cdouble,csmallint from alltypesorc where ctinyint is not null order by ctinyint,cdouble limit 20;
+
+-- deduped RS
+explain
+select ctinyint,avg(cdouble + 1) from alltypesorc group by ctinyint order by ctinyint limit 20;
+select ctinyint,avg(cdouble + 1) from alltypesorc group by ctinyint order by ctinyint limit 20;
+
+-- distincts
+explain
+select distinct(ctinyint) from alltypesorc limit 20;
+select distinct(ctinyint) from alltypesorc limit 20;
+
+explain
+select ctinyint, count(distinct(cdouble)) from alltypesorc group by ctinyint limit 20;
+select ctinyint, count(distinct(cdouble)) from alltypesorc group by ctinyint limit 20;
+
+-- limit zero
+explain
+select ctinyint,cdouble from alltypesorc order by ctinyint limit 0;
+select ctinyint,cdouble from alltypesorc order by ctinyint limit 0;
+
+-- 2MR (applied to last RS)
+explain
+select cdouble, sum(ctinyint) as sum from alltypesorc where ctinyint is not null group by cdouble order by sum, cdouble limit 20;
+select cdouble, sum(ctinyint) as sum from alltypesorc where ctinyint is not null group by cdouble order by sum, cdouble limit 20;
+
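
The hive.limit.pushdown.memory.usage value set here (0.3) feeds TopNHash.initialize() above, which budgets threshold = (long)(memUsage * maxMemory) - topN * 64 bytes for the cached keys and values, the per-row 64 bytes compensating for the key/value/hashcode arrays. A quick illustration of the arithmetic, assuming for the example a 1 GB heap instead of calling Runtime.getRuntime().maxMemory():

public class ThresholdExample {
  public static void main(String[] args) {
    float memUsage = 0.3f;                // hive.limit.pushdown.memory.usage
    int topN = 20;                        // the query's LIMIT
    long maxMemory = 1024L * 1024 * 1024; // pretend 1 GB heap for the example
    // same formula as TopNHash.initialize(); a negative result would leave
    // the hash disabled and all rows forwarded
    long threshold = (long) (memUsage * maxMemory) - topN * 64;
    System.out.println("top-N byte budget: " + threshold); // about 322 million bytes
  }
}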
Modified: hive/trunk/ql/src/test/results/clientpositive/vectorization_limit.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/vectorization_limit.q.out?rev=1538012&r1=1538011&r2=1538012&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/vectorization_limit.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/vectorization_limit.q.out Fri Nov 1 18:10:26 2013
@@ -62,3 +62,628 @@ POSTHOOK: Input: default@alltypesorc
-1887561756 -8881.0
-1887561756 -2281.0
-1887561756 9531.0
+PREHOOK: query: -- HIVE-3562 Some limit can be pushed down to map stage - c/p parts from limit_pushdown
+
+explain
+select ctinyint,cdouble,csmallint from alltypesorc where ctinyint is not null order by ctinyint,cdouble limit 20
+PREHOOK: type: QUERY
+POSTHOOK: query: -- HIVE-3562 Some limit can be pushed down to map stage - c/p parts from limit_pushdown
+
+explain
+select ctinyint,cdouble,csmallint from alltypesorc where ctinyint is not null order by ctinyint,cdouble limit 20
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME alltypesorc))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL ctinyint)) (TOK_SELEXPR (TOK_TABLE_OR_COL cdouble)) (TOK_SELEXPR (TOK_TABLE_OR_COL csmallint))) (TOK_WHERE (TOK_FUNCTION TOK_ISNOTNULL (TOK_TABLE_OR_COL ctinyint))) (TOK_ORDERBY (TOK_TABSORTCOLNAMEASC (TOK_TABLE_OR_COL ctinyint)) (TOK_TABSORTCOLNAMEASC (TOK_TABLE_OR_COL cdouble))) (TOK_LIMIT 20)))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ alltypesorc
+ TableScan
+ alias: alltypesorc
+ Filter Operator
+ predicate:
+ expr: ctinyint is not null
+ type: boolean
+ Vectorized execution: true
+ Select Operator
+ expressions:
+ expr: ctinyint
+ type: tinyint
+ expr: cdouble
+ type: double
+ expr: csmallint
+ type: smallint
+ outputColumnNames: _col0, _col1, _col2
+ Vectorized execution: true
+ Reduce Output Operator
+ key expressions:
+ expr: _col0
+ type: tinyint
+ expr: _col1
+ type: double
+ sort order: ++
+ tag: -1
+ TopN: 20
+ TopN Hash Memory Usage: 0.3
+ value expressions:
+ expr: _col0
+ type: tinyint
+ expr: _col1
+ type: double
+ expr: _col2
+ type: smallint
+ Vectorized execution: true
+ Reduce Operator Tree:
+ Extract
+ Limit
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: 20
+
+
+PREHOOK: query: select ctinyint,cdouble,csmallint from alltypesorc where ctinyint is not null order by ctinyint,cdouble limit 20
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypesorc
+#### A masked pattern was here ####
+POSTHOOK: query: select ctinyint,cdouble,csmallint from alltypesorc where ctinyint is not null order by ctinyint,cdouble limit 20
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypesorc
+#### A masked pattern was here ####
+-64 -15920.0 -15920
+-64 -10462.0 -10462
+-64 -9842.0 -9842
+-64 -8080.0 -8080
+-64 -7196.0 -7196
+-64 -7196.0 -7196
+-64 -7196.0 -7196
+-64 -7196.0 -7196
+-64 -7196.0 -7196
+-64 -7196.0 -7196
+-64 -7196.0 -7196
+-64 -6907.0 -6907
+-64 -4803.0 -4803
+-64 -4040.0 -4040
+-64 -4018.0 -4018
+-64 -3586.0 -3586
+-64 -3097.0 -3097
+-64 -2919.0 -2919
+-64 -1600.0 -1600
+-64 -200.0 -200
+PREHOOK: query: -- deduped RS
+explain
+select ctinyint,avg(cdouble + 1) from alltypesorc group by ctinyint order by ctinyint limit 20
+PREHOOK: type: QUERY
+POSTHOOK: query: -- deduped RS
+explain
+select ctinyint,avg(cdouble + 1) from alltypesorc group by ctinyint order by ctinyint limit 20
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME alltypesorc))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL ctinyint)) (TOK_SELEXPR (TOK_FUNCTION avg (+ (TOK_TABLE_OR_COL cdouble) 1)))) (TOK_GROUPBY (TOK_TABLE_OR_COL ctinyint)) (TOK_ORDERBY (TOK_TABSORTCOLNAMEASC (TOK_TABLE_OR_COL ctinyint))) (TOK_LIMIT 20)))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ alltypesorc
+ TableScan
+ alias: alltypesorc
+ Select Operator
+ expressions:
+ expr: ctinyint
+ type: tinyint
+ expr: cdouble
+ type: double
+ outputColumnNames: ctinyint, cdouble
+ Vectorized execution: true
+ Group By Operator
+ aggregations:
+ expr: avg((cdouble + 1))
+ bucketGroup: false
+ keys:
+ expr: ctinyint
+ type: tinyint
+ mode: hash
+ outputColumnNames: _col0, _col1
+ Vectorized execution: true
+ Reduce Output Operator
+ key expressions:
+ expr: _col0
+ type: tinyint
+ sort order: +
+ Map-reduce partition columns:
+ expr: _col0
+ type: tinyint
+ tag: -1
+ TopN: 20
+ TopN Hash Memory Usage: 0.3
+ value expressions:
+ expr: _col1
+ type: struct<count:bigint,sum:double>
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations:
+ expr: avg(VALUE._col0)
+ bucketGroup: false
+ keys:
+ expr: KEY._col0
+ type: tinyint
+ mode: mergepartial
+ outputColumnNames: _col0, _col1
+ Select Operator
+ expressions:
+ expr: _col0
+ type: tinyint
+ expr: _col1
+ type: double
+ outputColumnNames: _col0, _col1
+ Limit
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: 20
+
+
+PREHOOK: query: select ctinyint,avg(cdouble + 1) from alltypesorc group by ctinyint order by ctinyint limit 20
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypesorc
+#### A masked pattern was here ####
+POSTHOOK: query: select ctinyint,avg(cdouble + 1) from alltypesorc group by ctinyint order by ctinyint limit 20
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypesorc
+#### A masked pattern was here ####
+NULL 9370.0945309795
+-64 373.52941176470586
+-63 2178.7272727272725
+-62 245.69387755102042
+-61 914.3404255319149
+-60 1071.82
+-59 318.27272727272725
+-58 3483.2444444444445
+-57 1867.0535714285713
+-56 2595.818181818182
+-55 2385.595744680851
+-54 2712.7272727272725
+-53 -532.7567567567568
+-52 2810.705882352941
+-51 -96.46341463414635
+-50 -960.0192307692307
+-49 768.7659574468086
+-48 1672.909090909091
+-47 -574.6428571428571
+-46 3033.55
+PREHOOK: query: -- distincts
+explain
+select distinct(ctinyint) from alltypesorc limit 20
+PREHOOK: type: QUERY
+POSTHOOK: query: -- distincts
+explain
+select distinct(ctinyint) from alltypesorc limit 20
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME alltypesorc))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECTDI (TOK_SELEXPR (TOK_TABLE_OR_COL ctinyint))) (TOK_LIMIT 20)))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ alltypesorc
+ TableScan
+ alias: alltypesorc
+ Select Operator
+ expressions:
+ expr: ctinyint
+ type: tinyint
+ outputColumnNames: ctinyint
+ Vectorized execution: true
+ Group By Operator
+ bucketGroup: false
+ keys:
+ expr: ctinyint
+ type: tinyint
+ mode: hash
+ outputColumnNames: _col0
+ Vectorized execution: true
+ Reduce Output Operator
+ key expressions:
+ expr: _col0
+ type: tinyint
+ sort order: +
+ Map-reduce partition columns:
+ expr: _col0
+ type: tinyint
+ tag: -1
+ TopN: 20
+ TopN Hash Memory Usage: 0.3
+ Reduce Operator Tree:
+ Group By Operator
+ bucketGroup: false
+ keys:
+ expr: KEY._col0
+ type: tinyint
+ mode: mergepartial
+ outputColumnNames: _col0
+ Select Operator
+ expressions:
+ expr: _col0
+ type: tinyint
+ outputColumnNames: _col0
+ Limit
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: 20
+
+
+PREHOOK: query: select distinct(ctinyint) from alltypesorc limit 20
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypesorc
+#### A masked pattern was here ####
+POSTHOOK: query: select distinct(ctinyint) from alltypesorc limit 20
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypesorc
+#### A masked pattern was here ####
+NULL
+-64
+-63
+-62
+-61
+-60
+-59
+-58
+-57
+-56
+-55
+-54
+-53
+-52
+-51
+-50
+-49
+-48
+-47
+-46
+PREHOOK: query: explain
+select ctinyint, count(distinct(cdouble)) from alltypesorc group by ctinyint limit 20
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select ctinyint, count(distinct(cdouble)) from alltypesorc group by ctinyint limit 20
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME alltypesorc))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL ctinyint)) (TOK_SELEXPR (TOK_FUNCTIONDI count (TOK_TABLE_OR_COL cdouble)))) (TOK_GROUPBY (TOK_TABLE_OR_COL ctinyint)) (TOK_LIMIT 20)))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ alltypesorc
+ TableScan
+ alias: alltypesorc
+ Select Operator
+ expressions:
+ expr: ctinyint
+ type: tinyint
+ expr: cdouble
+ type: double
+ outputColumnNames: ctinyint, cdouble
+ Vectorized execution: true
+ Group By Operator
+ aggregations:
+ expr: count(DISTINCT cdouble)
+ bucketGroup: false
+ keys:
+ expr: ctinyint
+ type: tinyint
+ expr: cdouble
+ type: double
+ mode: hash
+ outputColumnNames: _col0, _col1, _col2
+ Vectorized execution: true
+ Reduce Output Operator
+ key expressions:
+ expr: _col0
+ type: tinyint
+ expr: _col1
+ type: double
+ sort order: ++
+ Map-reduce partition columns:
+ expr: _col0
+ type: tinyint
+ tag: -1
+ TopN: 20
+ TopN Hash Memory Usage: 0.3
+ value expressions:
+ expr: _col2
+ type: bigint
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations:
+ expr: count(DISTINCT KEY._col1:0._col0)
+ bucketGroup: false
+ keys:
+ expr: KEY._col0
+ type: tinyint
+ mode: mergepartial
+ outputColumnNames: _col0, _col1
+ Select Operator
+ expressions:
+ expr: _col0
+ type: tinyint
+ expr: _col1
+ type: bigint
+ outputColumnNames: _col0, _col1
+ Limit
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: 20
+
+
+PREHOOK: query: select ctinyint, count(distinct(cdouble)) from alltypesorc group by ctinyint limit 20
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypesorc
+#### A masked pattern was here ####
+POSTHOOK: query: select ctinyint, count(distinct(cdouble)) from alltypesorc group by ctinyint limit 20
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypesorc
+#### A masked pattern was here ####
+NULL 19
+PREHOOK: query: -- limit zero
+explain
+select ctinyint,cdouble from alltypesorc order by ctinyint limit 0
+PREHOOK: type: QUERY
+POSTHOOK: query: -- limit zero
+explain
+select ctinyint,cdouble from alltypesorc order by ctinyint limit 0
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME alltypesorc))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL ctinyint)) (TOK_SELEXPR (TOK_TABLE_OR_COL cdouble))) (TOK_ORDERBY (TOK_TABSORTCOLNAMEASC (TOK_TABLE_OR_COL ctinyint))) (TOK_LIMIT 0)))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ alltypesorc
+ TableScan
+ alias: alltypesorc
+ Select Operator
+ expressions:
+ expr: ctinyint
+ type: tinyint
+ expr: cdouble
+ type: double
+ outputColumnNames: _col0, _col1
+ Vectorized execution: true
+ Reduce Output Operator
+ key expressions:
+ expr: _col0
+ type: tinyint
+ sort order: +
+ tag: -1
+ value expressions:
+ expr: _col0
+ type: tinyint
+ expr: _col1
+ type: double
+ Vectorized execution: true
+ Reduce Operator Tree:
+ Extract
+ Limit
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: 0
+
+
+PREHOOK: query: select ctinyint,cdouble from alltypesorc order by ctinyint limit 0
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypesorc
+#### A masked pattern was here ####
+POSTHOOK: query: select ctinyint,cdouble from alltypesorc order by ctinyint limit 0
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypesorc
+#### A masked pattern was here ####
+PREHOOK: query: -- 2MR (applied to last RS)
+explain
+select cdouble, sum(ctinyint) as sum from alltypesorc where ctinyint is not null group by cdouble order by sum, cdouble limit 20
+PREHOOK: type: QUERY
+POSTHOOK: query: -- 2MR (applied to last RS)
+explain
+select cdouble, sum(ctinyint) as sum from alltypesorc where ctinyint is not null group by cdouble order by sum, cdouble limit 20
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME alltypesorc))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL cdouble)) (TOK_SELEXPR (TOK_FUNCTION sum (TOK_TABLE_OR_COL ctinyint)) sum)) (TOK_WHERE (TOK_FUNCTION TOK_ISNOTNULL (TOK_TABLE_OR_COL ctinyint))) (TOK_GROUPBY (TOK_TABLE_OR_COL cdouble)) (TOK_ORDERBY (TOK_TABSORTCOLNAMEASC (TOK_TABLE_OR_COL sum)) (TOK_TABSORTCOLNAMEASC (TOK_TABLE_OR_COL cdouble))) (TOK_LIMIT 20)))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-2 depends on stages: Stage-1
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ alltypesorc
+ TableScan
+ alias: alltypesorc
+ Filter Operator
+ predicate:
+ expr: ctinyint is not null
+ type: boolean
+ Vectorized execution: true
+ Select Operator
+ expressions:
+ expr: cdouble
+ type: double
+ expr: ctinyint
+ type: tinyint
+ outputColumnNames: cdouble, ctinyint
+ Vectorized execution: true
+ Group By Operator
+ aggregations:
+ expr: sum(ctinyint)
+ bucketGroup: false
+ keys:
+ expr: cdouble
+ type: double
+ mode: hash
+ outputColumnNames: _col0, _col1
+ Vectorized execution: true
+ Reduce Output Operator
+ key expressions:
+ expr: _col0
+ type: double
+ sort order: +
+ Map-reduce partition columns:
+ expr: _col0
+ type: double
+ tag: -1
+ value expressions:
+ expr: _col1
+ type: bigint
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations:
+ expr: sum(VALUE._col0)
+ bucketGroup: false
+ keys:
+ expr: KEY._col0
+ type: double
+ mode: mergepartial
+ outputColumnNames: _col0, _col1
+ Select Operator
+ expressions:
+ expr: _col0
+ type: double
+ expr: _col1
+ type: bigint
+ outputColumnNames: _col0, _col1
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+
+ Stage: Stage-2
+ Map Reduce
+ Alias -> Map Operator Tree:
+#### A masked pattern was here ####
+ TableScan
+ Reduce Output Operator
+ key expressions:
+ expr: _col1
+ type: bigint
+ expr: _col0
+ type: double
+ sort order: ++
+ tag: -1
+ TopN: 20
+ TopN Hash Memory Usage: 0.3
+ value expressions:
+ expr: _col0
+ type: double
+ expr: _col1
+ type: bigint
+ Reduce Operator Tree:
+ Extract
+ Limit
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: 20
+
+
+PREHOOK: query: select cdouble, sum(ctinyint) as sum from alltypesorc where ctinyint is not null group by cdouble order by sum, cdouble limit 20
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypesorc
+#### A masked pattern was here ####
+POSTHOOK: query: select cdouble, sum(ctinyint) as sum from alltypesorc where ctinyint is not null group by cdouble order by sum, cdouble limit 20
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypesorc
+#### A masked pattern was here ####
+NULL -32768
+-7196.0 -2009
+15601.0 -1733
+4811.0 -115
+-11322.0 -101
+-1121.0 -89
+7705.0 -88
+3520.0 -86
+-8118.0 -80
+5241.0 -80
+-11492.0 -78
+9452.0 -76
+557.0 -75
+10496.0 -67
+-15920.0 -64
+-10462.0 -64
+-9842.0 -64
+-8080.0 -64
+-6907.0 -64
+-4803.0 -64