Posted to commits@hive.apache.org by ha...@apache.org on 2013/11/09 16:22:35 UTC
svn commit: r1540328 - in /hive/trunk/ql/src: java/org/apache/hadoop/hive/ql/exec/ java/org/apache/hadoop/hive/ql/exec/vector/ java/org/apache/hadoop/hive/ql/io/ java/org/apache/hadoop/hive/ql/optimizer/ test/queries/clientpositive/ test/results/client...
Author: hashutosh
Date: Sat Nov 9 15:22:35 2013
New Revision: 1540328
URL: http://svn.apache.org/r1540328
Log:
HIVE-5657 : TopN produces incorrect results with count(distinct) (Sergey Shelukhin via Ashutosh Chauhan)
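
The core of the fix, for context: with limit pushdown enabled, the ReduceSink-side top-N hash ordered and evicted on the full serialized key, and for count(distinct) that key carries the distinct column after the group-by columns, so rows belonging to the same group could be ranked as different entries. The patch records the length of the distribution-key prefix on HiveKey (distKeyLength) and compares only that prefix. A minimal, self-contained sketch of the difference follows; the byte layout and the compare helper are illustrative stand-ins, not Hive's key serialization.

    public class PrefixCompareDemo {
      // Unsigned lexicographic comparison of byte ranges, analogous to the
      // WritableComparator.compareBytes call in the patched TopNHash comparator.
      static int compare(byte[] a, int aLen, byte[] b, int bLen) {
        int n = Math.min(aLen, bLen);
        for (int i = 0; i < n; i++) {
          int d = (a[i] & 0xff) - (b[i] & 0xff);
          if (d != 0) return d;
        }
        return aLen - bLen;
      }

      public static void main(String[] args) {
        // Two rows of the same group (distribution key "42") with different distinct values.
        byte[] key1 = "42|a".getBytes();
        byte[] key2 = "42|b".getBytes();
        int distKeyLength = "42".getBytes().length;

        // Full-key comparison: the rows look different, so a heap keyed on the
        // full bytes can keep one and evict the other, undercounting the distinct.
        System.out.println(compare(key1, key1.length, key2, key2.length)); // != 0

        // Prefix-only comparison (what the patch does via HiveKey.getDistKeyLength()):
        // the rows tie, so they are kept or evicted together.
        System.out.println(compare(key1, distKeyLength, key2, distKeyLength)); // == 0
      }
    }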
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/LimitPushdownOptimizer.java
hive/trunk/ql/src/test/queries/clientpositive/limit_pushdown.q
hive/trunk/ql/src/test/queries/clientpositive/limit_pushdown_negative.q
hive/trunk/ql/src/test/results/clientpositive/limit_pushdown.q.out
hive/trunk/ql/src/test/results/clientpositive/limit_pushdown_negative.q.out
hive/trunk/ql/src/test/results/clientpositive/vectorization_limit.q.out
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1540328&r1=1540327&r2=1540328&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Sat Nov 9 15:22:35 2013
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.plan.Ex
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.Serializer;
import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -46,6 +47,7 @@ import org.apache.hadoop.io.BinaryCompar
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.Text;
+// import org.apache.hadoop.util.StringUtils;
/**
* Reduce Sink Operator sends output to the reduce stage.
@@ -153,8 +155,8 @@ public class ReduceSinkOperator extends
transient InspectableObject tempInspectableObject = new InspectableObject();
protected transient HiveKey keyWritable = new HiveKey();
- transient StructObjectInspector keyObjectInspector;
- transient StructObjectInspector valueObjectInspector;
+ protected transient ObjectInspector keyObjectInspector;
+ protected transient ObjectInspector valueObjectInspector;
transient ObjectInspector[] partitionObjectInspectors;
protected transient Object[] cachedValues;
@@ -173,6 +175,7 @@ public class ReduceSinkOperator extends
* in this case, child GBY evaluates distinct values with expression like KEY.col2:0.dist1
* see {@link ExprNodeColumnEvaluator}
*/
+ // TODO: we only ever use one row of these at a time. Why do we need to cache multiple?
protected transient Object[][] cachedKeys;
boolean firstRow;
protected transient Random random;
@@ -237,51 +240,41 @@ public class ReduceSinkOperator extends
.getOutputValueColumnNames(), rowInspector);
partitionObjectInspectors = initEvaluators(partitionEval, rowInspector);
int numKeys = numDistinctExprs > 0 ? numDistinctExprs : 1;
- int keyLen = numDistinctExprs > 0 ? numDistributionKeys + 1 :
- numDistributionKeys;
+ int keyLen = numDistinctExprs > 0 ? numDistributionKeys + 1 : numDistributionKeys;
cachedKeys = new Object[numKeys][keyLen];
cachedValues = new Object[valueEval.length];
}
- // Evaluate the keys
- for (int i = 0; i < numDistributionKeys; i++) {
- cachedKeys[0][i] = keyEval[i].evaluate(row);
- }
+ // Determine distKeyLength (w/o distincts), and then add the first distinct expression if present.
+ populateCachedDistributionKeys(row, 0);
+ HiveKey firstKey = toHiveKey(cachedKeys[0], tag, null);
+ int distKeyLength = firstKey.getDistKeyLength();
if (numDistinctExprs > 0) {
- // with distinct key(s)
- for (int i = 0; i < numDistinctExprs; i++) {
- if (i > 0) {
- System.arraycopy(cachedKeys[0], 0, cachedKeys[i], 0, numDistributionKeys);
- }
- StandardUnion union = (StandardUnion) cachedKeys[i][numDistributionKeys];
- if (union == null) {
- cachedKeys[i][numDistributionKeys] =
- union = new StandardUnion((byte)i, new Object[distinctColIndices.get(i).size()]);
- }
- Object[] distinctParameters = (Object[]) union.getObject();
- for (int j = 0; j < distinctParameters.length; j++) {
- distinctParameters[j] =
- keyEval[distinctColIndices.get(i).get(j)].evaluate(row);
- }
- union.setTag((byte) i);
- }
+ populateCachedDistinctKeys(row, 0);
+ firstKey = toHiveKey(cachedKeys[0], tag, distKeyLength);
}
- for (int i = 0; i < cachedKeys.length; i++) {
- // Serialize the keys and append the tag
- Object keyObj = keySerializer.serialize(cachedKeys[i], keyObjectInspector);
- setKeyWritable(keyIsText ? (Text)keyObj : (BytesWritable)keyObj, tag);
- int topNIndex = reducerHash.tryStoreKey(keyWritable);
- if (TopNHash.EXCLUDED == topNIndex) continue;
- int keyHashCode = computeHashCode(row);
- BytesWritable valueWritable = getValue(row);
- if (TopNHash.FORWARD == topNIndex) {
- keyWritable.setHashCode(keyHashCode);
- collect(keyWritable, valueWritable);
- continue;
- }
- assert topNIndex >= 0;
- reducerHash.storeValue(topNIndex, valueWritable, keyHashCode, false);
+ // Try to store the first key. If it's not excluded, we will proceed.
+ int firstIndex = reducerHash.tryStoreKey(firstKey);
+ if (firstIndex == TopNHash.EXCLUDE) return; // Nothing to do.
+ // Compute value and hashcode - we'd either store or forward them.
+ BytesWritable value = makeValueWritable(row);
+ int hashCode = computeHashCode(row);
+ if (firstIndex == TopNHash.FORWARD) {
+ firstKey.setHashCode(hashCode);
+ collect(firstKey, value);
+ } else {
+ assert firstIndex >= 0;
+ reducerHash.storeValue(firstIndex, value, hashCode, false);
+ }
+
+ // All other distinct keys will just be forwarded. This could be optimized...
+ for (int i = 1; i < numDistinctExprs; i++) {
+ System.arraycopy(cachedKeys[0], 0, cachedKeys[i], 0, numDistributionKeys);
+ populateCachedDistinctKeys(row, i);
+ HiveKey hiveKey = toHiveKey(cachedKeys[i], tag, distKeyLength);
+ hiveKey.setHashCode(hashCode);
+ collect(hiveKey, value);
}
} catch (HiveException e) {
throw e;
@@ -290,14 +283,38 @@ public class ReduceSinkOperator extends
}
}
+ private void populateCachedDistributionKeys(Object row, int index) throws HiveException {
+ for (int i = 0; i < numDistributionKeys; i++) {
+ cachedKeys[index][i] = keyEval[i].evaluate(row);
+ }
+ if (cachedKeys[0].length > numDistributionKeys) {
+ cachedKeys[index][numDistributionKeys] = null;
+ }
+ }
+
+ /**
+ * Populate distinct keys part of cachedKeys for a particular row.
+ * @param row the row
+ * @param index the cachedKeys index to write to
+ */
+ private void populateCachedDistinctKeys(Object row, int index) throws HiveException {
+ StandardUnion union;
+ cachedKeys[index][numDistributionKeys] = union = new StandardUnion(
+ (byte)index, new Object[distinctColIndices.get(index).size()]);
+ Object[] distinctParameters = (Object[]) union.getObject();
+ for (int distinctParamI = 0; distinctParamI < distinctParameters.length; distinctParamI++) {
+ distinctParameters[distinctParamI] =
+ keyEval[distinctColIndices.get(index).get(distinctParamI)].evaluate(row);
+ }
+ union.setTag((byte) index);
+ }
+
private int computeHashCode(Object row) throws HiveException {
// Evaluate the HashCode
int keyHashCode = 0;
if (partitionEval.length == 0) {
- // If no partition cols, just distribute the data uniformly to provide
- // better
- // load balance. If the requirement is to have a single reducer, we
- // should set
+ // If no partition cols, just distribute the data uniformly to provide better
+ // load balance. If the requirement is to have a single reducer, we should set
// the number of reducers to 1.
// Use a constant seed to make the code deterministic.
if (random == null) {
@@ -314,15 +331,19 @@ public class ReduceSinkOperator extends
return keyHashCode;
}
- protected void setKeyWritable(BinaryComparable key, int tag) {
+ // Serialize the keys and append the tag
+ protected HiveKey toHiveKey(Object obj, int tag, Integer distLength) throws SerDeException {
+ BinaryComparable key = (BinaryComparable)keySerializer.serialize(obj, keyObjectInspector);
+ int keyLength = key.getLength();
if (tag == -1) {
- keyWritable.set(key.getBytes(), 0, key.getLength());
+ keyWritable.set(key.getBytes(), 0, keyLength);
} else {
- int keyLength = key.getLength();
keyWritable.setSize(keyLength + 1);
System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
keyWritable.get()[keyLength] = tagByte[0];
}
+ keyWritable.setDistKeyLength((distLength == null) ? keyLength : distLength);
+ return keyWritable;
}
public void collect(byte[] key, byte[] value, int hash) throws IOException {
@@ -331,11 +352,6 @@ public class ReduceSinkOperator extends
collect(keyWritable, valueWritable);
}
- protected void collect(byte[] key, Writable valueWritable, int hash) throws IOException {
- HiveKey keyWritable = new HiveKey(key, hash);
- collect(keyWritable, valueWritable);
- }
-
protected void collect(BytesWritable keyWritable, Writable valueWritable) throws IOException {
// Since this is a terminal operator, update counters explicitly -
// forward is not called
@@ -351,7 +367,7 @@ public class ReduceSinkOperator extends
}
}
- private BytesWritable getValue(Object row) throws Exception {
+ private BytesWritable makeValueWritable(Object row) throws Exception {
// Evaluate the value
for (int i = 0; i < valueEval.length; i++) {
cachedValues[i] = valueEval[i].evaluate(row);
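
The new toHiveKey() above serializes the distribution keys first, takes that length as distKeyLength, and only then lets the distinct union and the operator tag byte follow, so the prefix the top-N heap compares on is well defined. A rough sketch of that layout, with plain byte arrays standing in for the keySerializer output (makeKey and its arguments are illustrative, not Hive APIs):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    public class KeyLayoutDemo {
      // Simplified stand-in for toHiveKey(): the real code serializes cachedKeys
      // with keySerializer; here the "serialized" parts are raw byte arrays.
      static byte[] makeKey(byte[] distributionKeys, byte[] distinctUnion, byte tag,
                            int[] distKeyLengthOut) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(distributionKeys);
        distKeyLengthOut[0] = out.size();  // distKeyLength: end of the distribution prefix
        out.write(distinctUnion);          // distinct columns (union tag + values)
        out.write(tag);                    // operator tag byte appended last
        return out.toByteArray();
      }

      public static void main(String[] args) throws IOException {
        int[] len = new int[1];
        byte[] key = makeKey("grp".getBytes(), "d1".getBytes(), (byte) 0, len);
        // Only the first len[0] bytes participate in top-N ordering; the rest
        // (distinct union, tag) still reach the reducer unchanged.
        System.out.println(key.length + " bytes total, distKeyLength=" + len[0]);
      }
    }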
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java?rev=1540328&r1=1540327&r2=1540328&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java Sat Nov 9 15:22:35 2013
@@ -31,6 +31,8 @@ import com.google.common.collect.MinMaxP
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.io.BinaryComparable;
import org.apache.hadoop.io.BytesWritable;
@@ -51,11 +53,9 @@ public class TopNHash {
public void collect(byte[] key, byte[] value, int hash) throws IOException;
}
- public static final int FORWARD = -1;
- public static final int EXCLUDED = -2;
- private static final int FLUSH = -3;
- private static final int DISABLE = -4;
- private static final int MAY_FORWARD = -5;
+ public static final int FORWARD = -1; // Forward the row to reducer as is.
+ public static final int EXCLUDE = -2; // Discard the row.
+ private static final int MAY_FORWARD = -3; // Vectorized - may forward the row, not sure yet.
private BinaryCollector collector;
private int topN;
@@ -67,14 +67,17 @@ public class TopNHash {
private byte[][] keys;
private byte[][] values;
private int[] hashes;
+ private int[] distKeyLengths;
private IndexStore indexes; // The heap over the keys, storing indexes in the array.
private int evicted; // recently evicted index (used for next key/value)
private int excluded; // count of excluded rows from previous flush
- // temporary stuff used for vectorization
+ // temporary single-batch context used for vectorization
private int batchNumForwards = 0; // whether current batch has any forwarded keys
private int[] indexToBatchIndex; // mapping of index (lined up w/keys) to index in the batch
+ private int[] batchIndexToResult; // mapping of index in the batch (linear) to hash result
+ private int batchSize; // Size of the current batch.
private boolean isEnabled = false;
@@ -82,7 +85,9 @@ public class TopNHash {
public int compare(Integer o1, Integer o2) {
byte[] key1 = keys[o1];
byte[] key2 = keys[o2];
- return WritableComparator.compareBytes(key1, 0, key1.length, key2, 0, key2.length);
+ int length1 = distKeyLengths[o1];
+ int length2 = distKeyLengths[o2];
+ return WritableComparator.compareBytes(key1, 0, length1, key2, 0, length2);
}
};
@@ -107,6 +112,7 @@ public class TopNHash {
this.keys = new byte[topN + 1][];
this.values = new byte[topN + 1][];
this.hashes = new int[topN + 1];
+ this.distKeyLengths = new int[topN + 1];
this.evicted = topN;
this.isEnabled = true;
}
@@ -118,12 +124,12 @@ public class TopNHash {
* TopNHash.EXCLUDED if the row should be discarded;
* any other number if the row is to be stored; the index should be passed to storeValue.
*/
- public int tryStoreKey(BytesWritable key) throws HiveException, IOException {
+ public int tryStoreKey(HiveKey key) throws HiveException, IOException {
if (!isEnabled) {
return FORWARD; // short-circuit quickly - forward all rows
}
if (topN == 0) {
- return EXCLUDED; // short-circuit quickly - eat all rows
+ return EXCLUDE; // short-circuit quickly - eat all rows
}
int index = insertKeyIntoHeap(key);
if (index >= 0) {
@@ -132,21 +138,8 @@ public class TopNHash {
}
// IndexStore is trying to tell us something.
switch (index) {
- case DISABLE: {
- LOG.info("Top-N hash is disabled");
- flushInternal();
- isEnabled = false;
- return FORWARD;
- }
- case FLUSH: {
- LOG.info("Top-N hash is flushed");
- flushInternal();
- // we can now retry adding key/value into hash, which is flushed.
- // but for simplicity, just forward them
- return FORWARD;
- }
case FORWARD: return FORWARD;
- case EXCLUDED: return EXCLUDED; // skip the row.
+ case EXCLUDE: return EXCLUDE; // skip the row.
default: {
assert false;
throw new HiveException("Invalid result trying to store the key: " + index);
@@ -157,15 +150,16 @@ public class TopNHash {
/**
* Perform basic checks and initialize TopNHash for the new vectorized row batch.
+ * @param size batch size
* @return TopNHash.FORWARD if all rows should be forwarded w/o trying to call TopN;
* TopNHash.EXCLUDED if all rows should be discarded w/o trying to call TopN;
* any other result means the batch has been started.
*/
- public int startVectorizedBatch() throws IOException, HiveException {
+ public int startVectorizedBatch(int size) throws IOException, HiveException {
if (!isEnabled) {
return FORWARD; // short-circuit quickly - forward all rows
} else if (topN == 0) {
- return EXCLUDED; // short-circuit quickly - eat all rows
+ return EXCLUDE; // short-circuit quickly - eat all rows
}
// Flush here if the memory usage is too high. After that, we have the entire
// batch already in memory anyway so we will bypass the memory checks.
@@ -179,8 +173,13 @@ public class TopNHash {
return FORWARD; // Hash is ineffective, disable.
}
}
+ // Started ok; initialize context for new batch.
+ batchSize = size;
+ if (batchIndexToResult == null || batchIndexToResult.length < batchSize) {
+ batchIndexToResult = new int[Math.max(batchSize, VectorizedRowBatch.DEFAULT_SIZE)];
+ }
if (indexToBatchIndex == null) {
- indexToBatchIndex = new int[topN + 1]; // for current batch, contains key index in the batch
+ indexToBatchIndex = new int[topN + 1];
}
Arrays.fill(indexToBatchIndex, -1);
batchNumForwards = 0;
@@ -191,33 +190,28 @@ public class TopNHash {
* Try to put the key from the current vectorized batch into the heap.
* @param key the key.
* @param batchIndex The index of the key in the vectorized batch (sequential, not .selected).
- * @param results The results; the number of elements equivalent to vrg.size, by kindex.
- * The result should be the same across the calls for the batch; in then end, for each k-index:
- * - TopNHash.EXCLUDED - discard the row.
- * - positive index - store the row using storeValue, same as tryStoreRow.
- * - negative index - forward the row. getVectorizedKeyToForward called w/this index will
- * return the key to use so it doesn't have to be rebuilt.
*/
- public void tryStoreVectorizedKey(BytesWritable key, int batchIndex, int[] results)
- throws HiveException, IOException {
+ public void tryStoreVectorizedKey(HiveKey key, int batchIndex)
+ throws HiveException, IOException {
// Assumption - batchIndex is increasing; startVectorizedBatch was called
int size = indexes.size();
int index = size < topN ? size : evicted;
keys[index] = Arrays.copyOf(key.getBytes(), key.getLength());
+ distKeyLengths[index] = key.getDistKeyLength();
Integer collisionIndex = indexes.store(index);
if (null != collisionIndex) {
// forward conditional on the survival of the corresponding key currently in indexes.
++batchNumForwards;
- results[batchIndex] = MAY_FORWARD - collisionIndex;
+ batchIndexToResult[batchIndex] = MAY_FORWARD - collisionIndex;
return;
}
indexToBatchIndex[index] = batchIndex;
- results[batchIndex] = index;
+ batchIndexToResult[batchIndex] = index;
if (size != topN) return;
evicted = indexes.removeBiggest(); // remove the biggest key
if (index == evicted) {
excluded++;
- results[batchIndex] = EXCLUDED;
+ batchIndexToResult[batchIndex] = EXCLUDE;
indexToBatchIndex[index] = -1;
return; // input key is bigger than any of keys in hash
}
@@ -225,36 +219,54 @@ public class TopNHash {
int evictedBatchIndex = indexToBatchIndex[evicted];
if (evictedBatchIndex >= 0) {
// reset the result for the evicted index
- results[evictedBatchIndex] = EXCLUDED;
+ batchIndexToResult[evictedBatchIndex] = EXCLUDE;
indexToBatchIndex[evicted] = -1;
}
- // Also evict all results grouped with this index; cannot be current key or before it.
- if (batchNumForwards > 0) {
- int evictedForward = (MAY_FORWARD - evicted);
- boolean forwardRemoved = false;
- for (int i = evictedBatchIndex + 1; i < batchIndex; ++i) {
- if (results[i] == evictedForward) {
- results[i] = EXCLUDED;
- forwardRemoved = true;
- }
- }
- if (forwardRemoved) {
+ // Evict all results grouped with this index; it cannot be any key further in the batch.
+ // If we evict a key from this batch, the keys grouped with it cannot be earlier than that key.
+ // If we evict a key that is not from this batch, initial i = (-1) + 1 = 0, as intended.
+ int evictedForward = (MAY_FORWARD - evicted);
+ for (int i = evictedBatchIndex + 1; i < batchIndex && (batchNumForwards > 0); ++i) {
+ if (batchIndexToResult[i] == evictedForward) {
+ batchIndexToResult[i] = EXCLUDE;
--batchNumForwards;
}
}
}
/**
+ * Get vectorized batch result for particular index.
+ * @param batchIndex index of the key in the batch.
+ * @return the result, same as from {@link #tryStoreKey(HiveKey)}
+ */
+ public int getVectorizedBatchResult(int batchIndex) {
+ int result = batchIndexToResult[batchIndex];
+ return (result <= MAY_FORWARD) ? FORWARD : result;
+ }
+
+ /**
* After vectorized batch is processed, can return the key that caused a particular row
* to be forwarded. Because the row could only be marked to forward because it has
* the same key with some row already in the heap (for GBY), we can use that key from the
* heap to emit the forwarded row.
- * @param index Negative index from the vectorized result. See tryStoreVectorizedKey.
- * @return The key corresponding to the row.
+ * @param batchIndex index of the key in the batch.
+ * @return The key corresponding to the index.
*/
- public byte[] getVectorizedKeyToForward(int index) {
- assert index <= MAY_FORWARD;
- return keys[MAY_FORWARD - index];
+ public HiveKey getVectorizedKeyToForward(int batchIndex) {
+ int index = MAY_FORWARD - batchIndexToResult[batchIndex];
+ HiveKey hk = new HiveKey();
+ hk.set(keys[index], 0, keys[index].length);
+ hk.setDistKeyLength(distKeyLengths[index]);
+ return hk;
+ }
+
+ /**
+ * After vectorized batch is processed, can return the distribution key length of a key.
+ * @param batchIndex index of the key in the batch.
+ * @return The distribution key length corresponding to the key.
+ */
+ public int getVectorizedKeyDistLength(int batchIndex) {
+ return distKeyLengths[batchIndexToResult[batchIndex]];
}
/**
@@ -289,16 +301,22 @@ public class TopNHash {
* <p/>
* -1 for FORWARD : should be forwarded to output collector (for GBY)
* -2 for EXCLUDED : not in top-k. ignore it
- * -3 for FLUSH : memory is not enough. flush values (keep keys only)
- * -4 for DISABLE : hash is not effective. flush and disable it
*/
- private int insertKeyIntoHeap(BinaryComparable key) {
+ private int insertKeyIntoHeap(HiveKey key) throws IOException, HiveException {
if (usage > threshold) {
- return excluded == 0 ? DISABLE : FLUSH;
+ flushInternal();
+ if (excluded == 0) {
+ LOG.info("Top-N hash is disabled");
+ isEnabled = false;
+ }
+ // we can now retry adding key/value into hash, which is flushed.
+ // but for simplicity, just forward them
+ return FORWARD;
}
int size = indexes.size();
int index = size < topN ? size : evicted;
keys[index] = Arrays.copyOf(key.getBytes(), key.getLength());
+ distKeyLengths[index] = key.getDistKeyLength();
if (null != indexes.store(index)) {
// it's only for GBY which should forward all values associated with the key in the range
// of limit. new value should be attached with the key but in current implementation,
@@ -310,7 +328,7 @@ public class TopNHash {
evicted = indexes.removeBiggest(); // remove the biggest key
if (index == evicted) {
excluded++;
- return EXCLUDED; // input key is bigger than any of keys in hash
+ return EXCLUDE; // input key is bigger than any of keys in hash
}
removed(evicted);
}
@@ -326,6 +344,7 @@ public class TopNHash {
values[index] = null;
}
hashes[index] = -1;
+ distKeyLengths[index] = -1;
}
private void flushInternal() throws IOException, HiveException {
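
On the heap side, the patched comparator now calls WritableComparator.compareBytes(key1, 0, length1, key2, 0, length2) with the stored distKeyLengths, so every distinct variant of one grouping key compares equal, and a colliding key is forwarded rather than stored a second time. The sketch below models that flow with a TreeSet of string prefixes - a simplified, hedged stand-in for IndexStore plus the byte comparator, not the actual TopNHash bookkeeping:

    import java.util.TreeSet;

    public class PrefixTopNDemo {
      public static void main(String[] args) {
        int topN = 2;
        // One entry per distribution-key prefix, ordered lexicographically.
        TreeSet<String> heap = new TreeSet<>();

        String[][] rows = { {"a", "x"}, {"b", "y"}, {"a", "z"}, {"c", "w"} };
        for (String[] r : rows) {
          String prefix = r[0];  // distribution key: the first distKeyLength bytes
          if (heap.contains(prefix)) {
            // Same grouping key already in the heap: forward the row as-is
            // (the collision path), so count(distinct) still sees "z" for group "a".
            System.out.println("forward " + prefix + "|" + r[1]);
            continue;
          }
          heap.add(prefix);
          if (heap.size() > topN) {
            // Evict the largest grouping key; its distinct variants go with it.
            System.out.println("exclude group " + heap.pollLast());
            continue;
          }
          System.out.println("store " + prefix + "|" + r[1]);
        }
        System.out.println("kept groups: " + heap);
      }
    }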
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java?rev=1540328&r1=1540327&r2=1540328&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java Sat Nov 9 15:22:35 2013
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.exec.To
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
+import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
@@ -39,9 +40,11 @@ import org.apache.hadoop.hive.serde2.Ser
import org.apache.hadoop.hive.serde2.Serializer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+// import org.apache.hadoop.util.StringUtils;
public class VectorReduceSinkOperator extends ReduceSinkOperator {
@@ -87,12 +90,6 @@ public class VectorReduceSinkOperator ex
*/
private transient VectorExpressionWriter[] partitionWriters;
- private transient ObjectInspector keyObjectInspector;
- private transient ObjectInspector valueObjectInspector;
- private transient int [] keyHashCode = new int [VectorizedRowBatch.DEFAULT_SIZE];
-
- private transient int[] hashResult; // the pre-created array for reducerHash results
-
public VectorReduceSinkOperator(VectorizationContext vContext, OperatorDesc conf)
throws HiveException {
this();
@@ -110,7 +107,6 @@ public class VectorReduceSinkOperator ex
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
try {
-
numDistributionKeys = conf.getNumDistributionKeys();
distinctColIndices = conf.getDistinctColumnIndices();
numDistinctExprs = distinctColIndices.size();
@@ -183,7 +179,7 @@ public class VectorReduceSinkOperator ex
numDistributionKeys;
cachedKeys = new Object[numKeys][keyLen];
cachedValues = new Object[valueEval.length];
-
+
int tag = conf.getTag();
tagByte[0] = (byte) tag;
LOG.info("Using tag = " + tag);
@@ -209,81 +205,84 @@ public class VectorReduceSinkOperator ex
partitionEval.length));
try {
-
- for (int i = 0; i < partitionEval.length; i++) {
- partitionEval[i].evaluate(vrg);
- }
-
- // run the vector evaluations
- for (int i = 0; i < valueEval.length; i++) {
- valueEval[i].evaluate(vrg);
- }
// Evaluate the keys
for (int i = 0; i < keyEval.length; i++) {
keyEval[i].evaluate(vrg);
}
- Object[] distributionKeys = new Object[numDistributionKeys];
-
// Determine which rows we need to emit based on topN optimization
- int startResult = reducerHash.startVectorizedBatch();
- if (startResult == TopNHash.EXCLUDED) {
+ int startResult = reducerHash.startVectorizedBatch(vrg.size);
+ if (startResult == TopNHash.EXCLUDE) {
return; // TopN wants us to exclude all rows.
}
- boolean useTopN = startResult != TopNHash.FORWARD;
- if (useTopN && (hashResult == null || hashResult.length < vrg.size)) {
- hashResult = new int[Math.max(vrg.size, VectorizedRowBatch.DEFAULT_SIZE)];
+ // TODO: can we do this later/only for the keys that are needed? E.g. update vrg.selected.
+ for (int i = 0; i < partitionEval.length; i++) {
+ partitionEval[i].evaluate(vrg);
+ }
+ // run the vector evaluations
+ for (int i = 0; i < valueEval.length; i++) {
+ valueEval[i].evaluate(vrg);
}
- for (int j = 0 ; j < vrg.size; ++j) {
- int rowIndex = j;
+ boolean useTopN = startResult != TopNHash.FORWARD;
+ // Go thru the batch once. If we are not using TopN, we will forward all things and be done.
+ // If we are using topN, we will make the first key for each row and store/forward it.
+ // Values, hashes and additional distinct rows will be handled in the 2nd pass in that case.
+ for (int batchIndex = 0 ; batchIndex < vrg.size; ++batchIndex) {
+ int rowIndex = batchIndex;
if (vrg.selectedInUse) {
- rowIndex = vrg.selected[j];
+ rowIndex = vrg.selected[batchIndex];
}
- // First, evaluate the key - the way things stand we'd need it regardless.
- for (int i = 0; i < keyEval.length; i++) {
- int batchColumn = keyEval[i].getOutputColumn();
- ColumnVector vectorColumn = vrg.cols[batchColumn];
- distributionKeys[i] = keyWriters[i].writeValue(vectorColumn, rowIndex);
+ // First, make distrib key components for this row and determine distKeyLength.
+ populatedCachedDistributionKeys(vrg, rowIndex, 0);
+ HiveKey firstKey = toHiveKey(cachedKeys[0], tag, null);
+ int distKeyLength = firstKey.getDistKeyLength();
+ // Add first distinct expression, if any.
+ if (numDistinctExprs > 0) {
+ populateCachedDistinctKeys(vrg, rowIndex, 0);
+ firstKey = toHiveKey(cachedKeys[0], tag, distKeyLength);
}
- // no distinct key
- System.arraycopy(distributionKeys, 0, cachedKeys[0], 0, numDistributionKeys);
- // TopN is not supported for multi-distinct currently. If we have more cachedKeys
- // than one for every input key horrible things will happen (OOB error on array likely).
- assert !useTopN || cachedKeys.length <= 1;
- for (int i = 0; i < cachedKeys.length; i++) {
- // Serialize the keys and append the tag.
- Object keyObj = keySerializer.serialize(cachedKeys[i], keyObjectInspector);
- setKeyWritable(keyIsText ? (Text)keyObj : (BytesWritable)keyObj, tag);
- if (useTopN) {
- reducerHash.tryStoreVectorizedKey(keyWritable, j, hashResult);
- } else {
- // No TopN, just forward the key
- keyWritable.setHashCode(computeHashCode(vrg, rowIndex));
- collect(keyWritable, makeValueWritable(vrg, rowIndex));
- }
+
+ if (useTopN) {
+ reducerHash.tryStoreVectorizedKey(firstKey, batchIndex);
+ } else {
+ // No TopN, just forward the first key and all others.
+ int hashCode = computeHashCode(vrg, rowIndex);
+ firstKey.setHashCode(hashCode);
+ BytesWritable value = makeValueWritable(vrg, rowIndex);
+ collect(firstKey, value);
+ forwardExtraDistinctRows(vrg, rowIndex, hashCode, value, distKeyLength, tag, 0);
}
}
if (!useTopN) return; // All done.
// If we use topN, we have called tryStore on every key now. We can process the results.
- for (int j = 0 ; j < vrg.size; ++j) {
- int index = hashResult[j];
- if (index == TopNHash.EXCLUDED) continue;
- int rowIndex = j;
+ for (int batchIndex = 0 ; batchIndex < vrg.size; ++batchIndex) {
+ int result = reducerHash.getVectorizedBatchResult(batchIndex);
+ if (result == TopNHash.EXCLUDE) continue;
+ int rowIndex = batchIndex;
if (vrg.selectedInUse) {
- rowIndex = vrg.selected[j];
+ rowIndex = vrg.selected[batchIndex];
}
- // Compute everything now - we'd either store it, or forward it.
+ // Compute value and hashcode - we'd either store or forward them.
int hashCode = computeHashCode(vrg, rowIndex);
BytesWritable value = makeValueWritable(vrg, rowIndex);
- if (index < 0) {
- // Kinda hacky; see getVectorizedKeyToForward javadoc.
- byte[] key = reducerHash.getVectorizedKeyToForward(index);
- collect(key, value, hashCode);
+ int distKeyLength = -1;
+ if (result == TopNHash.FORWARD) {
+ HiveKey firstKey = reducerHash.getVectorizedKeyToForward(batchIndex);
+ firstKey.setHashCode(hashCode);
+ distKeyLength = firstKey.getDistKeyLength();
+ collect(firstKey, value);
} else {
- reducerHash.storeValue(index, value, hashCode, true);
+ reducerHash.storeValue(result, value, hashCode, true);
+ distKeyLength = reducerHash.getVectorizedKeyDistLength(batchIndex);
+ }
+ // Now forward the other rows if there's multi-distinct (but see TODO in forward...).
+ // Unfortunately, that means we will have to rebuild the cachedKeys. Start at 1.
+ if (numDistinctExprs > 1) {
+ populatedCachedDistributionKeys(vrg, rowIndex, 1);
+ forwardExtraDistinctRows(vrg, rowIndex, hashCode, value, distKeyLength, tag, 1);
}
}
} catch (SerDeException e) {
@@ -293,6 +292,74 @@ public class VectorReduceSinkOperator ex
}
}
+ /**
+ * This function creates and forwards all the additional KVs for the multi-distinct case,
+ * after the first (0th) KV pertaining to the row has already been stored or forwarded.
+ * @param vrg the batch
+ * @param rowIndex the row index in the batch
+ * @param hashCode the partitioning hash code to use; same as for the first KV
+ * @param value the value to use; same as for the first KV
+ * @param distKeyLength the distribution key length of the first key; TODO probably extraneous
+ * @param tag the tag
+ * @param baseIndex the index in cachedKeys where the pre-evaluated distribution keys are stored
+ */
+ private void forwardExtraDistinctRows(VectorizedRowBatch vrg, int rowIndex,int hashCode,
+ BytesWritable value, int distKeyLength, int tag, int baseIndex)
+ throws HiveException, SerDeException, IOException {
+ // TODO: We don't have to forward extra distinct rows immediately (same in non-vector) if
+ // the first key has already been stored. There's only a few bytes' difference between keys
+ // for different distincts, and the value/etc. are all the same.
+ // We could store deltas to re-gen extra rows when flushing TopN.
+ for (int i = 1; i < numDistinctExprs; i++) {
+ if (i != baseIndex) {
+ System.arraycopy(cachedKeys[baseIndex], 0, cachedKeys[i], 0, numDistributionKeys);
+ }
+ populateCachedDistinctKeys(vrg, rowIndex, i);
+ HiveKey hiveKey = toHiveKey(cachedKeys[i], tag, distKeyLength);
+ hiveKey.setHashCode(hashCode);
+ collect(hiveKey, value);
+ }
+ }
+
+ /**
+ * Populate distribution keys part of cachedKeys for a particular row from the batch.
+ * @param vrg the batch
+ * @param rowIndex the row index in the batch
+ * @param index the cachedKeys index to write to
+ */
+ private void populatedCachedDistributionKeys(
+ VectorizedRowBatch vrg, int rowIndex, int index) throws HiveException {
+ for (int i = 0; i < numDistributionKeys; i++) {
+ int batchColumn = keyEval[i].getOutputColumn();
+ ColumnVector vectorColumn = vrg.cols[batchColumn];
+ cachedKeys[index][i] = keyWriters[i].writeValue(vectorColumn, rowIndex);
+ }
+ if (cachedKeys[index].length > numDistributionKeys) {
+ cachedKeys[index][numDistributionKeys] = null;
+ }
+ }
+
+ /**
+ * Populate distinct keys part of cachedKeys for a particular row from the batch.
+ * @param vrg the batch
+ * @param rowIndex the row index in the batch
+ * @param index the cachedKeys index to write to
+ */
+ private void populateCachedDistinctKeys(
+ VectorizedRowBatch vrg, int rowIndex, int index) throws HiveException {
+ StandardUnion union;
+ cachedKeys[index][numDistributionKeys] = union = new StandardUnion(
+ (byte)index, new Object[distinctColIndices.get(index).size()]);
+ Object[] distinctParameters = (Object[]) union.getObject();
+ for (int distinctParamI = 0; distinctParamI < distinctParameters.length; distinctParamI++) {
+ int distinctColIndex = distinctColIndices.get(index).get(distinctParamI);
+ int batchColumn = keyEval[distinctColIndex].getOutputColumn();
+ distinctParameters[distinctParamI] =
+ keyWriters[distinctColIndex].writeValue(vrg.cols[batchColumn], rowIndex);
+ }
+ union.setTag((byte) index);
+ }
+
private BytesWritable makeValueWritable(VectorizedRowBatch vrg, int rowIndex)
throws HiveException, SerDeException {
for (int i = 0; i < valueEval.length; i++) {
@@ -308,10 +375,8 @@ public class VectorReduceSinkOperator ex
// Evaluate the HashCode
int keyHashCode = 0;
if (partitionEval.length == 0) {
- // If no partition cols, just distribute the data uniformly to provide
- // better
- // load balance. If the requirement is to have a single reducer, we
- // should set
+ // If no partition cols, just distribute the data uniformly to provide better
+ // load balance. If the requirement is to have a single reducer, we should set
// the number of reducers to 1.
// Use a constant seed to make the code deterministic.
if (random == null) {
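
The vectorized operator above now walks each batch twice: the first pass builds only the first key per row and records the hash's decision (tryStoreVectorizedKey filling batchIndexToResult), and the second pass serializes the value and computes the hash code only for rows that were not excluded (the loop over getVectorizedBatchResult). A stand-alone sketch of that two-pass shape; the sort-based decision step and all names here are illustrative, not the patch's collision/forward handling:

    import java.util.Arrays;

    public class TwoPassBatchDemo {
      static final int EXCLUDE = -2;  // mirrors TopNHash.EXCLUDE; slots >= 0 mean "store"

      public static void main(String[] args) {
        int[] batch = {5, 1, 9, 3};   // stand-in for one batch worth of keys
        int topN = 2;

        // Pass 1: decide per row what to do, touching only the keys.
        int[] results = new int[batch.length];
        Integer[] order = new Integer[batch.length];
        for (int i = 0; i < batch.length; i++) order[i] = i;
        Arrays.sort(order, (a, b) -> Integer.compare(batch[a], batch[b]));
        for (int rank = 0; rank < order.length; rank++) {
          results[order[rank]] = rank < topN ? rank : EXCLUDE;
        }

        // Pass 2: compute the per-row "value" only for surviving rows;
        // excluded rows are skipped before any expensive work is done.
        for (int i = 0; i < batch.length; i++) {
          if (results[i] == EXCLUDE) continue;
          String value = "value(" + batch[i] + ")";
          System.out.println("store slot " + results[i] + ": " + value);
        }
      }
    }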
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java?rev=1540328&r1=1540327&r2=1540328&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java Sat Nov 9 15:22:35 2013
@@ -29,7 +29,10 @@ public class HiveKey extends BytesWritab
private static final int LENGTH_BYTES = 4;
- boolean hashCodeValid;
+ private int hashCode;
+ private boolean hashCodeValid;
+
+ private transient int distKeyLength;
public HiveKey() {
hashCodeValid = false;
@@ -37,15 +40,13 @@ public class HiveKey extends BytesWritab
public HiveKey(byte[] bytes, int hashcode) {
super(bytes);
- myHashCode = hashcode;
+ hashCode = hashcode;
hashCodeValid = true;
}
- protected int myHashCode;
-
public void setHashCode(int myHashCode) {
hashCodeValid = true;
- this.myHashCode = myHashCode;
+ hashCode = myHashCode;
}
@Override
@@ -54,7 +55,15 @@ public class HiveKey extends BytesWritab
throw new RuntimeException("Cannot get hashCode() from deserialized "
+ HiveKey.class);
}
- return myHashCode;
+ return hashCode;
+ }
+
+ public void setDistKeyLength(int distKeyLength) {
+ this.distKeyLength = distKeyLength;
+ }
+
+ public int getDistKeyLength() {
+ return distKeyLength;
}
/** A Comparator optimized for HiveKey. */
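
A minimal usage sketch of the new accessor pair on HiveKey (assumes hive-exec on the classpath; the byte contents are illustrative). Note that distKeyLength is transient: it rides along with the in-memory key for map-side TopN only and is never serialized to the reducer.

    import org.apache.hadoop.hive.ql.io.HiveKey;

    public class HiveKeyDemo {
      public static void main(String[] args) {
        byte[] serialized = "grp|d1".getBytes();    // not real Hive key serialization
        HiveKey key = new HiveKey();
        key.set(serialized, 0, serialized.length);  // set() inherited from BytesWritable
        key.setDistKeyLength(3);                    // length of the distribution-key prefix
        key.setHashCode(17);                        // normally computeHashCode(row)
        // TopNHash compares keys only up to getDistKeyLength().
        System.out.println(key.getDistKeyLength() + " of " + key.getLength() + " bytes");
      }
    }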
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/LimitPushdownOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/LimitPushdownOptimizer.java?rev=1540328&r1=1540327&r2=1540328&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/LimitPushdownOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/LimitPushdownOptimizer.java Sat Nov 9 15:22:35 2013
@@ -122,11 +122,6 @@ public class LimitPushdownOptimizer impl
}
}
if (rs != null) {
- List<List<Integer>> distincts = rs.getConf().getDistinctColumnIndices();
- if (distincts != null && distincts.size() > 1) {
- // multi distinct case. can not sure that it's safe just by multiplying limit value
- return false;
- }
LimitOperator limit = (LimitOperator) nd;
rs.getConf().setTopN(limit.getConf().getLimit());
rs.getConf().setTopNMemoryUsage(((LimitPushdownContext) procCtx).threshold);
Modified: hive/trunk/ql/src/test/queries/clientpositive/limit_pushdown.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/limit_pushdown.q?rev=1540328&r1=1540327&r2=1540328&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/limit_pushdown.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/limit_pushdown.q Sat Nov 9 15:22:35 2013
@@ -22,12 +22,17 @@ select value,avg(key + 1) from src group
-- distincts
explain
-select distinct(key) from src limit 20;
-select distinct(key) from src limit 20;
+select distinct(cdouble) from alltypesorc limit 20;
+select distinct(cdouble) from alltypesorc limit 20;
explain
-select key, count(distinct(key)) from src group by key limit 20;
-select key, count(distinct(key)) from src group by key limit 20;
+select ctinyint, count(distinct(cdouble)) from alltypesorc group by ctinyint limit 20;
+select ctinyint, count(distinct(cdouble)) from alltypesorc group by ctinyint limit 20;
+
+-- multi distinct
+explain
+select ctinyint, count(distinct(cstring1)), count(distinct(cstring2)) from alltypesorc group by ctinyint limit 20;
+select ctinyint, count(distinct(cstring1)), count(distinct(cstring2)) from alltypesorc group by ctinyint limit 20;
-- limit zero
explain
Modified: hive/trunk/ql/src/test/queries/clientpositive/limit_pushdown_negative.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/limit_pushdown_negative.q?rev=1540328&r1=1540327&r2=1540328&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/limit_pushdown_negative.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/limit_pushdown_negative.q Sat Nov 9 15:22:35 2013
@@ -16,7 +16,3 @@ CREATE TABLE dest_3(key STRING, c1 INT);
EXPLAIN FROM src
INSERT OVERWRITE TABLE dest_2 SELECT value, sum(key) GROUP BY value
INSERT OVERWRITE TABLE dest_3 SELECT value, sum(key) GROUP BY value limit 20;
-
--- nagative, multi distinct
-explain
-select count(distinct key)+count(distinct value) from src limit 20;
Modified: hive/trunk/ql/src/test/results/clientpositive/limit_pushdown.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/limit_pushdown.q.out?rev=1540328&r1=1540327&r2=1540328&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/limit_pushdown.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/limit_pushdown.q.out Sat Nov 9 15:22:35 2013
@@ -392,14 +392,14 @@ val_129 130.0
val_131 132.0
PREHOOK: query: -- distincts
explain
-select distinct(key) from src limit 20
+select distinct(cdouble) from alltypesorc limit 20
PREHOOK: type: QUERY
POSTHOOK: query: -- distincts
explain
-select distinct(key) from src limit 20
+select distinct(cdouble) from alltypesorc limit 20
POSTHOOK: type: QUERY
ABSTRACT SYNTAX TREE:
- (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME src))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECTDI (TOK_SELEXPR (TOK_TABLE_OR_COL key))) (TOK_LIMIT 20)))
+ (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME alltypesorc))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECTDI (TOK_SELEXPR (TOK_TABLE_OR_COL cdouble))) (TOK_LIMIT 20)))
STAGE DEPENDENCIES:
Stage-1 is a root stage
@@ -409,29 +409,29 @@ STAGE PLANS:
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree:
- src
+ alltypesorc
TableScan
- alias: src
+ alias: alltypesorc
Select Operator
expressions:
- expr: key
- type: string
- outputColumnNames: key
+ expr: cdouble
+ type: double
+ outputColumnNames: cdouble
Group By Operator
bucketGroup: false
keys:
- expr: key
- type: string
+ expr: cdouble
+ type: double
mode: hash
outputColumnNames: _col0
Reduce Output Operator
key expressions:
expr: _col0
- type: string
+ type: double
sort order: +
Map-reduce partition columns:
expr: _col0
- type: string
+ type: double
tag: -1
TopN: 20
TopN Hash Memory Usage: 0.3
@@ -440,13 +440,13 @@ STAGE PLANS:
bucketGroup: false
keys:
expr: KEY._col0
- type: string
+ type: double
mode: mergepartial
outputColumnNames: _col0
Select Operator
expressions:
expr: _col0
- type: string
+ type: double
outputColumnNames: _col0
Limit
File Output Operator
@@ -462,42 +462,42 @@ STAGE PLANS:
limit: 20
-PREHOOK: query: select distinct(key) from src limit 20
+PREHOOK: query: select distinct(cdouble) from alltypesorc limit 20
PREHOOK: type: QUERY
-PREHOOK: Input: default@src
+PREHOOK: Input: default@alltypesorc
#### A masked pattern was here ####
-POSTHOOK: query: select distinct(key) from src limit 20
+POSTHOOK: query: select distinct(cdouble) from alltypesorc limit 20
POSTHOOK: type: QUERY
-POSTHOOK: Input: default@src
+POSTHOOK: Input: default@alltypesorc
#### A masked pattern was here ####
-0
-10
-100
-103
-104
-105
-11
-111
-113
-114
-116
-118
-119
-12
-120
-125
-126
-128
-129
-131
+NULL
+-16379.0
+-16373.0
+-16372.0
+-16369.0
+-16355.0
+-16339.0
+-16324.0
+-16311.0
+-16310.0
+-16309.0
+-16307.0
+-16306.0
+-16305.0
+-16300.0
+-16296.0
+-16280.0
+-16277.0
+-16274.0
+-16269.0
PREHOOK: query: explain
-select key, count(distinct(key)) from src group by key limit 20
+select ctinyint, count(distinct(cdouble)) from alltypesorc group by ctinyint limit 20
PREHOOK: type: QUERY
POSTHOOK: query: explain
-select key, count(distinct(key)) from src group by key limit 20
+select ctinyint, count(distinct(cdouble)) from alltypesorc group by ctinyint limit 20
POSTHOOK: type: QUERY
ABSTRACT SYNTAX TREE:
- (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME src))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL key)) (TOK_SELEXPR (TOK_FUNCTIONDI count (TOK_TABLE_OR_COL key)))) (TOK_GROUPBY (TOK_TABLE_OR_COL key)) (TOK_LIMIT 20)))
+ (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME alltypesorc))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL ctinyint)) (TOK_SELEXPR (TOK_FUNCTIONDI count (TOK_TABLE_OR_COL cdouble)))) (TOK_GROUPBY (TOK_TABLE_OR_COL ctinyint)) (TOK_LIMIT 20)))
STAGE DEPENDENCIES:
Stage-1 is a root stage
@@ -507,36 +507,42 @@ STAGE PLANS:
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree:
- src
+ alltypesorc
TableScan
- alias: src
+ alias: alltypesorc
Select Operator
expressions:
- expr: key
- type: string
- outputColumnNames: key
+ expr: ctinyint
+ type: tinyint
+ expr: cdouble
+ type: double
+ outputColumnNames: ctinyint, cdouble
Group By Operator
aggregations:
- expr: count(DISTINCT key)
+ expr: count(DISTINCT cdouble)
bucketGroup: false
keys:
- expr: key
- type: string
+ expr: ctinyint
+ type: tinyint
+ expr: cdouble
+ type: double
mode: hash
- outputColumnNames: _col0, _col1
+ outputColumnNames: _col0, _col1, _col2
Reduce Output Operator
key expressions:
expr: _col0
- type: string
+ type: tinyint
+ expr: _col1
+ type: double
sort order: ++
Map-reduce partition columns:
expr: _col0
- type: string
+ type: tinyint
tag: -1
TopN: 20
TopN Hash Memory Usage: 0.3
value expressions:
- expr: _col1
+ expr: _col2
type: bigint
Reduce Operator Tree:
Group By Operator
@@ -545,13 +551,13 @@ STAGE PLANS:
bucketGroup: false
keys:
expr: KEY._col0
- type: string
+ type: tinyint
mode: mergepartial
outputColumnNames: _col0, _col1
Select Operator
expressions:
expr: _col0
- type: string
+ type: tinyint
expr: _col1
type: bigint
outputColumnNames: _col0, _col1
@@ -569,34 +575,161 @@ STAGE PLANS:
limit: 20
-PREHOOK: query: select key, count(distinct(key)) from src group by key limit 20
+PREHOOK: query: select ctinyint, count(distinct(cdouble)) from alltypesorc group by ctinyint limit 20
PREHOOK: type: QUERY
-PREHOOK: Input: default@src
+PREHOOK: Input: default@alltypesorc
#### A masked pattern was here ####
-POSTHOOK: query: select key, count(distinct(key)) from src group by key limit 20
+POSTHOOK: query: select ctinyint, count(distinct(cdouble)) from alltypesorc group by ctinyint limit 20
POSTHOOK: type: QUERY
-POSTHOOK: Input: default@src
+POSTHOOK: Input: default@alltypesorc
+#### A masked pattern was here ####
+NULL 2932
+-64 24
+-63 19
+-62 27
+-61 25
+-60 27
+-59 31
+-58 23
+-57 35
+-56 36
+-55 29
+-54 26
+-53 22
+-52 33
+-51 21
+-50 30
+-49 26
+-48 29
+-47 22
+-46 24
+PREHOOK: query: -- multi distinct
+explain
+select ctinyint, count(distinct(cstring1)), count(distinct(cstring2)) from alltypesorc group by ctinyint limit 20
+PREHOOK: type: QUERY
+POSTHOOK: query: -- multi distinct
+explain
+select ctinyint, count(distinct(cstring1)), count(distinct(cstring2)) from alltypesorc group by ctinyint limit 20
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME alltypesorc))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL ctinyint)) (TOK_SELEXPR (TOK_FUNCTIONDI count (TOK_TABLE_OR_COL cstring1))) (TOK_SELEXPR (TOK_FUNCTIONDI count (TOK_TABLE_OR_COL cstring2)))) (TOK_GROUPBY (TOK_TABLE_OR_COL ctinyint)) (TOK_LIMIT 20)))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ alltypesorc
+ TableScan
+ alias: alltypesorc
+ Select Operator
+ expressions:
+ expr: ctinyint
+ type: tinyint
+ expr: cstring1
+ type: string
+ expr: cstring2
+ type: string
+ outputColumnNames: ctinyint, cstring1, cstring2
+ Group By Operator
+ aggregations:
+ expr: count(DISTINCT cstring1)
+ expr: count(DISTINCT cstring2)
+ bucketGroup: false
+ keys:
+ expr: ctinyint
+ type: tinyint
+ expr: cstring1
+ type: string
+ expr: cstring2
+ type: string
+ mode: hash
+ outputColumnNames: _col0, _col1, _col2, _col3, _col4
+ Reduce Output Operator
+ key expressions:
+ expr: _col0
+ type: tinyint
+ expr: _col1
+ type: string
+ expr: _col2
+ type: string
+ sort order: +++
+ Map-reduce partition columns:
+ expr: _col0
+ type: tinyint
+ tag: -1
+ TopN: 20
+ TopN Hash Memory Usage: 0.3
+ value expressions:
+ expr: _col3
+ type: bigint
+ expr: _col4
+ type: bigint
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations:
+ expr: count(DISTINCT KEY._col1:0._col0)
+ expr: count(DISTINCT KEY._col1:1._col0)
+ bucketGroup: false
+ keys:
+ expr: KEY._col0
+ type: tinyint
+ mode: mergepartial
+ outputColumnNames: _col0, _col1, _col2
+ Select Operator
+ expressions:
+ expr: _col0
+ type: tinyint
+ expr: _col1
+ type: bigint
+ expr: _col2
+ type: bigint
+ outputColumnNames: _col0, _col1, _col2
+ Limit
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: 20
+
+
+PREHOOK: query: select ctinyint, count(distinct(cstring1)), count(distinct(cstring2)) from alltypesorc group by ctinyint limit 20
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypesorc
+#### A masked pattern was here ####
+POSTHOOK: query: select ctinyint, count(distinct(cstring1)), count(distinct(cstring2)) from alltypesorc group by ctinyint limit 20
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypesorc
#### A masked pattern was here ####
-0 1
-10 1
-100 1
-103 1
-104 1
-105 1
-11 1
-111 1
-113 1
-114 1
-116 1
-118 1
-119 1
-12 1
-120 1
-125 1
-126 1
-128 1
-129 1
-131 1
+NULL 3065 3
+-64 3 13
+-63 3 16
+-62 3 23
+-61 3 25
+-60 3 25
+-59 3 27
+-58 3 24
+-57 3 23
+-56 3 22
+-55 3 21
+-54 3 21
+-53 3 17
+-52 3 21
+-51 1012 1045
+-50 3 25
+-49 3 24
+-48 3 27
+-47 3 23
+-46 3 19
PREHOOK: query: -- limit zero
explain
select key,value from src order by key limit 0
Modified: hive/trunk/ql/src/test/results/clientpositive/limit_pushdown_negative.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/limit_pushdown_negative.q.out?rev=1540328&r1=1540327&r2=1540328&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/limit_pushdown_negative.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/limit_pushdown_negative.q.out Sat Nov 9 15:22:35 2013
@@ -428,84 +428,3 @@ STAGE PLANS:
Stats-Aggr Operator
-PREHOOK: query: -- nagative, multi distinct
-explain
-select count(distinct key)+count(distinct value) from src limit 20
-PREHOOK: type: QUERY
-POSTHOOK: query: -- nagative, multi distinct
-explain
-select count(distinct key)+count(distinct value) from src limit 20
-POSTHOOK: type: QUERY
-ABSTRACT SYNTAX TREE:
- (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME src))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (+ (TOK_FUNCTIONDI count (TOK_TABLE_OR_COL key)) (TOK_FUNCTIONDI count (TOK_TABLE_OR_COL value))))) (TOK_LIMIT 20)))
-
-STAGE DEPENDENCIES:
- Stage-1 is a root stage
- Stage-0 is a root stage
-
-STAGE PLANS:
- Stage: Stage-1
- Map Reduce
- Alias -> Map Operator Tree:
- src
- TableScan
- alias: src
- Select Operator
- expressions:
- expr: key
- type: string
- expr: value
- type: string
- outputColumnNames: key, value
- Group By Operator
- aggregations:
- expr: count(DISTINCT key)
- expr: count(DISTINCT value)
- bucketGroup: false
- keys:
- expr: key
- type: string
- expr: value
- type: string
- mode: hash
- outputColumnNames: _col0, _col1, _col2, _col3
- Reduce Output Operator
- key expressions:
- expr: _col0
- type: string
- expr: _col1
- type: string
- sort order: ++
- tag: -1
- value expressions:
- expr: _col2
- type: bigint
- expr: _col3
- type: bigint
- Reduce Operator Tree:
- Group By Operator
- aggregations:
- expr: count(DISTINCT KEY._col0:0._col0)
- expr: count(DISTINCT KEY._col0:1._col0)
- bucketGroup: false
- mode: mergepartial
- outputColumnNames: _col0, _col1
- Select Operator
- expressions:
- expr: (_col0 + _col1)
- type: bigint
- outputColumnNames: _col0
- Limit
- File Output Operator
- compressed: false
- GlobalTableId: 0
- table:
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-
- Stage: Stage-0
- Fetch Operator
- limit: 20
-
-
Modified: hive/trunk/ql/src/test/results/clientpositive/vectorization_limit.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/vectorization_limit.q.out?rev=1540328&r1=1540327&r2=1540328&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/vectorization_limit.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/vectorization_limit.q.out Sat Nov 9 15:22:35 2013
@@ -473,7 +473,26 @@ POSTHOOK: query: select ctinyint, count(
POSTHOOK: type: QUERY
POSTHOOK: Input: default@alltypesorc
#### A masked pattern was here ####
-NULL 19
+NULL 2932
+-64 24
+-63 19
+-62 27
+-61 25
+-60 27
+-59 31
+-58 23
+-57 35
+-56 36
+-55 29
+-54 26
+-53 22
+-52 33
+-51 21
+-50 30
+-49 26
+-48 29
+-47 22
+-46 24
PREHOOK: query: -- limit zero
explain
select ctinyint,cdouble from alltypesorc order by ctinyint limit 0