You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by mm...@apache.org on 2016/05/30 23:58:42 UTC
[2/2] hive git commit: HIVE-13713: We miss vectorization in a case of
count(*) when aggregation mode is COMPLETE (Matt McCline,
reviewed by Sergey Shelukhin)
HIVE-13713: We miss vectorization in a case of count(*) when aggregation mode is COMPLETE (Matt McCline, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8136a10c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8136a10c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8136a10c
Branch: refs/heads/master
Commit: 8136a10c1f0c4ebd5545225a9ebddf155ce5d933
Parents: d74d563
Author: Matt McCline <mm...@hortonworks.com>
Authored: Mon May 30 16:58:23 2016 -0700
Committer: Matt McCline <mm...@hortonworks.com>
Committed: Mon May 30 16:58:23 2016 -0700
----------------------------------------------------------------------
.../test/resources/testconfiguration.properties | 3 +
.../ql/exec/vector/AggregateDefinition.java | 13 +-
.../ql/exec/vector/VectorGroupByOperator.java | 56 +--
.../ql/exec/vector/VectorizationContext.java | 354 +++++++++++--------
.../hive/ql/optimizer/physical/Vectorizer.java | 227 +++++++-----
.../hadoop/hive/ql/plan/VectorGroupByDesc.java | 80 ++++-
.../exec/vector/TestVectorGroupByOperator.java | 31 +-
.../ql/optimizer/physical/TestVectorizer.java | 18 +-
.../test/queries/clientpositive/vector_count.q | 26 ++
.../queries/clientpositive/vector_groupby4.q | 23 ++
.../queries/clientpositive/vector_groupby6.q | 24 ++
.../tez/vector_aggregate_without_gby.q.out | 6 +-
.../tez/vector_auto_smb_mapjoin_14.q.out | 24 +-
.../clientpositive/tez/vector_count.q.out | 314 ++++++++++++++++
.../clientpositive/tez/vector_groupby4.q.out | 137 +++++++
.../clientpositive/tez/vector_groupby6.q.out | 137 +++++++
.../tez/vector_groupby_mapjoin.q.out | 10 +-
.../tez/vector_join_part_col_char.q.out | 8 +-
.../tez/vectorization_limit.q.out | 1 +
.../tez/vectorized_timestamp.q.out | 8 +-
.../results/clientpositive/vector_count.q.out | 286 +++++++++++++++
.../clientpositive/vector_groupby4.q.out | 134 +++++++
.../clientpositive/vector_groupby6.q.out | 134 +++++++
23 files changed, 1732 insertions(+), 322 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/8136a10c/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index fd6901c..c4ba277 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -270,6 +270,7 @@ minitez.query.files.shared=acid_globallimit.q,\
vector_coalesce_2.q,\
vector_complex_all.q,\
vector_complex_join.q,\
+ vector_count.q,\
vector_count_distinct.q,\
vector_data_types.q,\
vector_date_1.q,\
@@ -293,6 +294,8 @@ minitez.query.files.shared=acid_globallimit.q,\
vector_decimal_udf2.q,\
vector_distinct_2.q,\
vector_elt.q,\
+ vector_groupby4.q,\
+ vector_groupby6.q,\
vector_groupby_3.q,\
vector_groupby_mapjoin.q,\
vector_groupby_reduce.q,\
http://git-wip-us.apache.org/repos/asf/hive/blob/8136a10c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/AggregateDefinition.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/AggregateDefinition.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/AggregateDefinition.java
index 3f15c6f..0334c40 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/AggregateDefinition.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/AggregateDefinition.java
@@ -20,19 +20,20 @@ package org.apache.hadoop.hive.ql.exec.vector;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
class AggregateDefinition {
private String name;
private VectorExpressionDescriptor.ArgumentType type;
- private GroupByDesc.Mode mode;
+ private GenericUDAFEvaluator.Mode udafEvaluatorMode;
private Class<? extends VectorAggregateExpression> aggClass;
- AggregateDefinition(String name, VectorExpressionDescriptor.ArgumentType type,
- GroupByDesc.Mode mode, Class<? extends VectorAggregateExpression> aggClass) {
+ AggregateDefinition(String name, VectorExpressionDescriptor.ArgumentType type,
+ GenericUDAFEvaluator.Mode udafEvaluatorMode, Class<? extends VectorAggregateExpression> aggClass) {
this.name = name;
this.type = type;
- this.mode = mode;
+ this.udafEvaluatorMode = udafEvaluatorMode;
this.aggClass = aggClass;
}
@@ -42,8 +43,8 @@ class AggregateDefinition {
VectorExpressionDescriptor.ArgumentType getType() {
return type;
}
- GroupByDesc.Mode getMode() {
- return mode;
+ GenericUDAFEvaluator.Mode getUdafEvaluatorMode() {
+ return udafEvaluatorMode;
}
Class<? extends VectorAggregateExpression> getAggClass() {
return aggClass;
http://git-wip-us.apache.org/repos/asf/hive/blob/8136a10c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index 98a9bf6..6e53526 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.ql.plan.AggregationDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.VectorGroupByDesc.ProcessingMode;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -51,6 +52,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
/**
* Vectorized GROUP BY operator implementation. Consumes the vectorized input and
@@ -542,18 +544,17 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
if (numEntriesHashTable > sumBatchSize * minReductionHashAggr) {
flush(true);
- changeToUnsortedStreamingMode();
+ changeToStreamingMode();
}
}
}
}
/**
- * Unsorted streaming processing mode. Each input VectorizedRowBatch may have
- * a mix of different keys (hence unsorted). Intermediate values are flushed
- * each time key changes.
+ * Streaming processing mode on ALREADY GROUPED data. Each input VectorizedRowBatch may
+ * have a mix of different keys. Intermediate values are flushed each time the key changes.
*/
- private class ProcessingModeUnsortedStreaming extends ProcessingModeBase {
+ private class ProcessingModeStreaming extends ProcessingModeBase {
/**
* The aggregation buffers used in streaming mode
@@ -675,7 +676,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
* writeGroupRow does this and finally increments outputBatch.size.
*
*/
- private class ProcessingModeReduceMergePartialKeys extends ProcessingModeBase {
+ private class ProcessingModeReduceMergePartial extends ProcessingModeBase {
private boolean inGroup;
private boolean first;
@@ -761,8 +762,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
aggregators = new VectorAggregateExpression[aggrDesc.size()];
for (int i = 0; i < aggrDesc.size(); ++i) {
AggregationDesc aggDesc = aggrDesc.get(i);
- aggregators[i] =
- vContext.getAggregatorExpression(aggDesc, desc.getVectorDesc().isReduceMergePartial());
+ aggregators[i] = vContext.getAggregatorExpression(aggDesc);
}
isVectorOutput = desc.getVectorDesc().isVectorOutput();
@@ -810,12 +810,10 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
objectInspectors.add(aggregators[i].getOutputObjectInspector());
}
- if (outputKeyLength > 0 && !conf.getVectorDesc().isReduceMergePartial()) {
- // These data structures are only used by the non Reduce Merge-Partial Keys processing modes.
- keyWrappersBatch = VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions);
- aggregationBatchInfo = new VectorAggregationBufferBatch();
- aggregationBatchInfo.compileAggregationBatchInfo(aggregators);
- }
+ keyWrappersBatch = VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions);
+ aggregationBatchInfo = new VectorAggregationBufferBatch();
+ aggregationBatchInfo.compileAggregationBatchInfo(aggregators);
+
LOG.info("VectorGroupByOperator is vector output {}", isVectorOutput);
outputObjInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
outputFieldNames, objectInspectors);
@@ -835,29 +833,35 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
forwardCache = new Object[outputKeyLength + aggregators.length];
- if (outputKeyLength == 0) {
- // Hash and MergePartial global aggregation are both handled here.
+ switch (conf.getVectorDesc().getProcessingMode()) {
+ case GLOBAL:
+ Preconditions.checkState(outputKeyLength == 0);
processingMode = this.new ProcessingModeGlobalAggregate();
- } else if (conf.getVectorDesc().isReduceMergePartial()) {
- // Sorted GroupBy of vector batches where an individual batch has the same group key (e.g. reduce).
- processingMode = this.new ProcessingModeReduceMergePartialKeys();
- } else if (conf.getVectorDesc().isReduceStreaming()) {
- processingMode = this.new ProcessingModeUnsortedStreaming();
- } else {
- // We start in hash mode and may dynamically switch to unsorted stream mode.
+ break;
+ case HASH:
processingMode = this.new ProcessingModeHashAggregate();
+ break;
+ case MERGE_PARTIAL:
+ processingMode = this.new ProcessingModeReduceMergePartial();
+ break;
+ case STREAMING:
+ processingMode = this.new ProcessingModeStreaming();
+ break;
+ default:
+ throw new RuntimeException("Unsupported vector GROUP BY processing mode " +
+ conf.getVectorDesc().getProcessingMode().name());
}
processingMode.initialize(hconf);
}
/**
- * changes the processing mode to unsorted streaming
+ * changes the processing mode to streaming
* This is done at the request of the hash agg mode, if the number of keys
* exceeds the minReductionHashAggr factor
* @throws HiveException
*/
- private void changeToUnsortedStreamingMode() throws HiveException {
- processingMode = this.new ProcessingModeUnsortedStreaming();
+ private void changeToStreamingMode() throws HiveException {
+ processingMode = this.new ProcessingModeStreaming();
processingMode.initialize(null);
LOG.trace("switched to streaming mode");
}
http://git-wip-us.apache.org/repos/asf/hive/blob/8136a10c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
index 58ce063..1a3299b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
@@ -47,8 +47,8 @@ import org.apache.hadoop.hive.ql.exec.FunctionInfo;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
+import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.ArgumentType;
import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.InputExpressionType;
-import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.Mode;
import org.apache.hadoop.hive.ql.exec.vector.expressions.*;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFAvgDecimal;
@@ -114,6 +114,7 @@ import org.apache.hadoop.hive.ql.udf.UDFToLong;
import org.apache.hadoop.hive.ql.udf.UDFToShort;
import org.apache.hadoop.hive.ql.udf.UDFToString;
import org.apache.hadoop.hive.ql.udf.generic.*;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
import org.apache.hadoop.hive.serde2.ByteStream.Output;
import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
import org.apache.hadoop.hive.serde2.io.DateWritable;
@@ -404,7 +405,7 @@ public class VectorizationContext {
}
private VectorExpression getColumnVectorExpression(ExprNodeColumnDesc
- exprDesc, Mode mode) throws HiveException {
+ exprDesc, VectorExpressionDescriptor.Mode mode) throws HiveException {
int columnNum = getInputColumnIndex(exprDesc.getColumn());
VectorExpression expr = null;
switch (mode) {
@@ -425,7 +426,7 @@ public class VectorizationContext {
// Ok, try the UDF.
castToBooleanExpr = getVectorExpressionForUdf(null, UDFToBoolean.class, exprAsList,
- Mode.PROJECTION, null);
+ VectorExpressionDescriptor.Mode.PROJECTION, null);
if (castToBooleanExpr == null) {
throw new HiveException("Cannot vectorize converting expression " +
exprDesc.getExprString() + " to boolean");
@@ -443,10 +444,10 @@ public class VectorizationContext {
}
public VectorExpression[] getVectorExpressions(List<ExprNodeDesc> exprNodes) throws HiveException {
- return getVectorExpressions(exprNodes, Mode.PROJECTION);
+ return getVectorExpressions(exprNodes, VectorExpressionDescriptor.Mode.PROJECTION);
}
- public VectorExpression[] getVectorExpressions(List<ExprNodeDesc> exprNodes, Mode mode)
+ public VectorExpression[] getVectorExpressions(List<ExprNodeDesc> exprNodes, VectorExpressionDescriptor.Mode mode)
throws HiveException {
int i = 0;
@@ -461,7 +462,7 @@ public class VectorizationContext {
}
public VectorExpression getVectorExpression(ExprNodeDesc exprDesc) throws HiveException {
- return getVectorExpression(exprDesc, Mode.PROJECTION);
+ return getVectorExpression(exprDesc, VectorExpressionDescriptor.Mode.PROJECTION);
}
/**
@@ -472,7 +473,7 @@ public class VectorizationContext {
* @return {@link VectorExpression}
* @throws HiveException
*/
- public VectorExpression getVectorExpression(ExprNodeDesc exprDesc, Mode mode) throws HiveException {
+ public VectorExpression getVectorExpression(ExprNodeDesc exprDesc, VectorExpressionDescriptor.Mode mode) throws HiveException {
VectorExpression ve = null;
if (exprDesc instanceof ExprNodeColumnDesc) {
ve = getColumnVectorExpression((ExprNodeColumnDesc) exprDesc, mode);
@@ -873,14 +874,14 @@ public class VectorizationContext {
}
private VectorExpression getConstantVectorExpression(Object constantValue, TypeInfo typeInfo,
- Mode mode) throws HiveException {
+ VectorExpressionDescriptor.Mode mode) throws HiveException {
String typeName = typeInfo.getTypeName();
VectorExpressionDescriptor.ArgumentType vectorArgType = VectorExpressionDescriptor.ArgumentType.fromHiveTypeName(typeName);
if (vectorArgType == VectorExpressionDescriptor.ArgumentType.NONE) {
throw new HiveException("No vector argument type for type name " + typeName);
}
int outCol = -1;
- if (mode == Mode.PROJECTION) {
+ if (mode == VectorExpressionDescriptor.Mode.PROJECTION) {
outCol = ocm.allocateOutputColumn(typeName);
}
if (constantValue == null) {
@@ -889,7 +890,7 @@ public class VectorizationContext {
// Boolean is special case.
if (typeName.equalsIgnoreCase("boolean")) {
- if (mode == Mode.FILTER) {
+ if (mode == VectorExpressionDescriptor.Mode.FILTER) {
if (((Boolean) constantValue).booleanValue()) {
return new FilterConstantBooleanVectorExpression(1);
} else {
@@ -961,7 +962,7 @@ public class VectorizationContext {
}
private VectorExpression getVectorExpressionForUdf(GenericUDF genericeUdf,
- Class<?> udfClass, List<ExprNodeDesc> childExpr, Mode mode,
+ Class<?> udfClass, List<ExprNodeDesc> childExpr, VectorExpressionDescriptor.Mode mode,
TypeInfo returnType) throws HiveException {
int numChildren = (childExpr == null) ? 0 : childExpr.size();
@@ -973,13 +974,13 @@ public class VectorizationContext {
Class<?> vclass;
if (genericeUdf instanceof GenericUDFOPOr) {
- if (mode == Mode.PROJECTION) {
+ if (mode == VectorExpressionDescriptor.Mode.PROJECTION) {
vclass = ColOrCol.class;
} else {
vclass = FilterExprOrExpr.class;
}
} else if (genericeUdf instanceof GenericUDFOPAnd) {
- if (mode == Mode.PROJECTION) {
+ if (mode == VectorExpressionDescriptor.Mode.PROJECTION) {
vclass = ColAndCol.class;
} else {
vclass = FilterExprAndExpr.class;
@@ -987,8 +988,8 @@ public class VectorizationContext {
} else {
throw new RuntimeException("Unexpected multi-child UDF");
}
- Mode childrenMode = getChildrenMode(mode, udfClass);
- if (mode == Mode.PROJECTION) {
+ VectorExpressionDescriptor.Mode childrenMode = getChildrenMode(mode, udfClass);
+ if (mode == VectorExpressionDescriptor.Mode.PROJECTION) {
return createVectorMultiAndOrProjectionExpr(vclass, childExpr, childrenMode, returnType);
} else {
return createVectorExpression(vclass, childExpr, childrenMode, returnType);
@@ -1027,12 +1028,12 @@ public class VectorizationContext {
}
return null;
}
- Mode childrenMode = getChildrenMode(mode, udfClass);
+ VectorExpressionDescriptor.Mode childrenMode = getChildrenMode(mode, udfClass);
return createVectorExpression(vclass, childExpr, childrenMode, returnType);
}
private void determineChildrenVectorExprAndArguments(Class<?> vectorClass,
- List<ExprNodeDesc> childExpr, int numChildren, Mode childrenMode,
+ List<ExprNodeDesc> childExpr, int numChildren, VectorExpressionDescriptor.Mode childrenMode,
VectorExpression.Type [] inputTypes, List<VectorExpression> children, Object[] arguments)
throws HiveException {
for (int i = 0; i < numChildren; i++) {
@@ -1048,7 +1049,7 @@ public class VectorizationContext {
arguments[i] = vChild.getOutputColumn();
} else if (child instanceof ExprNodeColumnDesc) {
int colIndex = getInputColumnIndex((ExprNodeColumnDesc) child);
- if (childrenMode == Mode.FILTER) {
+ if (childrenMode == VectorExpressionDescriptor.Mode.FILTER) {
// In filter mode, the column must be a boolean
children.add(new SelectColumnIsTrue(colIndex));
}
@@ -1063,7 +1064,7 @@ public class VectorizationContext {
}
private VectorExpression createVectorExpression(Class<?> vectorClass,
- List<ExprNodeDesc> childExpr, Mode childrenMode, TypeInfo returnType) throws HiveException {
+ List<ExprNodeDesc> childExpr, VectorExpressionDescriptor.Mode childrenMode, TypeInfo returnType) throws HiveException {
int numChildren = childExpr == null ? 0: childExpr.size();
VectorExpression.Type [] inputTypes = new VectorExpression.Type[numChildren];
List<VectorExpression> children = new ArrayList<VectorExpression>();
@@ -1087,7 +1088,7 @@ public class VectorizationContext {
}
private VectorExpression createVectorMultiAndOrProjectionExpr(Class<?> vectorClass,
- List<ExprNodeDesc> childExpr, Mode childrenMode, TypeInfo returnType) throws HiveException {
+ List<ExprNodeDesc> childExpr, VectorExpressionDescriptor.Mode childrenMode, TypeInfo returnType) throws HiveException {
int numChildren = childExpr == null ? 0: childExpr.size();
VectorExpression.Type [] inputTypes = new VectorExpression.Type[numChildren];
List<VectorExpression> children = new ArrayList<VectorExpression>();
@@ -1119,11 +1120,11 @@ public class VectorizationContext {
}
}
- private Mode getChildrenMode(Mode mode, Class<?> udf) {
- if (mode.equals(Mode.FILTER) && (udf.equals(GenericUDFOPAnd.class) || udf.equals(GenericUDFOPOr.class))) {
- return Mode.FILTER;
+ private VectorExpressionDescriptor.Mode getChildrenMode(VectorExpressionDescriptor.Mode mode, Class<?> udf) {
+ if (mode.equals(VectorExpressionDescriptor.Mode.FILTER) && (udf.equals(GenericUDFOPAnd.class) || udf.equals(GenericUDFOPOr.class))) {
+ return VectorExpressionDescriptor.Mode.FILTER;
}
- return Mode.PROJECTION;
+ return VectorExpressionDescriptor.Mode.PROJECTION;
}
private String getNewInstanceArgumentString(Object [] args) {
@@ -1196,7 +1197,7 @@ public class VectorizationContext {
}
private VectorExpression getGenericUdfVectorExpression(GenericUDF udf,
- List<ExprNodeDesc> childExpr, Mode mode, TypeInfo returnType) throws HiveException {
+ List<ExprNodeDesc> childExpr, VectorExpressionDescriptor.Mode mode, TypeInfo returnType) throws HiveException {
List<ExprNodeDesc> castedChildren = evaluateCastOnConstants(childExpr);
childExpr = castedChildren;
@@ -1204,7 +1205,7 @@ public class VectorizationContext {
//First handle special cases. If one of the special case methods cannot handle it,
// it returns null.
VectorExpression ve = null;
- if (udf instanceof GenericUDFBetween && mode == Mode.FILTER) {
+ if (udf instanceof GenericUDFBetween && mode == VectorExpressionDescriptor.Mode.FILTER) {
ve = getBetweenFilterExpression(childExpr, mode, returnType);
} else if (udf instanceof GenericUDFIn) {
ve = getInExpression(childExpr, mode, returnType);
@@ -1249,13 +1250,13 @@ public class VectorizationContext {
}
private VectorExpression getCastToTimestamp(GenericUDFTimestamp udf,
- List<ExprNodeDesc> childExpr, Mode mode, TypeInfo returnType) throws HiveException {
+ List<ExprNodeDesc> childExpr, VectorExpressionDescriptor.Mode mode, TypeInfo returnType) throws HiveException {
VectorExpression ve = getVectorExpressionForUdf(udf, udf.getClass(), childExpr, mode, returnType);
// Replace with the milliseconds conversion
if (!udf.isIntToTimestampInSeconds() && ve instanceof CastLongToTimestamp) {
ve = createVectorExpression(CastMillisecondsLongToTimestamp.class,
- childExpr, Mode.PROJECTION, returnType);
+ childExpr, VectorExpressionDescriptor.Mode.PROJECTION, returnType);
}
return ve;
@@ -1266,7 +1267,7 @@ public class VectorizationContext {
int[] inputColumns = new int[childExpr.size()];
VectorExpression[] vectorChildren = null;
try {
- vectorChildren = getVectorExpressions(childExpr, Mode.PROJECTION);
+ vectorChildren = getVectorExpressions(childExpr, VectorExpressionDescriptor.Mode.PROJECTION);
int i = 0;
for (VectorExpression ve : vectorChildren) {
@@ -1293,7 +1294,7 @@ public class VectorizationContext {
int[] inputColumns = new int[childExpr.size()];
VectorExpression[] vectorChildren = null;
try {
- vectorChildren = getVectorExpressions(childExpr, Mode.PROJECTION);
+ vectorChildren = getVectorExpressions(childExpr, VectorExpressionDescriptor.Mode.PROJECTION);
int i = 0;
for (VectorExpression ve : vectorChildren) {
@@ -1363,7 +1364,7 @@ public class VectorizationContext {
}
private VectorExpression getStructInExpression(List<ExprNodeDesc> childExpr, ExprNodeDesc colExpr,
- TypeInfo colTypeInfo, List<ExprNodeDesc> inChildren, Mode mode, TypeInfo returnType)
+ TypeInfo colTypeInfo, List<ExprNodeDesc> inChildren, VectorExpressionDescriptor.Mode mode, TypeInfo returnType)
throws HiveException {
VectorExpression expr = null;
@@ -1484,9 +1485,9 @@ public class VectorizationContext {
// generate the serialized keys of the batch.
int scratchBytesCol = ocm.allocateOutputColumn("string");
- Class<?> cl = (mode == Mode.FILTER ? FilterStructColumnInList.class : StructColumnInList.class);
+ Class<?> cl = (mode == VectorExpressionDescriptor.Mode.FILTER ? FilterStructColumnInList.class : StructColumnInList.class);
- expr = createVectorExpression(cl, null, Mode.PROJECTION, returnType);
+ expr = createVectorExpression(cl, null, VectorExpressionDescriptor.Mode.PROJECTION, returnType);
((IStringInExpr) expr).setInListValues(serializedInChildren);
@@ -1500,7 +1501,7 @@ public class VectorizationContext {
/**
* Create a filter or boolean-valued expression for column IN ( <list-of-constants> )
*/
- private VectorExpression getInExpression(List<ExprNodeDesc> childExpr, Mode mode, TypeInfo returnType)
+ private VectorExpression getInExpression(List<ExprNodeDesc> childExpr, VectorExpressionDescriptor.Mode mode, TypeInfo returnType)
throws HiveException {
ExprNodeDesc colExpr = childExpr.get(0);
List<ExprNodeDesc> inChildren = childExpr.subList(1, childExpr.size());
@@ -1538,53 +1539,53 @@ public class VectorizationContext {
// determine class
Class<?> cl = null;
if (isIntFamily(colType)) {
- cl = (mode == Mode.FILTER ? FilterLongColumnInList.class : LongColumnInList.class);
+ cl = (mode == VectorExpressionDescriptor.Mode.FILTER ? FilterLongColumnInList.class : LongColumnInList.class);
long[] inVals = new long[childrenForInList.size()];
for (int i = 0; i != inVals.length; i++) {
inVals[i] = getIntFamilyScalarAsLong((ExprNodeConstantDesc) childrenForInList.get(i));
}
- expr = createVectorExpression(cl, childExpr.subList(0, 1), Mode.PROJECTION, returnType);
+ expr = createVectorExpression(cl, childExpr.subList(0, 1), VectorExpressionDescriptor.Mode.PROJECTION, returnType);
((ILongInExpr) expr).setInListValues(inVals);
} else if (isTimestampFamily(colType)) {
- cl = (mode == Mode.FILTER ? FilterTimestampColumnInList.class : TimestampColumnInList.class);
+ cl = (mode == VectorExpressionDescriptor.Mode.FILTER ? FilterTimestampColumnInList.class : TimestampColumnInList.class);
Timestamp[] inVals = new Timestamp[childrenForInList.size()];
for (int i = 0; i != inVals.length; i++) {
inVals[i] = getTimestampScalar(childrenForInList.get(i));
}
- expr = createVectorExpression(cl, childExpr.subList(0, 1), Mode.PROJECTION, returnType);
+ expr = createVectorExpression(cl, childExpr.subList(0, 1), VectorExpressionDescriptor.Mode.PROJECTION, returnType);
((ITimestampInExpr) expr).setInListValues(inVals);
} else if (isStringFamily(colType)) {
- cl = (mode == Mode.FILTER ? FilterStringColumnInList.class : StringColumnInList.class);
+ cl = (mode == VectorExpressionDescriptor.Mode.FILTER ? FilterStringColumnInList.class : StringColumnInList.class);
byte[][] inVals = new byte[childrenForInList.size()][];
for (int i = 0; i != inVals.length; i++) {
inVals[i] = getStringScalarAsByteArray((ExprNodeConstantDesc) childrenForInList.get(i));
}
- expr = createVectorExpression(cl, childExpr.subList(0, 1), Mode.PROJECTION, returnType);
+ expr = createVectorExpression(cl, childExpr.subList(0, 1), VectorExpressionDescriptor.Mode.PROJECTION, returnType);
((IStringInExpr) expr).setInListValues(inVals);
} else if (isFloatFamily(colType)) {
- cl = (mode == Mode.FILTER ? FilterDoubleColumnInList.class : DoubleColumnInList.class);
+ cl = (mode == VectorExpressionDescriptor.Mode.FILTER ? FilterDoubleColumnInList.class : DoubleColumnInList.class);
double[] inValsD = new double[childrenForInList.size()];
for (int i = 0; i != inValsD.length; i++) {
inValsD[i] = getNumericScalarAsDouble(childrenForInList.get(i));
}
- expr = createVectorExpression(cl, childExpr.subList(0, 1), Mode.PROJECTION, returnType);
+ expr = createVectorExpression(cl, childExpr.subList(0, 1), VectorExpressionDescriptor.Mode.PROJECTION, returnType);
((IDoubleInExpr) expr).setInListValues(inValsD);
} else if (isDecimalFamily(colType)) {
- cl = (mode == Mode.FILTER ? FilterDecimalColumnInList.class : DecimalColumnInList.class);
+ cl = (mode == VectorExpressionDescriptor.Mode.FILTER ? FilterDecimalColumnInList.class : DecimalColumnInList.class);
HiveDecimal[] inValsD = new HiveDecimal[childrenForInList.size()];
for (int i = 0; i != inValsD.length; i++) {
inValsD[i] = (HiveDecimal) getVectorTypeScalarValue(
(ExprNodeConstantDesc) childrenForInList.get(i));
}
- expr = createVectorExpression(cl, childExpr.subList(0, 1), Mode.PROJECTION, returnType);
+ expr = createVectorExpression(cl, childExpr.subList(0, 1), VectorExpressionDescriptor.Mode.PROJECTION, returnType);
((IDecimalInExpr) expr).setInListValues(inValsD);
} else if (isDateFamily(colType)) {
- cl = (mode == Mode.FILTER ? FilterLongColumnInList.class : LongColumnInList.class);
+ cl = (mode == VectorExpressionDescriptor.Mode.FILTER ? FilterLongColumnInList.class : LongColumnInList.class);
long[] inVals = new long[childrenForInList.size()];
for (int i = 0; i != inVals.length; i++) {
inVals[i] = (Integer) getVectorTypeScalarValue((ExprNodeConstantDesc) childrenForInList.get(i));
}
- expr = createVectorExpression(cl, childExpr.subList(0, 1), Mode.PROJECTION, returnType);
+ expr = createVectorExpression(cl, childExpr.subList(0, 1), VectorExpressionDescriptor.Mode.PROJECTION, returnType);
((ILongInExpr) expr).setInListValues(inVals);
}
@@ -1607,7 +1608,7 @@ public class VectorizationContext {
* descriptor based lookup.
*/
private VectorExpression getGenericUDFBridgeVectorExpression(GenericUDFBridge udf,
- List<ExprNodeDesc> childExpr, Mode mode, TypeInfo returnType) throws HiveException {
+ List<ExprNodeDesc> childExpr, VectorExpressionDescriptor.Mode mode, TypeInfo returnType) throws HiveException {
Class<? extends UDF> cl = udf.getUdfClass();
VectorExpression ve = null;
if (isCastToIntFamily(cl)) {
@@ -1741,21 +1742,21 @@ public class VectorizationContext {
// Return a constant vector expression
Object constantValue = ((ExprNodeConstantDesc) child).getValue();
HiveDecimal decimalValue = castConstantToDecimal(constantValue, child.getTypeInfo());
- return getConstantVectorExpression(decimalValue, returnType, Mode.PROJECTION);
+ return getConstantVectorExpression(decimalValue, returnType, VectorExpressionDescriptor.Mode.PROJECTION);
}
if (isIntFamily(inputType)) {
- return createVectorExpression(CastLongToDecimal.class, childExpr, Mode.PROJECTION, returnType);
+ return createVectorExpression(CastLongToDecimal.class, childExpr, VectorExpressionDescriptor.Mode.PROJECTION, returnType);
} else if (isFloatFamily(inputType)) {
- return createVectorExpression(CastDoubleToDecimal.class, childExpr, Mode.PROJECTION, returnType);
+ return createVectorExpression(CastDoubleToDecimal.class, childExpr, VectorExpressionDescriptor.Mode.PROJECTION, returnType);
} else if (decimalTypePattern.matcher(inputType).matches()) {
- return createVectorExpression(CastDecimalToDecimal.class, childExpr, Mode.PROJECTION,
+ return createVectorExpression(CastDecimalToDecimal.class, childExpr, VectorExpressionDescriptor.Mode.PROJECTION,
returnType);
} else if (isStringFamily(inputType)) {
- return createVectorExpression(CastStringToDecimal.class, childExpr, Mode.PROJECTION, returnType);
+ return createVectorExpression(CastStringToDecimal.class, childExpr, VectorExpressionDescriptor.Mode.PROJECTION, returnType);
} else if (inputType.equals("timestamp")) {
- return createVectorExpression(CastTimestampToDecimal.class, childExpr, Mode.PROJECTION, returnType);
+ return createVectorExpression(CastTimestampToDecimal.class, childExpr, VectorExpressionDescriptor.Mode.PROJECTION, returnType);
}
- throw null;
+ return null;
}
private VectorExpression getCastToString(List<ExprNodeDesc> childExpr, TypeInfo returnType)
@@ -1766,19 +1767,19 @@ public class VectorizationContext {
// Return a constant vector expression
Object constantValue = ((ExprNodeConstantDesc) child).getValue();
String strValue = castConstantToString(constantValue, child.getTypeInfo());
- return getConstantVectorExpression(strValue, returnType, Mode.PROJECTION);
+ return getConstantVectorExpression(strValue, returnType, VectorExpressionDescriptor.Mode.PROJECTION);
}
if (inputType.equals("boolean")) {
// Boolean must come before the integer family. It's a special case.
- return createVectorExpression(CastBooleanToStringViaLongToString.class, childExpr, Mode.PROJECTION, null);
+ return createVectorExpression(CastBooleanToStringViaLongToString.class, childExpr, VectorExpressionDescriptor.Mode.PROJECTION, null);
} else if (isIntFamily(inputType)) {
- return createVectorExpression(CastLongToString.class, childExpr, Mode.PROJECTION, returnType);
+ return createVectorExpression(CastLongToString.class, childExpr, VectorExpressionDescriptor.Mode.PROJECTION, returnType);
} else if (isDecimalFamily(inputType)) {
- return createVectorExpression(CastDecimalToString.class, childExpr, Mode.PROJECTION, returnType);
+ return createVectorExpression(CastDecimalToString.class, childExpr, VectorExpressionDescriptor.Mode.PROJECTION, returnType);
} else if (isDateFamily(inputType)) {
- return createVectorExpression(CastDateToString.class, childExpr, Mode.PROJECTION, returnType);
+ return createVectorExpression(CastDateToString.class, childExpr, VectorExpressionDescriptor.Mode.PROJECTION, returnType);
} else if (isStringFamily(inputType)) {
- return createVectorExpression(CastStringGroupToString.class, childExpr, Mode.PROJECTION, returnType);
+ return createVectorExpression(CastStringGroupToString.class, childExpr, VectorExpressionDescriptor.Mode.PROJECTION, returnType);
}
return null;
}
@@ -1794,15 +1795,15 @@ public class VectorizationContext {
}
if (inputType.equals("boolean")) {
// Boolean must come before the integer family. It's a special case.
- return createVectorExpression(CastBooleanToCharViaLongToChar.class, childExpr, Mode.PROJECTION, returnType);
+ return createVectorExpression(CastBooleanToCharViaLongToChar.class, childExpr, VectorExpressionDescriptor.Mode.PROJECTION, returnType);
} else if (isIntFamily(inputType)) {
- return createVectorExpression(CastLongToChar.class, childExpr, Mode.PROJECTION, returnType);
+ return createVectorExpression(CastLongToChar.class, childExpr, VectorExpressionDescriptor.Mode.PROJECTION, returnType);
} else if (isDecimalFamily(inputType)) {
- return createVectorExpression(CastDecimalToChar.class, childExpr, Mode.PROJECTION, returnType);
+ return createVectorExpression(CastDecimalToChar.class, childExpr, VectorExpressionDescriptor.Mode.PROJECTION, returnType);
} else if (isDateFamily(inputType)) {
- return createVectorExpression(CastDateToChar.class, childExpr, Mode.PROJECTION, returnType);
+ return createVectorExpression(CastDateToChar.class, childExpr, VectorExpressionDescriptor.Mode.PROJECTION, returnType);
} else if (isStringFamily(inputType)) {
- return createVectorExpression(CastStringGroupToChar.class, childExpr, Mode.PROJECTION, returnType);
+ return createVectorExpression(CastStringGroupToChar.class, childExpr, VectorExpressionDescriptor.Mode.PROJECTION, returnType);
}
return null;
}
@@ -1818,15 +1819,15 @@ public class VectorizationContext {
}
if (inputType.equals("boolean")) {
// Boolean must come before the integer family. It's a special case.
- return createVectorExpression(CastBooleanToVarCharViaLongToVarChar.class, childExpr, Mode.PROJECTION, returnType);
+ return createVectorExpression(CastBooleanToVarCharViaLongToVarChar.class, childExpr, VectorExpressionDescriptor.Mode.PROJECTION, returnType);
} else if (isIntFamily(inputType)) {
- return createVectorExpression(CastLongToVarChar.class, childExpr, Mode.PROJECTION, returnType);
+ return createVectorExpression(CastLongToVarChar.class, childExpr, VectorExpressionDescriptor.Mode.PROJECTION, returnType);
} else if (isDecimalFamily(inputType)) {
- return createVectorExpression(CastDecimalToVarChar.class, childExpr, Mode.PROJECTION, returnType);
+ return createVectorExpression(CastDecimalToVarChar.class, childExpr, VectorExpressionDescriptor.Mode.PROJECTION, returnType);
} else if (isDateFamily(inputType)) {
- return createVectorExpression(CastDateToVarChar.class, childExpr, Mode.PROJECTION, returnType);
+ return createVectorExpression(CastDateToVarChar.class, childExpr, VectorExpressionDescriptor.Mode.PROJECTION, returnType);
} else if (isStringFamily(inputType)) {
- return createVectorExpression(CastStringGroupToVarChar.class, childExpr, Mode.PROJECTION, returnType);
+ return createVectorExpression(CastStringGroupToVarChar.class, childExpr, VectorExpressionDescriptor.Mode.PROJECTION, returnType);
}
return null;
}
@@ -1839,17 +1840,17 @@ public class VectorizationContext {
// Return a constant vector expression
Object constantValue = ((ExprNodeConstantDesc) child).getValue();
Double doubleValue = castConstantToDouble(constantValue, child.getTypeInfo());
- return getConstantVectorExpression(doubleValue, returnType, Mode.PROJECTION);
+ return getConstantVectorExpression(doubleValue, returnType, VectorExpressionDescriptor.Mode.PROJECTION);
}
if (isIntFamily(inputType)) {
if (udf.equals(UDFToFloat.class)) {
// In order to convert from integer to float correctly, we need to apply the float cast not the double cast (HIVE-13338).
- return createVectorExpression(CastLongToFloatViaLongToDouble.class, childExpr, Mode.PROJECTION, returnType);
+ return createVectorExpression(CastLongToFloatViaLongToDouble.class, childExpr, VectorExpressionDescriptor.Mode.PROJECTION, returnType);
} else {
- return createVectorExpression(CastLongToDouble.class, childExpr, Mode.PROJECTION, returnType);
+ return createVectorExpression(CastLongToDouble.class, childExpr, VectorExpressionDescriptor.Mode.PROJECTION, returnType);
}
} else if (inputType.equals("timestamp")) {
- return createVectorExpression(CastTimestampToDouble.class, childExpr, Mode.PROJECTION,
+ return createVectorExpression(CastTimestampToDouble.class, childExpr, VectorExpressionDescriptor.Mode.PROJECTION,
returnType);
} else if (isFloatFamily(inputType)) {
@@ -1865,7 +1866,7 @@ public class VectorizationContext {
String inputType = childExpr.get(0).getTypeString();
if (child instanceof ExprNodeConstantDesc) {
if (null == ((ExprNodeConstantDesc)child).getValue()) {
- return getConstantVectorExpression(null, TypeInfoFactory.booleanTypeInfo, Mode.PROJECTION);
+ return getConstantVectorExpression(null, TypeInfoFactory.booleanTypeInfo, VectorExpressionDescriptor.Mode.PROJECTION);
}
// Don't do constant folding here. Wait until the optimizer is changed to do it.
// Family of related JIRAs: HIVE-7421, HIVE-7422, and HIVE-7424.
@@ -1875,7 +1876,7 @@ public class VectorizationContext {
if (isStringFamily(inputType)) {
// string casts to false if it is 0 characters long, otherwise true
VectorExpression lenExpr = createVectorExpression(StringLength.class, childExpr,
- Mode.PROJECTION, null);
+ VectorExpressionDescriptor.Mode.PROJECTION, null);
int outputCol = ocm.allocateOutputColumn("Long");
VectorExpression lenToBoolExpr =
@@ -1895,7 +1896,7 @@ public class VectorizationContext {
// Return a constant vector expression
Object constantValue = ((ExprNodeConstantDesc) child).getValue();
Long longValue = castConstantToLong(constantValue, child.getTypeInfo());
- return getConstantVectorExpression(longValue, TypeInfoFactory.longTypeInfo, Mode.PROJECTION);
+ return getConstantVectorExpression(longValue, TypeInfoFactory.longTypeInfo, VectorExpressionDescriptor.Mode.PROJECTION);
}
// Float family, timestamp are handled via descriptor based lookup, int family needs
// special handling.
@@ -1912,10 +1913,10 @@ public class VectorizationContext {
* needs to be done differently than the standard way where all arguments are
* passed to the VectorExpression constructor.
*/
- private VectorExpression getBetweenFilterExpression(List<ExprNodeDesc> childExpr, Mode mode, TypeInfo returnType)
+ private VectorExpression getBetweenFilterExpression(List<ExprNodeDesc> childExpr, VectorExpressionDescriptor.Mode mode, TypeInfo returnType)
throws HiveException {
- if (mode == Mode.PROJECTION) {
+ if (mode == VectorExpressionDescriptor.Mode.PROJECTION) {
// Projection mode is not yet supported for [NOT] BETWEEN. Return null so Vectorizer
// knows to revert to row-at-a-time execution.
@@ -2000,17 +2001,17 @@ public class VectorizationContext {
} else if (isDateFamily(colType) && notKeywordPresent) {
cl = FilterLongColumnNotBetween.class;
}
- return createVectorExpression(cl, childrenAfterNot, Mode.PROJECTION, returnType);
+ return createVectorExpression(cl, childrenAfterNot, VectorExpressionDescriptor.Mode.PROJECTION, returnType);
}
/*
* Return vector expression for a custom (i.e. not built-in) UDF.
*/
- private VectorExpression getCustomUDFExpression(ExprNodeGenericFuncDesc expr, Mode mode)
+ private VectorExpression getCustomUDFExpression(ExprNodeGenericFuncDesc expr, VectorExpressionDescriptor.Mode mode)
throws HiveException {
boolean isFilter = false; // Assume.
- if (mode == Mode.FILTER) {
+ if (mode == VectorExpressionDescriptor.Mode.FILTER) {
// Is output type a BOOLEAN?
TypeInfo resultTypeInfo = expr.getTypeInfo();
@@ -2043,7 +2044,7 @@ public class VectorizationContext {
for (int i = 0; i < childExprList.size(); i++) {
ExprNodeDesc child = childExprList.get(i);
if (child instanceof ExprNodeGenericFuncDesc) {
- VectorExpression e = getVectorExpression(child, Mode.PROJECTION);
+ VectorExpression e = getVectorExpression(child, VectorExpressionDescriptor.Mode.PROJECTION);
vectorExprs.add(e);
variableArgPositions.add(i);
exprResultColumnNums.add(e.getOutputColumn());
@@ -2384,67 +2385,125 @@ public class VectorizationContext {
}
}
- // TODO: When we support vectorized STRUCTs and can handle more in the reduce-side (MERGEPARTIAL):
- // TODO: Write reduce-side versions of AVG. Currently, only map-side (HASH) versions are in table.
- // TODO: And, investigate if different reduce-side versions are needed for var* and std*, or if map-side aggregate can be used.. Right now they are conservatively
- // marked map-side (HASH).
+
+ /*
+ * In the aggregatesDefinition table, Mode is GenericUDAFEvaluator.Mode.
+ *
+ * It is the different modes for an aggregate UDAF (User Defined Aggregation Function).
+ *
+ * (Notice that these names are a subset of GroupByDesc.Mode...)
+ *
+ * PARTIAL1 Original data --> Partial aggregation data
+ *
+ * PARTIAL2 Partial aggregation data --> Partial aggregation data
+ *
+ * FINAL Partial aggregation data --> Full aggregation data
+ *
+ * COMPLETE Original data --> Full aggregation data
+ *
+ *
+ * SIMPLEST CASE --> The data type/semantics of original data, partial aggregation
+ * data, and full aggregation data ARE THE SAME. E.g. MIN, MAX, SUM. The different
+ * modes can be handled by one aggregation class.
+ *
+ * This case has a null for the Mode.
+ *
+ * FOR OTHERS --> The data type/semantics of partial aggregation data and full aggregation data
+ * ARE THE SAME but different than original data. This results in 2 aggregation classes:
+ *
+ * 1) A class that takes original rows and outputs partial/full aggregation
+ * (PARTIAL1/COMPLETE)
+ *
+ * and
+ *
+ * 2) A class that takes partial aggregation and produces full aggregation
+ * (PARTIAL2/FINAL).
+ *
+ * E.g. COUNT(*) and COUNT(column)
+ *
+ * OTHERWISE FULL --> The data type/semantics of partial aggregation data is different than
+ * original data and full aggregation data.
+ *
+ * E.g. AVG uses a STRUCT with count and sum for partial aggregation data. It divides
+ * sum by count to produce the average for final aggregation.
+ *
+ */
static ArrayList<AggregateDefinition> aggregatesDefinition = new ArrayList<AggregateDefinition>() {{
- add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.INT_DATE_INTERVAL_YEAR_MONTH, null, VectorUDAFMinLong.class));
- add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFMinDouble.class));
- add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, null, VectorUDAFMinString.class));
- add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFMinDecimal.class));
- add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.TIMESTAMP, null, VectorUDAFMinTimestamp.class));
- add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.INT_DATE_INTERVAL_YEAR_MONTH, null, VectorUDAFMaxLong.class));
- add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFMaxDouble.class));
- add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, null, VectorUDAFMaxString.class));
- add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFMaxDecimal.class));
- add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.TIMESTAMP, null, VectorUDAFMaxTimestamp.class));
- add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.NONE, GroupByDesc.Mode.HASH, VectorUDAFCountStar.class));
- add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.INT_DATE_INTERVAL_YEAR_MONTH, GroupByDesc.Mode.HASH, VectorUDAFCount.class));
- add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.MERGEPARTIAL, VectorUDAFCountMerge.class));
- add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class));
- add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class));
- add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFCount.class));
- add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.TIMESTAMP, GroupByDesc.Mode.HASH, VectorUDAFCount.class));
- add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.INTERVAL_DAY_TIME, GroupByDesc.Mode.HASH, VectorUDAFCount.class));
- add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, null, VectorUDAFSumLong.class));
- add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFSumDouble.class));
- add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFSumDecimal.class));
- add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFAvgLong.class));
- add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFAvgDouble.class));
- add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFAvgDecimal.class));
- add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.TIMESTAMP, GroupByDesc.Mode.HASH, VectorUDAFAvgTimestamp.class));
- add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopLong.class));
- add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopLong.class));
- add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopDouble.class));
- add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopDouble.class));
- add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFVarPopDecimal.class));
- add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFVarPopDecimal.class));
- add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.TIMESTAMP, GroupByDesc.Mode.HASH, VectorUDAFVarPopTimestamp.class));
- add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.TIMESTAMP, GroupByDesc.Mode.HASH, VectorUDAFVarPopTimestamp.class));
- add(new AggregateDefinition("var_samp", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarSampLong.class));
- add(new AggregateDefinition("var_samp" , VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarSampDouble.class));
- add(new AggregateDefinition("var_samp" , VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFVarSampDecimal.class));
- add(new AggregateDefinition("var_samp" , VectorExpressionDescriptor.ArgumentType.TIMESTAMP, GroupByDesc.Mode.HASH, VectorUDAFVarSampTimestamp.class));
- add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class));
- add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class));
- add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class));
- add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopDouble.class));
- add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopDouble.class));
- add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopDouble.class));
- add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdPopDecimal.class));
- add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdPopDecimal.class));
- add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdPopDecimal.class));
- add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.TIMESTAMP, GroupByDesc.Mode.HASH, VectorUDAFStdPopTimestamp.class));
- add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.TIMESTAMP, GroupByDesc.Mode.HASH, VectorUDAFStdPopTimestamp.class));
- add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.TIMESTAMP, GroupByDesc.Mode.HASH, VectorUDAFStdPopTimestamp.class));
- add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdSampLong.class));
- add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdSampDouble.class));
- add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdSampDecimal.class));
- add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.TIMESTAMP, GroupByDesc.Mode.HASH, VectorUDAFStdSampTimestamp.class));
+
+ // MIN, MAX, and SUM have the same representation for partial and full aggregation, so the
+ // same class can be used for all modes (PARTIAL1, PARTIAL2, FINAL, and COMPLETE).
+ add(new AggregateDefinition("min", ArgumentType.INT_DATE_INTERVAL_YEAR_MONTH, null, VectorUDAFMinLong.class));
+ add(new AggregateDefinition("min", ArgumentType.FLOAT_FAMILY, null, VectorUDAFMinDouble.class));
+ add(new AggregateDefinition("min", ArgumentType.STRING_FAMILY, null, VectorUDAFMinString.class));
+ add(new AggregateDefinition("min", ArgumentType.DECIMAL, null, VectorUDAFMinDecimal.class));
+ add(new AggregateDefinition("min", ArgumentType.TIMESTAMP, null, VectorUDAFMinTimestamp.class));
+ add(new AggregateDefinition("max", ArgumentType.INT_DATE_INTERVAL_YEAR_MONTH, null, VectorUDAFMaxLong.class));
+ add(new AggregateDefinition("max", ArgumentType.FLOAT_FAMILY, null, VectorUDAFMaxDouble.class));
+ add(new AggregateDefinition("max", ArgumentType.STRING_FAMILY, null, VectorUDAFMaxString.class));
+ add(new AggregateDefinition("max", ArgumentType.DECIMAL, null, VectorUDAFMaxDecimal.class));
+ add(new AggregateDefinition("max", ArgumentType.TIMESTAMP, null, VectorUDAFMaxTimestamp.class));
+ add(new AggregateDefinition("sum", ArgumentType.INT_FAMILY, null, VectorUDAFSumLong.class));
+ add(new AggregateDefinition("sum", ArgumentType.FLOAT_FAMILY, null, VectorUDAFSumDouble.class));
+ add(new AggregateDefinition("sum", ArgumentType.DECIMAL, null, VectorUDAFSumDecimal.class));
+
+ // COUNT(column) doesn't count rows whose column value is NULL.
+ add(new AggregateDefinition("count", ArgumentType.ALL_FAMILY, Mode.PARTIAL1, VectorUDAFCount.class));
+ add(new AggregateDefinition("count", ArgumentType.ALL_FAMILY, Mode.COMPLETE, VectorUDAFCount.class));
+
+ // COUNT(*) counts all rows regardless of whether the column value(s) are NULL.
+ add(new AggregateDefinition("count", ArgumentType.NONE, Mode.PARTIAL1, VectorUDAFCountStar.class));
+ add(new AggregateDefinition("count", ArgumentType.NONE, Mode.COMPLETE, VectorUDAFCountStar.class));
+
+ // Merge the counts produced by either COUNT(column) or COUNT(*) modes PARTIAL1 or PARTIAL2.
+ add(new AggregateDefinition("count", ArgumentType.INT_FAMILY, Mode.PARTIAL2, VectorUDAFCountMerge.class));
+ add(new AggregateDefinition("count", ArgumentType.INT_FAMILY, Mode.FINAL, VectorUDAFCountMerge.class));
+
+ // Since the partial aggregation produced by AVG is a STRUCT with count and sum and the
+ // STRUCT data type isn't vectorized yet, we currently only support PARTIAL1. When we do
+ // support STRUCTs for average partial aggregation, we'll need 4 variations:
+ //
+ // PARTIAL1 Original data --> STRUCT Average Partial Aggregation
+ // PARTIAL2 STRUCT Average Partial Aggregation --> STRUCT Average Partial Aggregation
+ // FINAL STRUCT Average Partial Aggregation --> Full Aggregation
+ // COMPLETE Original data --> Full Aggregation
+ //
+ add(new AggregateDefinition("avg", ArgumentType.INT_FAMILY, Mode.PARTIAL1, VectorUDAFAvgLong.class));
+ add(new AggregateDefinition("avg", ArgumentType.FLOAT_FAMILY, Mode.PARTIAL1, VectorUDAFAvgDouble.class));
+ add(new AggregateDefinition("avg", ArgumentType.DECIMAL, Mode.PARTIAL1, VectorUDAFAvgDecimal.class));
+ add(new AggregateDefinition("avg", ArgumentType.TIMESTAMP, Mode.PARTIAL1, VectorUDAFAvgTimestamp.class));
+
+ // We haven't had a chance to examine the VAR* and STD* area and expand it beyond PARTIAL1.
+ add(new AggregateDefinition("variance", ArgumentType.INT_FAMILY, Mode.PARTIAL1, VectorUDAFVarPopLong.class));
+ add(new AggregateDefinition("var_pop", ArgumentType.INT_FAMILY, Mode.PARTIAL1, VectorUDAFVarPopLong.class));
+ add(new AggregateDefinition("variance", ArgumentType.FLOAT_FAMILY, Mode.PARTIAL1, VectorUDAFVarPopDouble.class));
+ add(new AggregateDefinition("var_pop", ArgumentType.FLOAT_FAMILY, Mode.PARTIAL1, VectorUDAFVarPopDouble.class));
+ add(new AggregateDefinition("variance", ArgumentType.DECIMAL, Mode.PARTIAL1, VectorUDAFVarPopDecimal.class));
+ add(new AggregateDefinition("var_pop", ArgumentType.DECIMAL, Mode.PARTIAL1, VectorUDAFVarPopDecimal.class));
+ add(new AggregateDefinition("variance", ArgumentType.TIMESTAMP, Mode.PARTIAL1, VectorUDAFVarPopTimestamp.class));
+ add(new AggregateDefinition("var_pop", ArgumentType.TIMESTAMP, Mode.PARTIAL1, VectorUDAFVarPopTimestamp.class));
+ add(new AggregateDefinition("var_samp", ArgumentType.INT_FAMILY, Mode.PARTIAL1, VectorUDAFVarSampLong.class));
+ add(new AggregateDefinition("var_samp" , ArgumentType.FLOAT_FAMILY, Mode.PARTIAL1, VectorUDAFVarSampDouble.class));
+ add(new AggregateDefinition("var_samp" , ArgumentType.DECIMAL, Mode.PARTIAL1, VectorUDAFVarSampDecimal.class));
+ add(new AggregateDefinition("var_samp" , ArgumentType.TIMESTAMP, Mode.PARTIAL1, VectorUDAFVarSampTimestamp.class));
+ add(new AggregateDefinition("std", ArgumentType.INT_FAMILY, Mode.PARTIAL1, VectorUDAFStdPopLong.class));
+ add(new AggregateDefinition("stddev", ArgumentType.INT_FAMILY, Mode.PARTIAL1, VectorUDAFStdPopLong.class));
+ add(new AggregateDefinition("stddev_pop", ArgumentType.INT_FAMILY, Mode.PARTIAL1, VectorUDAFStdPopLong.class));
+ add(new AggregateDefinition("std", ArgumentType.FLOAT_FAMILY, Mode.PARTIAL1, VectorUDAFStdPopDouble.class));
+ add(new AggregateDefinition("stddev", ArgumentType.FLOAT_FAMILY, Mode.PARTIAL1, VectorUDAFStdPopDouble.class));
+ add(new AggregateDefinition("stddev_pop", ArgumentType.FLOAT_FAMILY, Mode.PARTIAL1, VectorUDAFStdPopDouble.class));
+ add(new AggregateDefinition("std", ArgumentType.DECIMAL, Mode.PARTIAL1, VectorUDAFStdPopDecimal.class));
+ add(new AggregateDefinition("stddev", ArgumentType.DECIMAL, Mode.PARTIAL1, VectorUDAFStdPopDecimal.class));
+ add(new AggregateDefinition("stddev_pop", ArgumentType.DECIMAL, Mode.PARTIAL1, VectorUDAFStdPopDecimal.class));
+ add(new AggregateDefinition("std", ArgumentType.TIMESTAMP, Mode.PARTIAL1, VectorUDAFStdPopTimestamp.class));
+ add(new AggregateDefinition("stddev", ArgumentType.TIMESTAMP, Mode.PARTIAL1, VectorUDAFStdPopTimestamp.class));
+ add(new AggregateDefinition("stddev_pop", ArgumentType.TIMESTAMP, Mode.PARTIAL1, VectorUDAFStdPopTimestamp.class));
+ add(new AggregateDefinition("stddev_samp", ArgumentType.INT_FAMILY, Mode.PARTIAL1, VectorUDAFStdSampLong.class));
+ add(new AggregateDefinition("stddev_samp", ArgumentType.FLOAT_FAMILY, Mode.PARTIAL1, VectorUDAFStdSampDouble.class));
+ add(new AggregateDefinition("stddev_samp", ArgumentType.DECIMAL, Mode.PARTIAL1, VectorUDAFStdSampDecimal.class));
+ add(new AggregateDefinition("stddev_samp", ArgumentType.TIMESTAMP, Mode.PARTIAL1, VectorUDAFStdSampTimestamp.class));
}};
- public VectorAggregateExpression getAggregatorExpression(AggregationDesc desc, boolean isReduceMergePartial)
+ public VectorAggregateExpression getAggregatorExpression(AggregationDesc desc)
throws HiveException {
ArrayList<ExprNodeDesc> paramDescList = desc.getParameters();
@@ -2452,7 +2511,7 @@ public class VectorizationContext {
for (int i = 0; i< paramDescList.size(); ++i) {
ExprNodeDesc exprDesc = paramDescList.get(i);
- vectorParams[i] = this.getVectorExpression(exprDesc, Mode.PROJECTION);
+ vectorParams[i] = this.getVectorExpression(exprDesc, VectorExpressionDescriptor.Mode.PROJECTION);
}
String aggregateName = desc.getGenericUDAFName();
@@ -2466,15 +2525,16 @@ public class VectorizationContext {
}
}
+ GenericUDAFEvaluator.Mode udafEvaluatorMode = desc.getMode();
for (AggregateDefinition aggDef : aggregatesDefinition) {
if (aggregateName.equalsIgnoreCase(aggDef.getName()) &&
((aggDef.getType() == VectorExpressionDescriptor.ArgumentType.NONE &&
inputType == VectorExpressionDescriptor.ArgumentType.NONE) ||
(aggDef.getType().isSameTypeOrFamily(inputType)))) {
- if (aggDef.getMode() == GroupByDesc.Mode.HASH && isReduceMergePartial) {
- continue;
- } else if (aggDef.getMode() == GroupByDesc.Mode.MERGEPARTIAL && !isReduceMergePartial) {
+ // A null means all modes are ok.
+ GenericUDAFEvaluator.Mode aggDefUdafEvaluatorMode = aggDef.getUdafEvaluatorMode();
+ if (aggDefUdafEvaluatorMode != null && aggDefUdafEvaluatorMode != udafEvaluatorMode) {
continue;
}
@@ -2495,7 +2555,9 @@ public class VectorizationContext {
}
throw new HiveException("Vector aggregate not implemented: \"" + aggregateName +
- "\" for type: \"" + inputType.name() + " (reduce-merge-partial = " + isReduceMergePartial + ")");
+ "\" for type: \"" + inputType.name() +
+ " (UDAF evaluator mode = " +
+ (udafEvaluatorMode == null ? "NULL" : udafEvaluatorMode.name()) + ")");
}
public int firstOutputColumnIndex() {
http://git-wip-us.apache.org/repos/asf/hive/blob/8136a10c/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index c1d6582..00203ae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -33,6 +33,7 @@ import java.util.Set;
import java.util.Stack;
import java.util.regex.Pattern;
+import org.apache.calcite.util.Pair;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -96,6 +97,8 @@ import org.apache.hadoop.hive.ql.plan.JoinDesc;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.VectorGroupByDesc.ProcessingMode;
+import org.apache.hadoop.hive.ql.plan.VectorPartitionConversion;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
@@ -1540,76 +1543,131 @@ public class Vectorizer implements PhysicalPlanResolver {
LOG.info("Pruning grouping set id not supported in vector mode");
return false;
}
+ if (desc.getMode() != GroupByDesc.Mode.HASH && desc.isDistinct()) {
+ LOG.info("DISTINCT not supported in vector mode");
+ return false;
+ }
boolean ret = validateExprNodeDesc(desc.getKeys());
if (!ret) {
- LOG.info("Cannot vectorize groupby key expression");
+ LOG.info("Cannot vectorize groupby key expression " + desc.getKeys().toString());
return false;
}
- if (!isReduce) {
-
- // MapWork
-
- ret = validateHashAggregationDesc(desc.getAggregators());
- if (!ret) {
- return false;
- }
- } else {
-
- // ReduceWork
-
- boolean isComplete = desc.getMode() == GroupByDesc.Mode.COMPLETE;
- if (desc.getMode() != GroupByDesc.Mode.HASH) {
-
- // Reduce Merge-Partial GROUP BY.
-
- // A merge-partial GROUP BY is fed by grouping by keys from reduce-shuffle. It is the
- // first (or root) operator for its reduce task.
- // TODO: Technically, we should also handle FINAL, PARTIAL1, PARTIAL2 and PARTIALS
- // that are not hash or complete, but aren't merge-partial, somehow.
-
- if (desc.isDistinct()) {
- LOG.info("Vectorized Reduce MergePartial GROUP BY does not support DISTINCT");
- return false;
- }
-
- boolean hasKeys = (desc.getKeys().size() > 0);
-
- // Do we support merge-partial aggregation AND the output is primitive?
- ret = validateReduceMergePartialAggregationDesc(desc.getAggregators(), hasKeys);
- if (!ret) {
- return false;
- }
+ /**
+ *
+ * GROUP BY DEFINITIONS:
+ *
+ * GroupByDesc.Mode enumeration:
+ *
+ * The different modes of a GROUP BY operator.
+ *
+ * These descriptions are hopefully less cryptic than the comments for GroupByDesc.Mode.
+ *
+ * COMPLETE Aggregates original rows into full aggregation row(s).
+ *
+ * If the key length is 0, this is also called Global aggregation and
+ * 1 output row is produced.
+ *
+ * When the key length is > 0, the original rows come in ALREADY GROUPED.
+ *
+ * An example for key length > 0 is a GROUP BY being applied to the
+ * ALREADY GROUPED rows coming from an upstream JOIN operator. Or,
+ * ALREADY GROUPED rows coming from upstream MERGEPARTIAL GROUP BY
+ * operator.
+ *
+ * PARTIAL1 The first of 2 (or more) phases that aggregates ALREADY GROUPED
+ * original rows into partial aggregations.
+ *
+ * Subsequent phases PARTIAL2 (optional) and MERGEPARTIAL will merge
+ * the partial aggregations and output full aggregations.
+ *
+ * PARTIAL2 Accept ALREADY GROUPED partial aggregations and merge them into another
+ * partial aggregation. Output the merged partial aggregations.
+ *
+ * (Haven't seen this one used)
+ *
+ * PARTIALS (Behaves for non-distinct the same as PARTIAL2; and behaves for
+ * distinct the same as PARTIAL1.)
+ *
+ * FINAL Accept ALREADY GROUPED original rows and aggregate them into
+ * full aggregations.
+ *
+ * Example is a GROUP BY being applied to rows from a sorted table, where
+ * the group key is the table sort key (or a prefix).
+ *
+ * HASH Accept UNORDERED original rows and aggregate them into a memory table.
+ * Output the partial aggregations on closeOp (or low memory).
+ *
+ * Similar to PARTIAL1 except original rows are UNORDERED.
+ *
+ * Commonly used in both Mapper and Reducer nodes. Always followed by
+ * a Reducer with MERGEPARTIAL GROUP BY.
+ *
+ * MERGEPARTIAL Always first operator of a Reducer. Data is grouped by reduce-shuffle.
+ *
+ * (Behaves for non-distinct aggregations the same as FINAL; and behaves
+ * for distinct aggregations the same as COMPLETE.)
+ *
+ * The output is full aggregation(s).
+ *
+ * Used in Reducers after a stage with a HASH GROUP BY operator.
+ *
+ *
+ * VectorGroupByDesc.ProcessingMode for VectorGroupByOperator:
+ *
+ * GLOBAL No key. All rows --> 1 full aggregation on end of input
+ *
+ * HASH Rows aggregated into hash table on group key -->
+ * 1 partial aggregation per key (normally, unless there is spilling)
+ *
+ * MERGE_PARTIAL As first operator in a REDUCER, partial aggregations come grouped from
+ * reduce-shuffle -->
+ * aggregate the partial aggregations and emit full aggregation on
+ * endGroup / closeOp
+ *
+ * STREAMING Rows come from PARENT operator ALREADY GROUPED -->
+ * aggregate the rows and emit full aggregation on key change / closeOp
+ *
+ * NOTE: Hash can spill partial result rows prematurely if it runs low on memory.
+ * NOTE: Streaming has to compare keys where MergePartial gets an endGroup call.
+ *
+ *
+ * DECIDER: Which VectorGroupByDesc.ProcessingMode for VectorGroupByOperator?
+ *
+ * Decides using GroupByDesc.Mode and whether there are keys with the
+ * VectorGroupByDesc.groupByDescModeToVectorProcessingMode method.
+ *
+ * Mode.COMPLETE --> (numKeys == 0 ? ProcessingMode.GLOBAL : ProcessingMode.STREAMING)
+ *
+ * Mode.HASH --> ProcessingMode.HASH
+ *
+ * Mode.MERGEPARTIAL --> (numKeys == 0 ? ProcessingMode.GLOBAL : ProcessingMode.MERGE_PARTIAL)
+ *
+ * Mode.PARTIAL1,
+ * Mode.PARTIAL2,
+ * Mode.PARTIALS,
+ * Mode.FINAL --> ProcessingMode.STREAMING
+ *
+ */
+ boolean hasKeys = (desc.getKeys().size() > 0);
- if (hasKeys) {
- if (op.getParentOperators().size() > 0 && !isComplete) {
- LOG.info("Vectorized Reduce MergePartial GROUP BY keys can only handle a key group when it is fed by reduce-shuffle");
- return false;
- }
+ ProcessingMode processingMode =
+ VectorGroupByDesc.groupByDescModeToVectorProcessingMode(desc.getMode(), hasKeys);
- LOG.info("Vectorized Reduce MergePartial GROUP BY will process key groups");
+ Pair<Boolean,Boolean> retPair =
+ validateAggregationDescs(desc.getAggregators(), processingMode, hasKeys);
+ if (!retPair.left) {
+ return false;
+ }
- // Primitive output validation above means we can output VectorizedRowBatch to the
- // children operators.
- vectorDesc.setVectorOutput(true);
- } else {
- LOG.info("Vectorized Reduce MergePartial GROUP BY will do global aggregation");
- }
- if (!isComplete) {
- vectorDesc.setIsReduceMergePartial(true);
- } else {
- vectorDesc.setIsReduceStreaming(true);
- }
- } else {
+ // If all the aggregation outputs are primitive, we can output VectorizedRowBatch.
+ // Otherwise, the rest of the operator tree will be row mode.
+ vectorDesc.setVectorOutput(retPair.right);
- // Reduce Hash GROUP BY or global aggregation.
+ vectorDesc.setProcessingMode(processingMode);
- ret = validateHashAggregationDesc(desc.getAggregators());
- if (!ret) {
- return false;
- }
- }
- }
+ LOG.info("Vector GROUP BY operator will use processing mode " + processingMode.name() +
+ ", isVectorOutput " + vectorDesc.isVectorOutput());
return true;
}
@@ -1633,23 +1691,19 @@ public class Vectorizer implements PhysicalPlanResolver {
return true;
}
-
- private boolean validateHashAggregationDesc(List<AggregationDesc> descs) {
- return validateAggregationDesc(descs, /* isReduceMergePartial */ false, false);
- }
-
- private boolean validateReduceMergePartialAggregationDesc(List<AggregationDesc> descs, boolean hasKeys) {
- return validateAggregationDesc(descs, /* isReduceMergePartial */ true, hasKeys);
- }
-
- private boolean validateAggregationDesc(List<AggregationDesc> descs, boolean isReduceMergePartial, boolean hasKeys) {
+ private Pair<Boolean,Boolean> validateAggregationDescs(List<AggregationDesc> descs,
+ ProcessingMode processingMode, boolean hasKeys) {
+ boolean outputIsPrimitive = true;
for (AggregationDesc d : descs) {
- boolean ret = validateAggregationDesc(d, isReduceMergePartial, hasKeys);
- if (!ret) {
- return false;
+ Pair<Boolean,Boolean> retPair = validateAggregationDesc(d, processingMode, hasKeys);
+ if (!retPair.left) {
+ return retPair;
+ }
+ if (!retPair.right) {
+ outputIsPrimitive = false;
}
}
- return true;
+ return new Pair<Boolean, Boolean>(true, outputIsPrimitive);
}
private boolean validateExprNodeDescRecursive(ExprNodeDesc desc, VectorExpressionDescriptor.Mode mode) {
@@ -1787,38 +1841,45 @@ public class Vectorizer implements PhysicalPlanResolver {
return (outputObjInspector.getCategory() == ObjectInspector.Category.PRIMITIVE);
}
- private boolean validateAggregationDesc(AggregationDesc aggDesc, boolean isReduceMergePartial,
- boolean hasKeys) {
+ private Pair<Boolean,Boolean> validateAggregationDesc(AggregationDesc aggDesc, ProcessingMode processingMode,
+ boolean hasKeys) {
String udfName = aggDesc.getGenericUDAFName().toLowerCase();
if (!supportedAggregationUdfs.contains(udfName)) {
LOG.info("Cannot vectorize groupby aggregate expression: UDF " + udfName + " not supported");
- return false;
+ return new Pair<Boolean,Boolean>(false, false);
}
if (aggDesc.getParameters() != null && !validateExprNodeDesc(aggDesc.getParameters())) {
LOG.info("Cannot vectorize groupby aggregate expression: UDF parameters not supported");
- return false;
+ return new Pair<Boolean,Boolean>(false, false);
}
// See if we can vectorize the aggregation.
VectorizationContext vc = new ValidatorVectorizationContext();
VectorAggregateExpression vectorAggrExpr;
try {
- vectorAggrExpr = vc.getAggregatorExpression(aggDesc, isReduceMergePartial);
+ vectorAggrExpr = vc.getAggregatorExpression(aggDesc);
} catch (Exception e) {
// We should have already attempted to vectorize in validateAggregationDesc.
if (LOG.isDebugEnabled()) {
LOG.debug("Vectorization of aggreation should have succeeded ", e);
}
- return false;
+ return new Pair<Boolean,Boolean>(false, false);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Aggregation " + aggDesc.getExprString() + " --> " +
+ " vector expression " + vectorAggrExpr.toString());
}
- if (isReduceMergePartial && hasKeys && !validateAggregationIsPrimitive(vectorAggrExpr)) {
+ boolean outputIsPrimitive = validateAggregationIsPrimitive(vectorAggrExpr);
+ if (processingMode == ProcessingMode.MERGE_PARTIAL &&
+ hasKeys &&
+ !outputIsPrimitive) {
LOG.info("Vectorized Reduce MergePartial GROUP BY keys can only handle aggregate outputs that are primitive types");
- return false;
+ return new Pair<Boolean,Boolean>(false, false);
}
- return true;
+ return new Pair<Boolean,Boolean>(true, outputIsPrimitive);
}
private boolean validateDataType(String type, VectorExpressionDescriptor.Mode mode) {
http://git-wip-us.apache.org/repos/asf/hive/blob/8136a10c/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorGroupByDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorGroupByDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorGroupByDesc.java
index e613a4e..08f8ebf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorGroupByDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorGroupByDesc.java
@@ -30,23 +30,45 @@ public class VectorGroupByDesc extends AbstractVectorDesc {
private static long serialVersionUID = 1L;
- private boolean isReduceMergePartial;
+ /**
+ * GLOBAL No key. All rows --> 1 full aggregation on end of input
+ *
+ * HASH Rows aggregated into a hash table on group key -->
+ * 1 partial aggregation per key (normally, unless there is spilling)
+ *
+ * MERGE_PARTIAL As first operator in a REDUCER, partial aggregations come grouped from
+ * reduce-shuffle -->
+ * aggregate the partial aggregations and emit full aggregation on
+ * endGroup / closeOp
+ *
+ * STREAMING Rows come from PARENT operator already grouped -->
+ * aggregate the rows and emit full aggregation on key change / closeOp
+ *
+ * NOTE: Hash can spill partial result rows prematurely if it runs low on memory.
+ * NOTE: Streaming has to compare keys where MergePartial gets an endGroup call.
+ */
+ public static enum ProcessingMode {
+ NONE,
+ GLOBAL,
+ HASH,
+ MERGE_PARTIAL,
+ STREAMING
+ };
- private boolean isVectorOutput;
+ private ProcessingMode processingMode;
- private boolean isReduceStreaming;
+ private boolean isVectorOutput;
public VectorGroupByDesc() {
- this.isReduceMergePartial = false;
+ this.processingMode = ProcessingMode.NONE;
this.isVectorOutput = false;
}
- public boolean isReduceMergePartial() {
- return isReduceMergePartial;
+ public void setProcessingMode(ProcessingMode processingMode) {
+ this.processingMode = processingMode;
}
-
- public void setIsReduceMergePartial(boolean isReduceMergePartial) {
- this.isReduceMergePartial = isReduceMergePartial;
+ public ProcessingMode getProcessingMode() {
+ return processingMode;
}
public boolean isVectorOutput() {
@@ -57,11 +79,39 @@ public class VectorGroupByDesc extends AbstractVectorDesc {
this.isVectorOutput = isVectorOutput;
}
- public void setIsReduceStreaming(boolean isReduceStreaming) {
- this.isReduceStreaming = isReduceStreaming;
- }
-
- public boolean isReduceStreaming() {
- return isReduceStreaming;
+ /**
+ * Which ProcessingMode for VectorGroupByOperator?
+ *
+ * Decides using GroupByDesc.Mode and whether there are keys.
+ *
+ * Mode.COMPLETE --> (numKeys == 0 ? ProcessingMode.GLOBAL : ProcessingMode.STREAMING)
+ *
+ * Mode.HASH --> ProcessingMode.HASH
+ *
+ * Mode.MERGEPARTIAL --> (numKeys == 0 ? ProcessingMode.GLOBAL : ProcessingMode.MERGE_PARTIAL)
+ *
+ * Mode.PARTIAL1,
+ * Mode.PARTIAL2,
+ * Mode.PARTIALS,
+ * Mode.FINAL --> ProcessingMode.STREAMING
+ *
+ */
+ public static ProcessingMode groupByDescModeToVectorProcessingMode(GroupByDesc.Mode mode,
+ boolean hasKeys) {
+ switch (mode) {
+ case COMPLETE:
+ return (hasKeys ? ProcessingMode.STREAMING : ProcessingMode.GLOBAL);
+ case HASH:
+ return ProcessingMode.HASH;
+ case MERGEPARTIAL:
+ return (hasKeys ? ProcessingMode.MERGE_PARTIAL : ProcessingMode.GLOBAL);
+ case PARTIAL1:
+ case PARTIAL2:
+ case PARTIALS:
+ case FINAL:
+ return ProcessingMode.STREAMING;
+ default:
+ throw new RuntimeException("Unexpected GROUP BY mode " + mode.name());
+ }
}
}