You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2017/01/30 07:33:34 UTC
[3/3] hive git commit: HIVE-15698: Vectorization support for
min/max/bloomfilter runtime filtering (Jason Dere, reviewed by Matt McCline)
HIVE-15698: Vectorization support for min/max/bloomfilter runtime filtering (Jason Dere, reviewed by Matt McCline)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/62ebd1ab
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/62ebd1ab
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/62ebd1ab
Branch: refs/heads/master
Commit: 62ebd1abb87d951024a371ba37a451f536a594e6
Parents: 79eb224
Author: Jason Dere <jd...@hortonworks.com>
Authored: Sun Jan 29 23:33:02 2017 -0800
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Sun Jan 29 23:33:02 2017 -0800
----------------------------------------------------------------------
.../apache/hadoop/hive/ant/GenVectorCode.java | 83 ++
.../test/resources/testconfiguration.properties | 1 +
.../FilterColumnBetweenDynamicValue.txt | 101 ++
.../FilterDecimalColumnBetween.txt | 16 +
.../FilterStringColumnBetween.txt | 8 +-
.../FilterTimestampColumnBetween.txt | 16 +
.../FilterTruncStringColumnBetween.txt | 8 +-
.../ql/exec/tez/DynamicValueRegistryTez.java | 12 +-
.../exec/vector/VectorExpressionDescriptor.java | 3 +-
.../ql/exec/vector/VectorFilterOperator.java | 2 +
.../ql/exec/vector/VectorSelectOperator.java | 1 +
.../ql/exec/vector/VectorizationContext.java | 77 +-
.../DynamicValueVectorExpression.java | 314 +++++++
.../vector/expressions/VectorExpression.java | 13 +-
.../VectorInBloomFilterColDynamicValue.java | 285 ++++++
.../aggregates/VectorUDAFBloomFilter.java | 474 ++++++++++
.../aggregates/VectorUDAFBloomFilterMerge.java | 365 ++++++++
.../hive/ql/optimizer/physical/Vectorizer.java | 2 +
.../ql/udf/generic/GenericUDAFBloomFilter.java | 10 +-
.../ql/udf/generic/GenericUDFInBloomFilter.java | 12 +-
.../ql/optimizer/physical/TestVectorizer.java | 31 +
.../test/queries/clientpositive/explainuser_3.q | 1 +
.../vectorized_dynamic_semijoin_reduction.q | 43 +
.../results/clientpositive/llap/mergejoin.q.out | 66 +-
.../results/clientpositive/llap/orc_llap.q.out | 12 +-
.../llap/vector_binary_join_groupby.q.out | 4 +-
.../vectorized_dynamic_partition_pruning.q.out | 4 +-
.../vectorized_dynamic_semijoin_reduction.q.out | 932 +++++++++++++++++++
.../clientpositive/tez/explainuser_3.q.out | 17 +-
.../vector_binary_join_groupby.q.out | 1 +
.../apache/hive/common/util/BloomFilter.java | 36 +
.../hive/common/util/TestBloomFilter.java | 125 +++
32 files changed, 3000 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ant/src/org/apache/hadoop/hive/ant/GenVectorCode.java
----------------------------------------------------------------------
diff --git a/ant/src/org/apache/hadoop/hive/ant/GenVectorCode.java b/ant/src/org/apache/hadoop/hive/ant/GenVectorCode.java
index e9fe8fa..133ef0a 100644
--- a/ant/src/org/apache/hadoop/hive/ant/GenVectorCode.java
+++ b/ant/src/org/apache/hadoop/hive/ant/GenVectorCode.java
@@ -789,6 +789,15 @@ public class GenVectorCode extends Task {
{"FilterTimestampColumnBetween", ""},
{"FilterTimestampColumnBetween", "!"},
+ // This is for runtime min/max pushdown - don't need to do NOT BETWEEN
+ {"FilterColumnBetweenDynamicValue", "long", ""},
+ {"FilterColumnBetweenDynamicValue", "double", ""},
+ {"FilterColumnBetweenDynamicValue", "decimal", ""},
+ {"FilterColumnBetweenDynamicValue", "string", ""},
+ {"FilterColumnBetweenDynamicValue", "char", ""},
+ {"FilterColumnBetweenDynamicValue", "varchar", ""},
+ {"FilterColumnBetweenDynamicValue", "timestamp", ""},
+
{"ColumnCompareColumn", "Equal", "long", "double", "=="},
{"ColumnCompareColumn", "Equal", "double", "double", "=="},
{"ColumnCompareColumn", "NotEqual", "long", "double", "!="},
@@ -1164,6 +1173,8 @@ public class GenVectorCode extends Task {
} else if (tdesc[0].equals("FilterColumnBetween")) {
generateFilterColumnBetween(tdesc);
+ } else if (tdesc[0].equals("FilterColumnBetweenDynamicValue")) {
+ generateFilterColumnBetweenDynamicValue(tdesc);
} else if (tdesc[0].equals("ScalarArithmeticColumn") || tdesc[0].equals("ScalarDivideColumn")) {
generateScalarArithmeticColumn(tdesc);
} else if (tdesc[0].equals("FilterColumnCompareColumn")) {
@@ -1379,6 +1390,72 @@ public class GenVectorCode extends Task {
className, templateString);
}
+ /**
+  * Generates one {@code Filter<Type>Column[Not]BetweenDynamicValue} class by expanding the
+  * template file named by {@code tdesc[0]}.
+  *
+  * @param tdesc template descriptor: {@code tdesc[0]} is the template name (also the name of the
+  *     {@code .txt} template file), {@code tdesc[1]} the operand type ({@code long},
+  *     {@code double}, {@code decimal}, {@code string}, {@code char}, {@code varchar} or
+  *     {@code timestamp}), and {@code tdesc[2]} either {@code ""} or {@code "!"} (NOT BETWEEN)
+  * @throws IllegalArgumentException if the operand type is not one of the supported types
+  * @throws Exception if the template cannot be read or the generated class cannot be written
+  */
+ private void generateFilterColumnBetweenDynamicValue(String[] tdesc) throws Exception {
+   String operandType = tdesc[1];
+   String optionalNot = tdesc[2];
+
+   String className = "Filter" + getCamelCaseType(operandType) + "Column" +
+       (optionalNot.equals("!") ? "Not" : "") + "BetweenDynamicValue";
+
+   String typeName = getCamelCaseType(operandType);
+   String defaultValue;
+   String vectorType;
+   String getPrimitiveMethod;
+   String getValueMethod;
+
+   // Code emitted into the generated class to turn a String into bytes. The charset is spelled
+   // out (fully qualified, so the template needs no extra import) because String.getBytes()
+   // without arguments uses the JVM's platform default encoding, which would make the BETWEEN
+   // bounds machine dependent.
+   // NOTE(review): UTF-8 is assumed to match the encoding of the byte[] column data - confirm.
+   String toUtf8Bytes = ".getBytes(java.nio.charset.StandardCharsets.UTF_8)";
+
+   if (operandType.equals("long")) {
+     defaultValue = "0";
+     vectorType = "long";
+     getPrimitiveMethod = "getLong";
+     getValueMethod = "";
+   } else if (operandType.equals("double")) {
+     defaultValue = "0";
+     vectorType = "double";
+     getPrimitiveMethod = "getDouble";
+     getValueMethod = "";
+   } else if (operandType.equals("decimal")) {
+     defaultValue = "null";
+     vectorType = "HiveDecimal";
+     getPrimitiveMethod = "getHiveDecimal";
+     getValueMethod = "";
+   } else if (operandType.equals("string")) {
+     defaultValue = "null";
+     vectorType = "byte[]";
+     getPrimitiveMethod = "getString";
+     getValueMethod = toUtf8Bytes;
+   } else if (operandType.equals("char")) {
+     defaultValue = "null";
+     vectorType = "byte[]";
+     getPrimitiveMethod = "getHiveChar";
+     getValueMethod = ".getStrippedValue()" + toUtf8Bytes; // Does vectorization use stripped char values?
+   } else if (operandType.equals("varchar")) {
+     defaultValue = "null";
+     vectorType = "byte[]";
+     getPrimitiveMethod = "getHiveVarchar";
+     getValueMethod = ".getValue()" + toUtf8Bytes;
+   } else if (operandType.equals("timestamp")) {
+     defaultValue = "null";
+     vectorType = "Timestamp";
+     getPrimitiveMethod = "getTimestamp";
+     getValueMethod = "";
+   } else {
+     throw new IllegalArgumentException("Type " + operandType + " not supported");
+   }
+
+   // Read the template into a string, expand it, and write it.
+   File templateFile = new File(joinPath(this.expressionTemplateDirectory, tdesc[0] + ".txt"));
+   String templateString = readFile(templateFile);
+   templateString = templateString.replaceAll("<ClassName>", className);
+   templateString = templateString.replaceAll("<TypeName>", typeName);
+   templateString = templateString.replaceAll("<DefaultValue>", defaultValue);
+   templateString = templateString.replaceAll("<VectorType>", vectorType);
+   templateString = templateString.replaceAll("<GetPrimitiveMethod>", getPrimitiveMethod);
+   templateString = templateString.replaceAll("<GetValueMethod>", getValueMethod);
+
+   writeFile(templateFile.lastModified(), expressionOutputDirectory, expressionClassesDirectory,
+       className, templateString);
+ }
+
private void generateColumnCompareColumn(String[] tdesc) throws Exception {
String operatorName = tdesc[1];
String operandType1 = tdesc[2];
@@ -3084,6 +3161,12 @@ public class GenVectorCode extends Task {
return "Timestamp";
} else if (type.equals("date")) {
return "Date";
+ } else if (type.equals("string")) {
+ return "String";
+ } else if (type.equals("char")) {
+ return "Char";
+ } else if (type.equals("varchar")) {
+ return "VarChar";
} else {
return type;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index e966959..8b3f589 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -611,6 +611,7 @@ minillaplocal.query.files=acid_globallimit.q,\
vector_udf1.q,\
vectorization_short_regress.q,\
vectorized_dynamic_partition_pruning.q,\
+ vectorized_dynamic_semijoin_reduction.q,\
vectorized_ptf.q,\
windowing.q,\
windowing_gby.q,\
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/gen/vectorization/ExpressionTemplates/FilterColumnBetweenDynamicValue.txt
----------------------------------------------------------------------
diff --git a/ql/src/gen/vectorization/ExpressionTemplates/FilterColumnBetweenDynamicValue.txt b/ql/src/gen/vectorization/ExpressionTemplates/FilterColumnBetweenDynamicValue.txt
new file mode 100644
index 0000000..97ab7aa
--- /dev/null
+++ b/ql/src/gen/vectorization/ExpressionTemplates/FilterColumnBetweenDynamicValue.txt
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions.gen;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.gen.Filter<TypeName>ColumnBetween;
+import org.apache.hadoop.hive.ql.plan.DynamicValue;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.sql.Timestamp;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+
+/**
+ * Generated BETWEEN filter whose two bounds are supplied at runtime by {@link DynamicValue}s
+ * instead of constants. The bounds are read once, on the first call to
+ * {@link #evaluate(VectorizedRowBatch)}, and pushed into the parent filter through
+ * {@code setLeftValue}/{@code setRightValue}. If either bound resolves to null, every batch is
+ * filtered out completely.
+ */
+public class <ClassName> extends Filter<TypeName>ColumnBetween {
+
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(<ClassName>.class);
+
+  protected DynamicValue leftDynamicValue;
+  protected DynamicValue rightDynamicValue;
+  // True once the dynamic values have been read and the parent's bounds have been set.
+  protected transient boolean initialized = false;
+  // True when at least one of the two dynamic values resolved to null.
+  protected transient boolean isLeftOrRightNull = false;
+
+  /**
+   * @param colNum input column passed through to the parent BETWEEN filter
+   * @param leftValue dynamic value providing the left (minimum) bound
+   * @param rightValue dynamic value providing the right (maximum) bound
+   */
+  public <ClassName>(int colNum, DynamicValue leftValue, DynamicValue rightValue) {
+    // Placeholder bounds; the real ones are filled in on the first evaluate() call.
+    super(colNum, <DefaultValue>, <DefaultValue>);
+    this.leftDynamicValue = leftValue;
+    this.rightDynamicValue = rightValue;
+  }
+
+  public <ClassName>() {
+  }
+
+  public DynamicValue getLeftDynamicValue() {
+    return leftDynamicValue;
+  }
+
+  public void setLeftDynamicValue(DynamicValue leftValue) {
+    this.leftDynamicValue = leftValue;
+  }
+
+  public DynamicValue getRightDynamicValue() {
+    return rightDynamicValue;
+  }
+
+  public void setRightDynamicValue(DynamicValue rightValue) {
+    this.rightDynamicValue = rightValue;
+  }
+
+  /**
+   * Misnamed setter, kept only so existing callers keep compiling.
+   *
+   * @deprecated use {@link #setRightDynamicValue(DynamicValue)} instead
+   */
+  @Deprecated
+  public void getRightDynamicValue(DynamicValue rightValue) {
+    setRightDynamicValue(rightValue);
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+    // Hand the configuration to both dynamic values before they are read in evaluate().
+    leftDynamicValue.setConf(conf);
+    rightDynamicValue.setConf(conf);
+  }
+
+  @Override
+  public void evaluate(VectorizedRowBatch batch) {
+    if (!initialized) {
+      // Resolve both bounds exactly once and cache the outcome for all later batches.
+      Object lVal = leftDynamicValue.getValue();
+      Object rVal = rightDynamicValue.getValue();
+      if (lVal == null || rVal == null) {
+        isLeftOrRightNull = true;
+      } else {
+        <VectorType> min = PrimitiveObjectInspectorUtils.<GetPrimitiveMethod>(
+            lVal, leftDynamicValue.getObjectInspector())<GetValueMethod>;
+        setLeftValue(min);
+
+        <VectorType> max = PrimitiveObjectInspectorUtils.<GetPrimitiveMethod>(
+            rVal, rightDynamicValue.getObjectInspector())<GetValueMethod>;
+        setRightValue(max);
+      }
+      initialized = true;
+    }
+
+    // Special case for dynamic values - min/max can be null
+    if (isLeftOrRightNull) {
+      // Entire batch is filtered out
+      batch.size = 0;
+    }
+
+    // The (possibly emptied) batch is still passed on to the parent filter.
+    super.evaluate(batch);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/gen/vectorization/ExpressionTemplates/FilterDecimalColumnBetween.txt
----------------------------------------------------------------------
diff --git a/ql/src/gen/vectorization/ExpressionTemplates/FilterDecimalColumnBetween.txt b/ql/src/gen/vectorization/ExpressionTemplates/FilterDecimalColumnBetween.txt
index d68edfa..62d2254 100644
--- a/ql/src/gen/vectorization/ExpressionTemplates/FilterDecimalColumnBetween.txt
+++ b/ql/src/gen/vectorization/ExpressionTemplates/FilterDecimalColumnBetween.txt
@@ -154,6 +154,22 @@ public class <ClassName> extends VectorExpression {
return "boolean";
}
+ /** Returns the left bound currently used by this BETWEEN filter. */
+ public HiveDecimal getLeftValue() {
+   return this.leftValue;
+ }
+
+ /** Replaces the left bound used by this BETWEEN filter. */
+ public void setLeftValue(HiveDecimal value) {
+   leftValue = value;
+ }
+
+ /** Returns the right bound currently used by this BETWEEN filter. */
+ public HiveDecimal getRightValue() {
+   return this.rightValue;
+ }
+
+ /** Replaces the right bound used by this BETWEEN filter. */
+ public void setRightValue(HiveDecimal value) {
+   rightValue = value;
+ }
+
@Override
public VectorExpressionDescriptor.Descriptor getDescriptor() {
return (new VectorExpressionDescriptor.Builder())
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/gen/vectorization/ExpressionTemplates/FilterStringColumnBetween.txt
----------------------------------------------------------------------
diff --git a/ql/src/gen/vectorization/ExpressionTemplates/FilterStringColumnBetween.txt b/ql/src/gen/vectorization/ExpressionTemplates/FilterStringColumnBetween.txt
index e8049da..16d4aaf 100644
--- a/ql/src/gen/vectorization/ExpressionTemplates/FilterStringColumnBetween.txt
+++ b/ql/src/gen/vectorization/ExpressionTemplates/FilterStringColumnBetween.txt
@@ -161,19 +161,19 @@ public class <ClassName> extends VectorExpression {
this.colNum = colNum;
}
- public byte[] getLeft() {
+ public byte[] getLeftValue() {
return left;
}
- public void setLeft(byte[] value) {
+ public void setLeftValue(byte[] value) {
this.left = value;
}
- public byte[] getRight() {
+ public byte[] getRightValue() {
return right;
}
- public void setRight(byte[] value) {
+ public void setRightValue(byte[] value) {
this.right = value;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/gen/vectorization/ExpressionTemplates/FilterTimestampColumnBetween.txt
----------------------------------------------------------------------
diff --git a/ql/src/gen/vectorization/ExpressionTemplates/FilterTimestampColumnBetween.txt b/ql/src/gen/vectorization/ExpressionTemplates/FilterTimestampColumnBetween.txt
index 4298d79..806148f 100644
--- a/ql/src/gen/vectorization/ExpressionTemplates/FilterTimestampColumnBetween.txt
+++ b/ql/src/gen/vectorization/ExpressionTemplates/FilterTimestampColumnBetween.txt
@@ -153,6 +153,22 @@ public class <ClassName> extends VectorExpression {
return "boolean";
}
+ /** Returns the left bound currently used by this BETWEEN filter. */
+ public Timestamp getLeftValue() {
+   return this.leftValue;
+ }
+
+ /** Replaces the left bound used by this BETWEEN filter. */
+ public void setLeftValue(Timestamp value) {
+   leftValue = value;
+ }
+
+ /** Returns the right bound currently used by this BETWEEN filter. */
+ public Timestamp getRightValue() {
+   return this.rightValue;
+ }
+
+ /** Replaces the right bound used by this BETWEEN filter. */
+ public void setRightValue(Timestamp value) {
+   rightValue = value;
+ }
+
@Override
public VectorExpressionDescriptor.Descriptor getDescriptor() {
return (new VectorExpressionDescriptor.Builder())
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/gen/vectorization/ExpressionTemplates/FilterTruncStringColumnBetween.txt
----------------------------------------------------------------------
diff --git a/ql/src/gen/vectorization/ExpressionTemplates/FilterTruncStringColumnBetween.txt b/ql/src/gen/vectorization/ExpressionTemplates/FilterTruncStringColumnBetween.txt
index 94a174d..d350dcb 100644
--- a/ql/src/gen/vectorization/ExpressionTemplates/FilterTruncStringColumnBetween.txt
+++ b/ql/src/gen/vectorization/ExpressionTemplates/FilterTruncStringColumnBetween.txt
@@ -163,19 +163,19 @@ public class <ClassName> extends VectorExpression {
this.colNum = colNum;
}
- public byte[] getLeft() {
+ public byte[] getLeftValue() {
return left;
}
- public void setLeft(byte[] value) {
+ public void setLeftValue(byte[] value) {
this.left = value;
}
- public byte[] getRight() {
+ public byte[] getRightValue() {
return right;
}
- public void setRight(byte[] value) {
+ public void setRightValue(byte[] value) {
this.right = value;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicValueRegistryTez.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicValueRegistryTez.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicValueRegistryTez.java
index 7bbedf6..b7687c5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicValueRegistryTez.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicValueRegistryTez.java
@@ -122,9 +122,15 @@ public class DynamicValueRegistryTez implements DynamicValueRegistry {
setValue(runtimeValuesInfo.getDynamicValueIDs().get(colIdx), val);
}
}
- // For now, expecting a single row (min/max, aggregated bloom filter)
- if (rowCount != 1) {
- throw new IllegalStateException("Expected 1 row from " + inputSourceName + ", got " + rowCount);
+ // For now, expecting a single row (min/max, aggregated bloom filter), or no rows
+ if (rowCount == 0) {
+ LOG.debug("No input rows from " + inputSourceName + ", filling dynamic values with nulls");
+ for (int colIdx = 0; colIdx < colExprEvaluators.size(); ++colIdx) {
+ ExprNodeEvaluator eval = colExprEvaluators.get(colIdx);
+ setValue(runtimeValuesInfo.getDynamicValueIDs().get(colIdx), null);
+ }
+ } else if (rowCount > 1) {
+ throw new IllegalStateException("Expected 0 or 1 rows from " + inputSourceName + ", got " + rowCount);
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java
index 217af3f..f4499d7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java
@@ -183,7 +183,8 @@ public class VectorExpressionDescriptor {
public enum InputExpressionType {
NONE(0),
COLUMN(1),
- SCALAR(2);
+ SCALAR(2),
+ DYNAMICVALUE(3);
private final int value;
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
index 261246b..2598445 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
@@ -72,6 +72,8 @@ public class VectorFilterOperator extends FilterOperator {
try {
heartbeatInterval = HiveConf.getIntVar(hconf,
HiveConf.ConfVars.HIVESENDHEARTBEAT);
+
+ conditionEvaluator.init(hconf);
} catch (Throwable e) {
throw new HiveException(e);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
index f7fec8f..bb382b1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
@@ -113,6 +113,7 @@ public class VectorSelectOperator extends Operator<SelectDesc> implements
projectedColumns = new int [vExpressions.length];
for (int i = 0; i < projectedColumns.length; i++) {
+ vExpressions[i].init(hconf);
projectedColumns[i] = vExpressions[i].getOutputColumn();
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
index c887757..484f615 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
@@ -54,6 +54,8 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.*;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFAvgDecimal;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFAvgTimestamp;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFBloomFilter;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFBloomFilterMerge;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCount;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCountMerge;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCountStar;
@@ -97,6 +99,7 @@ import org.apache.hadoop.hive.ql.plan.AggregationDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicValueDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
import org.apache.hadoop.hive.ql.udf.SettableUDF;
@@ -585,6 +588,8 @@ public class VectorizationContext {
} else if (exprDesc instanceof ExprNodeConstantDesc) {
ve = getConstantVectorExpression(((ExprNodeConstantDesc) exprDesc).getValue(), exprDesc.getTypeInfo(),
mode);
+ } else if (exprDesc instanceof ExprNodeDynamicValueDesc) {
+ ve = getDynamicValueVectorExpression((ExprNodeDynamicValueDesc) exprDesc, mode);
}
if (ve == null) {
throw new HiveException(
@@ -1094,6 +1099,21 @@ public class VectorizationContext {
}
}
+ /**
+  * Builds the vector expression for a dynamic-value expression node.
+  *
+  * @param dynamicValueExpr expression node carrying the dynamic value and its type
+  * @param mode vectorization mode; an output column is allocated only for PROJECTION
+  * @return a new {@code DynamicValueVectorExpression} for the node
+  * @throws HiveException if the node's type name maps to no vector argument type
+  */
+ private VectorExpression getDynamicValueVectorExpression(ExprNodeDynamicValueDesc dynamicValueExpr,
+     VectorExpressionDescriptor.Mode mode) throws HiveException {
+   String hiveTypeName = dynamicValueExpr.getTypeInfo().getTypeName();
+   boolean unsupportedType =
+       VectorExpressionDescriptor.ArgumentType.fromHiveTypeName(hiveTypeName)
+           == VectorExpressionDescriptor.ArgumentType.NONE;
+   if (unsupportedType) {
+     throw new HiveException("No vector argument type for type name " + hiveTypeName);
+   }
+
+   // -1 marks "no output column"; a real column is allocated only when projecting.
+   int outputColumnNum = (mode == VectorExpressionDescriptor.Mode.PROJECTION)
+       ? ocm.allocateOutputColumn(dynamicValueExpr.getTypeInfo())
+       : -1;
+
+   return new DynamicValueVectorExpression(
+       outputColumnNum, dynamicValueExpr.getTypeInfo(), dynamicValueExpr.getDynamicValue());
+ }
+
/**
* Used as a fast path for operations that don't modify their input, like unary +
* and casting boolean to long. IdentityExpression and its children are always
@@ -1181,6 +1201,8 @@ public class VectorizationContext {
builder.setInputExpressionType(i, InputExpressionType.COLUMN);
} else if (child instanceof ExprNodeConstantDesc) {
builder.setInputExpressionType(i, InputExpressionType.SCALAR);
+ } else if (child instanceof ExprNodeDynamicValueDesc) {
+ builder.setInputExpressionType(i, InputExpressionType.DYNAMICVALUE);
} else {
throw new HiveException("Cannot handle expression type: " + child.getClass().getSimpleName());
}
@@ -1225,6 +1247,8 @@ public class VectorizationContext {
} else if (child instanceof ExprNodeConstantDesc) {
Object scalarValue = getVectorTypeScalarValue((ExprNodeConstantDesc) child);
arguments[i] = (null == scalarValue) ? getConstantVectorExpression(null, child.getTypeInfo(), childrenMode) : scalarValue;
+ } else if (child instanceof ExprNodeDynamicValueDesc) {
+ arguments[i] = ((ExprNodeDynamicValueDesc) child).getDynamicValue();
} else {
throw new HiveException("Cannot handle expression type: " + child.getClass().getSimpleName());
}
@@ -2092,8 +2116,13 @@ public class VectorizationContext {
return null;
}
+ boolean hasDynamicValues = false;
+
// We don't currently support the BETWEEN ends being columns. They must be scalars.
- if (!(childExpr.get(2) instanceof ExprNodeConstantDesc) ||
+ if ((childExpr.get(2) instanceof ExprNodeDynamicValueDesc) &&
+ (childExpr.get(3) instanceof ExprNodeDynamicValueDesc)) {
+ hasDynamicValues = true;
+ } else if (!(childExpr.get(2) instanceof ExprNodeConstantDesc) ||
!(childExpr.get(3) instanceof ExprNodeConstantDesc)) {
return null;
}
@@ -2138,35 +2167,51 @@ public class VectorizationContext {
// determine class
Class<?> cl = null;
if (isIntFamily(colType) && !notKeywordPresent) {
- cl = FilterLongColumnBetween.class;
+ cl = (hasDynamicValues ?
+ FilterLongColumnBetweenDynamicValue.class :
+ FilterLongColumnBetween.class);
} else if (isIntFamily(colType) && notKeywordPresent) {
cl = FilterLongColumnNotBetween.class;
} else if (isFloatFamily(colType) && !notKeywordPresent) {
- cl = FilterDoubleColumnBetween.class;
+ cl = (hasDynamicValues ?
+ FilterDoubleColumnBetweenDynamicValue.class :
+ FilterDoubleColumnBetween.class);
} else if (isFloatFamily(colType) && notKeywordPresent) {
cl = FilterDoubleColumnNotBetween.class;
} else if (colType.equals("string") && !notKeywordPresent) {
- cl = FilterStringColumnBetween.class;
+ cl = (hasDynamicValues ?
+ FilterStringColumnBetweenDynamicValue.class :
+ FilterStringColumnBetween.class);
} else if (colType.equals("string") && notKeywordPresent) {
cl = FilterStringColumnNotBetween.class;
} else if (varcharTypePattern.matcher(colType).matches() && !notKeywordPresent) {
- cl = FilterVarCharColumnBetween.class;
+ cl = (hasDynamicValues ?
+ FilterVarCharColumnBetweenDynamicValue.class :
+ FilterVarCharColumnBetween.class);
} else if (varcharTypePattern.matcher(colType).matches() && notKeywordPresent) {
cl = FilterVarCharColumnNotBetween.class;
} else if (charTypePattern.matcher(colType).matches() && !notKeywordPresent) {
- cl = FilterCharColumnBetween.class;
+ cl = (hasDynamicValues ?
+ FilterCharColumnBetweenDynamicValue.class :
+ FilterCharColumnBetween.class);
} else if (charTypePattern.matcher(colType).matches() && notKeywordPresent) {
cl = FilterCharColumnNotBetween.class;
} else if (colType.equals("timestamp") && !notKeywordPresent) {
- cl = FilterTimestampColumnBetween.class;
+ cl = (hasDynamicValues ?
+ FilterTimestampColumnBetweenDynamicValue.class :
+ FilterTimestampColumnBetween.class);
} else if (colType.equals("timestamp") && notKeywordPresent) {
cl = FilterTimestampColumnNotBetween.class;
} else if (isDecimalFamily(colType) && !notKeywordPresent) {
- cl = FilterDecimalColumnBetween.class;
+ cl = (hasDynamicValues ?
+ FilterDecimalColumnBetweenDynamicValue.class :
+ FilterDecimalColumnBetween.class);
} else if (isDecimalFamily(colType) && notKeywordPresent) {
cl = FilterDecimalColumnNotBetween.class;
} else if (isDateFamily(colType) && !notKeywordPresent) {
- cl = FilterLongColumnBetween.class;
+ cl = (hasDynamicValues ?
+ FilterLongColumnBetweenDynamicValue.class :
+ FilterLongColumnBetween.class);
} else if (isDateFamily(colType) && notKeywordPresent) {
cl = FilterLongColumnNotBetween.class;
}
@@ -2224,6 +2269,12 @@ public class VectorizationContext {
} else if (child instanceof ExprNodeConstantDesc) {
// this is a constant (or null)
argDescs[i].setConstant((ExprNodeConstantDesc) child);
+ } else if (child instanceof ExprNodeDynamicValueDesc) {
+ VectorExpression e = getVectorExpression(child, VectorExpressionDescriptor.Mode.PROJECTION);
+ vectorExprs.add(e);
+ variableArgPositions.add(i);
+ exprResultColumnNums.add(e.getOutputColumn());
+ argDescs[i].setVariable(e.getOutputColumn());
} else {
throw new HiveException("Unable to vectorize custom UDF. Encountered unsupported expr desc : "
+ child);
@@ -2651,6 +2702,14 @@ public class VectorizationContext {
add(new AggregateDefinition("stddev_samp", ArgumentType.FLOAT_FAMILY, Mode.PARTIAL1, VectorUDAFStdSampDouble.class));
add(new AggregateDefinition("stddev_samp", ArgumentType.DECIMAL, Mode.PARTIAL1, VectorUDAFStdSampDecimal.class));
add(new AggregateDefinition("stddev_samp", ArgumentType.TIMESTAMP, Mode.PARTIAL1, VectorUDAFStdSampTimestamp.class));
+
+ // UDAFBloomFilter. Original data is one type, partial/final is another,
+ // so this requires 2 aggregation classes (partial1/complete), (partial2/final)
+ add(new AggregateDefinition("bloom_filter", ArgumentType.ALL_FAMILY, Mode.PARTIAL1, VectorUDAFBloomFilter.class));
+ add(new AggregateDefinition("bloom_filter", ArgumentType.ALL_FAMILY, Mode.COMPLETE, VectorUDAFBloomFilter.class));
+ add(new AggregateDefinition("bloom_filter", ArgumentType.BINARY, Mode.PARTIAL2, VectorUDAFBloomFilterMerge.class));
+ add(new AggregateDefinition("bloom_filter", ArgumentType.BINARY, Mode.FINAL, VectorUDAFBloomFilterMerge.class));
+
}};
public VectorAggregateExpression getAggregatorExpression(AggregationDesc desc)
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/DynamicValueVectorExpression.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/DynamicValueVectorExpression.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/DynamicValueVectorExpression.java
new file mode 100644
index 0000000..1a34118
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/DynamicValueVectorExpression.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.Timestamp;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
+import org.apache.hadoop.hive.ql.exec.vector.*;
+import org.apache.hadoop.hive.ql.plan.DynamicValue;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Vector expression that resolves a {@link DynamicValue} and writes it to the output
+ * column as a repeating value: entry 0 carries the value (or the null flag) and
+ * {@code isRepeating} is set on every call to {@link #evaluate(VectorizedRowBatch)}.
+ * The dynamic value is fetched on the first evaluation and cached in a typed field.
+ */
+public class DynamicValueVectorExpression extends VectorExpression {
+  private static final Logger LOG = LoggerFactory.getLogger(DynamicValueVectorExpression.class);
+
+  private static final long serialVersionUID = 1L;
+
+  DynamicValue dynamicValue;
+  TypeInfo typeInfo;
+  // Not serialized; stays false until initValue() has cached the dynamic value.
+  transient private boolean initialized = false;
+
+  private int outputColumn;
+  // Cached value. Which field is populated depends on the primitive category, see initValue().
+  protected long longValue = 0;
+  private double doubleValue = 0;
+  private byte[] bytesValue = null;
+  private HiveDecimal decimalValue = null;
+  private Timestamp timestampValue = null;
+  private HiveIntervalDayTime intervalDayTimeValue = null;
+  private boolean isNullValue = false;
+
+  // Column vector type of the output column; selects the evaluateXxx() method in evaluate().
+  private ColumnVector.Type type;
+  private int bytesValueLength = 0;
+
+  public DynamicValueVectorExpression() {
+    super();
+  }
+
+  /**
+   * @param outputColumn index into {@code batch.cols} of the column to fill
+   * @param typeInfo type of the value; also used to derive the column vector type
+   * @param dynamicValue source of the value, read lazily on first evaluation
+   */
+  public DynamicValueVectorExpression(int outputColumn, TypeInfo typeInfo, DynamicValue dynamicValue) {
+    this();
+    this.outputColumn = outputColumn;
+    this.type = VectorizationContext.getColumnVectorTypeFromTypeInfo(typeInfo);
+    this.dynamicValue = dynamicValue;
+    this.typeInfo = typeInfo;
+  }
+
+  /** Writes the cached long (or null) into entry 0 of the repeating output column. */
+  private void evaluateLong(VectorizedRowBatch vrg) {
+    LongColumnVector cv = (LongColumnVector) vrg.cols[outputColumn];
+    cv.isRepeating = true;
+    cv.noNulls = !isNullValue;
+    if (!isNullValue) {
+      cv.vector[0] = longValue;
+      cv.isNull[0] = false;
+    } else {
+      cv.isNull[0] = true;
+    }
+  }
+
+  /** Writes the cached double (or null) into entry 0 of the repeating output column. */
+  private void evaluateDouble(VectorizedRowBatch vrg) {
+    DoubleColumnVector cv = (DoubleColumnVector) vrg.cols[outputColumn];
+    cv.isRepeating = true;
+    cv.noNulls = !isNullValue;
+    if (!isNullValue) {
+      cv.vector[0] = doubleValue;
+      cv.isNull[0] = false;
+    } else {
+      cv.isNull[0] = true;
+    }
+  }
+
+  /** Writes the cached bytes (or null) into entry 0 of the repeating output column. */
+  private void evaluateBytes(VectorizedRowBatch vrg) {
+    BytesColumnVector cv = (BytesColumnVector) vrg.cols[outputColumn];
+    cv.isRepeating = true;
+    cv.noNulls = !isNullValue;
+    cv.initBuffer();
+    if (!isNullValue) {
+      cv.setVal(0, bytesValue, 0, bytesValueLength);
+      cv.isNull[0] = false;
+    } else {
+      cv.isNull[0] = true;
+    }
+  }
+
+  /** Writes the cached decimal (or null) into entry 0 of the repeating output column. */
+  private void evaluateDecimal(VectorizedRowBatch vrg) {
+    DecimalColumnVector dcv = (DecimalColumnVector) vrg.cols[outputColumn];
+    dcv.isRepeating = true;
+    dcv.noNulls = !isNullValue;
+    if (!isNullValue) {
+      dcv.vector[0].set(decimalValue);
+      dcv.isNull[0] = false;
+    } else {
+      dcv.isNull[0] = true;
+    }
+  }
+
+  /** Writes the cached timestamp (or null) into entry 0 of the repeating output column. */
+  private void evaluateTimestamp(VectorizedRowBatch vrg) {
+    TimestampColumnVector dcv = (TimestampColumnVector) vrg.cols[outputColumn];
+    dcv.isRepeating = true;
+    dcv.noNulls = !isNullValue;
+    if (!isNullValue) {
+      dcv.set(0, timestampValue);
+      dcv.isNull[0] = false;
+    } else {
+      dcv.isNull[0] = true;
+    }
+  }
+
+  /** Writes the cached day-time interval (or null) into entry 0 of the repeating output column. */
+  private void evaluateIntervalDayTime(VectorizedRowBatch vrg) {
+    IntervalDayTimeColumnVector dcv = (IntervalDayTimeColumnVector) vrg.cols[outputColumn];
+    dcv.isRepeating = true;
+    dcv.noNulls = !isNullValue;
+    if (!isNullValue) {
+      dcv.set(0, intervalDayTimeValue);
+      dcv.isNull[0] = false;
+    } else {
+      dcv.isNull[0] = true;
+    }
+  }
+
+  /**
+   * Fetches the dynamic value and caches it in the field matching its primitive category.
+   * A null value only sets {@code isNullValue}. Marks this expression as initialized.
+   *
+   * @throws IllegalStateException if the primitive category is not handled here
+   */
+  private void initValue() {
+    Object val = dynamicValue.getValue();
+
+    if (val == null) {
+      isNullValue = true;
+    } else {
+      PrimitiveObjectInspector poi = dynamicValue.getObjectInspector();
+      byte[] bytesVal;
+      switch (poi.getPrimitiveCategory()) {
+      case BOOLEAN:
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+        longValue = PrimitiveObjectInspectorUtils.getLong(val, poi);
+        break;
+      case FLOAT:
+      case DOUBLE:
+        doubleValue = PrimitiveObjectInspectorUtils.getDouble(val, poi);
+        break;
+      case STRING:
+      case CHAR:
+      case VARCHAR:
+        // Encode with an explicit charset so the bytes do not depend on the JVM default.
+        bytesVal = PrimitiveObjectInspectorUtils.getString(val, poi).getBytes(StandardCharsets.UTF_8);
+        setBytesValue(bytesVal);
+        break;
+      case BINARY:
+        bytesVal = PrimitiveObjectInspectorUtils.getBinary(val, poi).copyBytes();
+        setBytesValue(bytesVal);
+        break;
+      case DECIMAL:
+        decimalValue = PrimitiveObjectInspectorUtils.getHiveDecimal(val, poi);
+        break;
+      case DATE:
+        // Dates are cached as a day count in longValue.
+        longValue = DateWritable.dateToDays(PrimitiveObjectInspectorUtils.getDate(val, poi));
+        // Stop here: without this break the DATE case fell through into TIMESTAMP.
+        break;
+      case TIMESTAMP:
+        timestampValue = PrimitiveObjectInspectorUtils.getTimestamp(val, poi);
+        break;
+      case INTERVAL_YEAR_MONTH:
+        longValue = PrimitiveObjectInspectorUtils.getHiveIntervalYearMonth(val, poi).getTotalMonths();
+        break;
+      case INTERVAL_DAY_TIME:
+        intervalDayTimeValue = PrimitiveObjectInspectorUtils.getHiveIntervalDayTime(val, poi);
+        break;
+      default:
+        throw new IllegalStateException("Unsupported type " + poi.getPrimitiveCategory());
+      }
+    }
+
+    initialized = true;
+  }
+
+  /** Initializes child expressions via the superclass, then hands {@code conf} to the dynamic value. */
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+    dynamicValue.setConf(conf);
+  }
+
+  /**
+   * Fills the output column of {@code vrg} with the (repeating) dynamic value, resolving
+   * the value first if that has not happened yet.
+   *
+   * @throws IllegalStateException if the output column vector type is not handled here
+   */
+  @Override
+  public void evaluate(VectorizedRowBatch vrg) {
+    if (!initialized) {
+      initValue();
+    }
+
+    switch (type) {
+    case LONG:
+      evaluateLong(vrg);
+      break;
+    case DOUBLE:
+      evaluateDouble(vrg);
+      break;
+    case BYTES:
+      evaluateBytes(vrg);
+      break;
+    case DECIMAL:
+      evaluateDecimal(vrg);
+      break;
+    case TIMESTAMP:
+      evaluateTimestamp(vrg);
+      break;
+    case INTERVAL_DAY_TIME:
+      evaluateIntervalDayTime(vrg);
+      break;
+    default:
+      throw new IllegalStateException("Unsupported type " + type);
+    }
+  }
+
+  @Override
+  public int getOutputColumn() {
+    return outputColumn;
+  }
+
+  public long getLongValue() {
+    return longValue;
+  }
+
+  public void setLongValue(long longValue) {
+    this.longValue = longValue;
+  }
+
+  public double getDoubleValue() {
+    return doubleValue;
+  }
+
+  public void setDoubleValue(double doubleValue) {
+    this.doubleValue = doubleValue;
+  }
+
+  /** Returns the internal byte array itself, not a copy. */
+  public byte[] getBytesValue() {
+    return bytesValue;
+  }
+
+  /** Stores a private copy of {@code bytesValue} and records its length. */
+  public void setBytesValue(byte[] bytesValue) {
+    this.bytesValue = bytesValue.clone();
+    this.bytesValueLength = bytesValue.length;
+  }
+
+  public void setDecimalValue(HiveDecimal decimalValue) {
+    this.decimalValue = decimalValue;
+  }
+
+  public HiveDecimal getDecimalValue() {
+    return decimalValue;
+  }
+
+  public void setTimestampValue(Timestamp timestampValue) {
+    this.timestampValue = timestampValue;
+  }
+
+  public Timestamp getTimestampValue() {
+    return timestampValue;
+  }
+
+  public void setIntervalDayTimeValue(HiveIntervalDayTime intervalDayTimeValue) {
+    this.intervalDayTimeValue = intervalDayTimeValue;
+  }
+
+  public HiveIntervalDayTime getIntervalDayTimeValue() {
+    return intervalDayTimeValue;
+  }
+
+  /** Same as {@code getOutputType()}. */
+  public String getTypeString() {
+    return getOutputType();
+  }
+
+  public void setOutputColumn(int outputColumn) {
+    this.outputColumn = outputColumn;
+  }
+
+  /** Returns a descriptor built with no mode, argument count or argument types set. */
+  @Override
+  public VectorExpressionDescriptor.Descriptor getDescriptor() {
+    return (new VectorExpressionDescriptor.Builder()).build();
+  }
+
+  public DynamicValue getDynamicValue() {
+    return dynamicValue;
+  }
+
+  public void setDynamicValue(DynamicValue dynamicValue) {
+    this.dynamicValue = dynamicValue;
+  }
+
+  public TypeInfo getTypeInfo() {
+    return typeInfo;
+  }
+
+  public void setTypeInfo(TypeInfo typeInfo) {
+    this.typeInfo = typeInfo;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpression.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpression.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpression.java
index 8fca8a1..218f306 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpression.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpression.java
@@ -22,6 +22,8 @@ import java.io.Serializable;
import java.util.Map;
import com.google.common.collect.ImmutableMap;
+
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
@@ -31,7 +33,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
public abstract class VectorExpression implements Serializable {
public enum Type {
STRING, CHAR, VARCHAR, TIMESTAMP, DATE, LONG, DOUBLE, DECIMAL,
- INTERVAL_YEAR_MONTH, INTERVAL_DAY_TIME, OTHER;
+ INTERVAL_YEAR_MONTH, INTERVAL_DAY_TIME, BINARY, OTHER;
private static Map<String, Type> types = ImmutableMap.<String, Type>builder()
.put("string", STRING)
.put("char", CHAR)
@@ -43,6 +45,7 @@ public abstract class VectorExpression implements Serializable {
.put("decimal", DECIMAL)
.put("interval_year_month", INTERVAL_YEAR_MONTH)
.put("interval_day_time", INTERVAL_DAY_TIME)
+ .put("binary", BINARY)
.build();
public static Type getValue(String name) {
@@ -76,6 +79,14 @@ public abstract class VectorExpression implements Serializable {
*/
public abstract void evaluate(VectorizedRowBatch batch);
+  /**
+   * Passes {@code conf} to every child expression by calling its {@code init}.
+   * Does nothing when this expression has no children.
+   */
+  public void init(Configuration conf) {
+    if (childExpressions == null) {
+      return;
+    }
+    for (VectorExpression childExpression : childExpressions) {
+      childExpression.init(conf);
+    }
+  }
+
/**
* Returns the index of the output column in the array
* of column vectors. If not applicable, -1 is returned.
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorInBloomFilterColDynamicValue.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorInBloomFilterColDynamicValue.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorInBloomFilterColDynamicValue.java
new file mode 100644
index 0000000..188a87e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorInBloomFilterColDynamicValue.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions;
+
+import java.io.ByteArrayInputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.Descriptor;
+import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.plan.DynamicValue;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
+import org.apache.hive.common.util.BloomFilter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Filter expression that keeps only the rows of a batch whose value in column
+ * {@code colNum} tests positive against a bloom filter. The serialized bloom filter is
+ * obtained from a {@link DynamicValue} on the first call to
+ * {@link #evaluate(VectorizedRowBatch)}. Rows with a null column value are dropped, and
+ * when the dynamic value itself is null every row is dropped.
+ */
+public class VectorInBloomFilterColDynamicValue extends VectorExpression {
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(VectorInBloomFilterColDynamicValue.class);
+
+  protected int colNum;
+  protected DynamicValue bloomFilterDynamicValue;
+  // Runtime-only state: set by init(Configuration) and initValue(), never serialized.
+  protected transient boolean initialized = false;
+  protected transient BloomFilter bloomFilter;
+  protected transient BloomFilterCheck bfCheck;
+
+  /**
+   * @param colNum index into {@code batch.cols} of the column to test
+   * @param bloomFilterDynamicValue source of the serialized bloom filter
+   */
+  public VectorInBloomFilterColDynamicValue(int colNum, DynamicValue bloomFilterDynamicValue) {
+    this.colNum = colNum;
+    this.bloomFilterDynamicValue = bloomFilterDynamicValue;
+  }
+
+  public VectorInBloomFilterColDynamicValue() {
+  }
+
+  /**
+   * Initializes children, hands {@code conf} to the dynamic value and selects the
+   * type-specific {@link BloomFilterCheck} from the first input type.
+   *
+   * @throws IllegalStateException if the input column type is not handled here
+   */
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+    bloomFilterDynamicValue.setConf(conf);
+
+    // Instantiate BloomFilterCheck based on input column type
+    VectorExpression.Type colType = this.getInputTypes()[0];
+    switch (colType) {
+    case LONG:
+    case DATE:
+      bfCheck = new LongBloomFilterCheck();
+      break;
+    case DOUBLE:
+      bfCheck = new DoubleBloomFilterCheck();
+      break;
+    case DECIMAL:
+      bfCheck = new DecimalBloomFilterCheck();
+      break;
+    case STRING:
+    case CHAR:
+    case VARCHAR:
+    case BINARY:
+      bfCheck = new BytesBloomFilterCheck();
+      break;
+    case TIMESTAMP:
+      bfCheck = new TimestampBloomFilterCheck();
+      break;
+    default:
+      throw new IllegalStateException("Unsupported type " + colType);
+    }
+  }
+
+  /**
+   * Resolves the dynamic value and deserializes it into {@code bloomFilter}; a null value
+   * leaves {@code bloomFilter} null. Any failure is rethrown wrapped in a RuntimeException.
+   */
+  private void initValue() {
+    try {
+      Object val = bloomFilterDynamicValue.getValue();
+      if (val != null) {
+        BinaryObjectInspector boi = (BinaryObjectInspector) bloomFilterDynamicValue.getObjectInspector();
+        byte[] bytes = boi.getPrimitiveJavaObject(val);
+        bloomFilter = BloomFilter.deserialize(new ByteArrayInputStream(bytes));
+      } else {
+        bloomFilter = null;
+      }
+      initialized = true;
+    } catch (Exception err) {
+      throw new RuntimeException(err);
+    }
+  }
+
+  /**
+   * Filters {@code batch} in place: rows whose column value is null or fails the bloom
+   * filter test are removed by rewriting {@code batch.selected}, {@code batch.size} and,
+   * where needed, {@code batch.selectedInUse}.
+   */
+  @Override
+  public void evaluate(VectorizedRowBatch batch) {
+    if (childExpressions != null) {
+      super.evaluateChildren(batch);
+    }
+
+    if (!initialized) {
+      initValue();
+    }
+
+    ColumnVector inputColVector = batch.cols[colNum];
+    int[] sel = batch.selected;
+    boolean[] nullPos = inputColVector.isNull;
+    int n = batch.size;
+
+    // return immediately if batch is empty
+    if (n == 0) {
+      return;
+    }
+
+    // In case the dynamic value resolves to a null value: nothing can match, so the whole
+    // batch is filtered out. Return here, because the checks below dereference bloomFilter.
+    if (bloomFilter == null) {
+      batch.size = 0;
+      return;
+    }
+
+    if (inputColVector.noNulls) {
+      if (inputColVector.isRepeating) {
+
+        // All must be selected otherwise size would be zero. Repeating property will not change.
+        if (!(bfCheck.checkValue(inputColVector, 0))) {
+
+          //Entire batch is filtered out.
+          batch.size = 0;
+        }
+      } else if (batch.selectedInUse) {
+        int newSize = 0;
+        for(int j=0; j != n; j++) {
+          int i = sel[j];
+          if (bfCheck.checkValue(inputColVector, i)) {
+            sel[newSize++] = i;
+          }
+        }
+        batch.size = newSize;
+      } else {
+        int newSize = 0;
+        for(int i = 0; i != n; i++) {
+          if (bfCheck.checkValue(inputColVector, i)) {
+            sel[newSize++] = i;
+          }
+        }
+        // Only switch to the selection vector when at least one row was dropped.
+        if (newSize < n) {
+          batch.size = newSize;
+          batch.selectedInUse = true;
+        }
+      }
+    } else {
+      if (inputColVector.isRepeating) {
+
+        // All must be selected otherwise size would be zero. Repeating property will not change.
+        if (!nullPos[0]) {
+          if (!(bfCheck.checkValue(inputColVector, 0))) {
+
+            //Entire batch is filtered out.
+            batch.size = 0;
+          }
+        } else {
+          // The repeated value is null, so no row can match.
+          batch.size = 0;
+        }
+      } else if (batch.selectedInUse) {
+        int newSize = 0;
+        for(int j=0; j != n; j++) {
+          int i = sel[j];
+          if (!nullPos[i]) {
+            if (bfCheck.checkValue(inputColVector, i)) {
+              sel[newSize++] = i;
+            }
+          }
+        }
+
+        //Change the selected vector
+        batch.size = newSize;
+      } else {
+        int newSize = 0;
+        for(int i = 0; i != n; i++) {
+          if (!nullPos[i]) {
+            if (bfCheck.checkValue(inputColVector, i)) {
+              sel[newSize++] = i;
+            }
+          }
+        }
+        // Only switch to the selection vector when at least one row was dropped.
+        if (newSize < n) {
+          batch.size = newSize;
+          batch.selectedInUse = true;
+        }
+      }
+    }
+  }
+
+  /** Always -1: this expression filters the batch and writes no output column. */
+  @Override
+  public int getOutputColumn() {
+    return -1;
+  }
+
+  @Override
+  public String getOutputType() {
+    return "boolean";
+  }
+
+  public int getColNum() {
+    return colNum;
+  }
+
+  public void setColNum(int colNum) {
+    this.colNum = colNum;
+  }
+
+  /** Describes a two-argument FILTER expression: (column of any family, binary dynamic value). */
+  @Override
+  public Descriptor getDescriptor() {
+    VectorExpressionDescriptor.Builder b = new VectorExpressionDescriptor.Builder();
+    b.setMode(VectorExpressionDescriptor.Mode.FILTER)
+        .setNumArguments(2)
+        .setArgumentTypes(
+            VectorExpressionDescriptor.ArgumentType.ALL_FAMILY,
+            VectorExpressionDescriptor.ArgumentType.BINARY)
+        .setInputExpressionTypes(
+            VectorExpressionDescriptor.InputExpressionType.COLUMN,
+            VectorExpressionDescriptor.InputExpressionType.DYNAMICVALUE);
+    return b.build();
+  }
+
+  // Type-specific handling
+  /** Tests one entry of a column vector against the enclosing instance's bloom filter. */
+  abstract class BloomFilterCheck {
+    /**
+     * @param columnVector column holding the value
+     * @param idx index of the entry to test
+     * @return the result of the bloom filter test for that entry
+     */
+    abstract public boolean checkValue(ColumnVector columnVector, int idx);
+  }
+
+  /** Tests the byte range of a {@link BytesColumnVector} entry. */
+  class BytesBloomFilterCheck extends BloomFilterCheck {
+    @Override
+    public boolean checkValue(ColumnVector columnVector, int idx) {
+      BytesColumnVector col = (BytesColumnVector) columnVector;
+      return bloomFilter.testBytes(col.vector[idx], col.start[idx], col.length[idx]);
+    }
+  }
+
+  /** Tests the long value of a {@link LongColumnVector} entry. */
+  class LongBloomFilterCheck extends BloomFilterCheck {
+    @Override
+    public boolean checkValue(ColumnVector columnVector, int idx) {
+      LongColumnVector col = (LongColumnVector) columnVector;
+      return bloomFilter.testLong(col.vector[idx]);
+    }
+  }
+
+  /** Tests the double value of a {@link DoubleColumnVector} entry. */
+  class DoubleBloomFilterCheck extends BloomFilterCheck {
+    @Override
+    public boolean checkValue(ColumnVector columnVector, int idx) {
+      DoubleColumnVector col = (DoubleColumnVector) columnVector;
+      return bloomFilter.testDouble(col.vector[idx]);
+    }
+  }
+
+  /** Tests the byte form of a {@link DecimalColumnVector} entry, written into a reused scratch buffer. */
+  class DecimalBloomFilterCheck extends BloomFilterCheck {
+    private byte[] scratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES];
+
+    @Override
+    public boolean checkValue(ColumnVector columnVector, int idx) {
+      DecimalColumnVector col = (DecimalColumnVector) columnVector;
+      // The bytes tested run from the index returned by toBytes() to the end of the buffer.
+      int startIdx = col.vector[idx].toBytes(scratchBuffer);
+      return bloomFilter.testBytes(scratchBuffer, startIdx, scratchBuffer.length - startIdx);
+    }
+  }
+
+  /** Tests the {@code time} long of a {@link TimestampColumnVector} entry. */
+  class TimestampBloomFilterCheck extends BloomFilterCheck {
+    @Override
+    public boolean checkValue(ColumnVector columnVector, int idx) {
+      TimestampColumnVector col = (TimestampColumnVector) columnVector;
+      return bloomFilter.testLong(col.time[idx]);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java
new file mode 100644
index 0000000..3ecb82e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java
@@ -0,0 +1,474 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
+import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter.GenericUDAFBloomFilterEvaluator;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hive.common.util.BloomFilter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Vectorized aggregate that adds every non-null value produced by its input expression
+ * to a {@link BloomFilter} and returns the serialized filter as a {@link BytesWritable}.
+ */
+public class VectorUDAFBloomFilter extends VectorAggregateExpression {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VectorUDAFBloomFilter.class);
+
+  private static final long serialVersionUID = 1L;
+
+  private VectorExpression inputExpression;
+  // Negative until init(AggregationDesc) or setExpectedEntries() supplies a value.
+  private long expectedEntries = -1;
+  private ValueProcessor valueProcessor;
+  // Cached by getAggregationBufferFixedSize(); negative means "not computed yet".
+  transient private int bitSetSize = -1;
+  // Reused across evaluateOutput() calls.
+  transient private BytesWritable bw = new BytesWritable();
+  transient private ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+
+  /**
+   * class for storing the current aggregate value.
+   */
+  private static final class Aggregation implements AggregationBuffer {
+    private static final long serialVersionUID = 1L;
+
+    BloomFilter bf;
+
+    public Aggregation(long expectedEntries) {
+      bf = new BloomFilter(expectedEntries);
+    }
+
+    @Override
+    public int getVariableSize() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void reset() {
+      bf.reset();
+    }
+  }
+
+  /**
+   * @param inputExpression expression producing the values to add; its output type
+   *        selects the type-specific {@link ValueProcessor}
+   * @throws IllegalStateException if the input type is not handled here
+   */
+  public VectorUDAFBloomFilter(VectorExpression inputExpression) {
+    this();
+    this.inputExpression = inputExpression;
+
+    // Instantiate the ValueProcessor based on the input type
+    VectorExpressionDescriptor.ArgumentType inputType =
+        VectorExpressionDescriptor.ArgumentType.fromHiveTypeName(inputExpression.getOutputType());
+    switch (inputType) {
+    case INT_FAMILY:
+    case DATE:
+      valueProcessor = new ValueProcessorLong();
+      break;
+    case FLOAT_FAMILY:
+      valueProcessor = new ValueProcessorDouble();
+      break;
+    case DECIMAL:
+      valueProcessor = new ValueProcessorDecimal();
+      break;
+    case STRING:
+    case CHAR:
+    case VARCHAR:
+    case STRING_FAMILY:
+    case BINARY:
+      valueProcessor = new ValueProcessorBytes();
+      break;
+    case TIMESTAMP:
+      valueProcessor = new ValueProcessorTimestamp();
+      break;
+    default:
+      throw new IllegalStateException("Unsupported type " + inputType);
+    }
+  }
+
+  public VectorUDAFBloomFilter() {
+    super();
+  }
+
+  /**
+   * Creates a buffer holding a new bloom filter sized for {@code expectedEntries}.
+   *
+   * @throws IllegalStateException if {@code expectedEntries} has not been set
+   */
+  @Override
+  public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+    if (expectedEntries < 0) {
+      throw new IllegalStateException("expectedEntries not initialized");
+    }
+    return new Aggregation(expectedEntries);
+  }
+
+  /**
+   * Evaluates the input expression on {@code batch} and adds every non-null value of the
+   * batch to the single buffer {@code agg}.
+   */
+  @Override
+  public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
+      throws HiveException {
+
+    inputExpression.evaluate(batch);
+
+    ColumnVector inputColumn = batch.cols[this.inputExpression.getOutputColumn()];
+
+    int batchSize = batch.size;
+
+    if (batchSize == 0) {
+      return;
+    }
+
+    Aggregation myagg = (Aggregation) agg;
+
+    if (inputColumn.isRepeating) {
+      // noNulls == false only says nulls are possible, so also consult isNull[0];
+      // skipping a real value would leave it out of the filter.
+      if (inputColumn.noNulls || !inputColumn.isNull[0]) {
+        valueProcessor.processValue(myagg, inputColumn, 0);
+      }
+      return;
+    }
+
+    if (!batch.selectedInUse && inputColumn.noNulls) {
+      iterateNoSelectionNoNulls(myagg, inputColumn, batchSize);
+    }
+    else if (!batch.selectedInUse) {
+      iterateNoSelectionHasNulls(myagg, inputColumn, batchSize);
+    }
+    else if (inputColumn.noNulls){
+      iterateSelectionNoNulls(myagg, inputColumn, batchSize, batch.selected);
+    }
+    else {
+      iterateSelectionHasNulls(myagg, inputColumn, batchSize, batch.selected);
+    }
+  }
+
+  /** Adds entries 0..batchSize-1 to the filter. */
+  private void iterateNoSelectionNoNulls(
+      Aggregation myagg,
+      ColumnVector inputColumn,
+      int batchSize) {
+    for (int i=0; i< batchSize; ++i) {
+      valueProcessor.processValue(myagg, inputColumn, i);
+    }
+  }
+
+  /** Adds the non-null entries among 0..batchSize-1 to the filter. */
+  private void iterateNoSelectionHasNulls(
+      Aggregation myagg,
+      ColumnVector inputColumn,
+      int batchSize) {
+
+    for (int i=0; i< batchSize; ++i) {
+      if (!inputColumn.isNull[i]) {
+        valueProcessor.processValue(myagg, inputColumn, i);
+      }
+    }
+  }
+
+  /** Adds the entries named by the first batchSize slots of {@code selected} to the filter. */
+  private void iterateSelectionNoNulls(
+      Aggregation myagg,
+      ColumnVector inputColumn,
+      int batchSize,
+      int[] selected) {
+
+    for (int j=0; j< batchSize; ++j) {
+      int i = selected[j];
+      valueProcessor.processValue(myagg, inputColumn, i);
+    }
+  }
+
+  /** Adds the non-null entries named by the first batchSize slots of {@code selected} to the filter. */
+  private void iterateSelectionHasNulls(
+      Aggregation myagg,
+      ColumnVector inputColumn,
+      int batchSize,
+      int[] selected) {
+
+    for (int j=0; j< batchSize; ++j) {
+      int i = selected[j];
+      if (!inputColumn.isNull[i]) {
+        valueProcessor.processValue(myagg, inputColumn, i);
+      }
+    }
+  }
+
+  /**
+   * Evaluates the input expression on {@code batch} and adds each non-null value to the
+   * buffer found at {@code aggregationBufferSets[k]}, where k is the row's position
+   * within the batch (not its index in the column vector).
+   */
+  @Override
+  public void aggregateInputSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets, int aggregateIndex,
+      VectorizedRowBatch batch) throws HiveException {
+
+    int batchSize = batch.size;
+
+    if (batchSize == 0) {
+      return;
+    }
+
+    inputExpression.evaluate(batch);
+
+    ColumnVector inputColumn = batch.cols[this.inputExpression.getOutputColumn()];
+
+    if (inputColumn.isRepeating) {
+      // noNulls == false only says nulls are possible, so also consult isNull[0].
+      if (inputColumn.noNulls || !inputColumn.isNull[0]) {
+        iterateNoNullsRepeatingWithAggregationSelection(
+            aggregationBufferSets, aggregateIndex,
+            inputColumn, batchSize);
+      }
+      // Otherwise the repeated value is null and there is nothing to add to any filter.
+      return;
+    }
+
+    if (inputColumn.noNulls) {
+      if (batch.selectedInUse) {
+        iterateNoNullsSelectionWithAggregationSelection(
+            aggregationBufferSets, aggregateIndex,
+            inputColumn, batch.selected, batchSize);
+      } else {
+        iterateNoNullsWithAggregationSelection(
+            aggregationBufferSets, aggregateIndex,
+            inputColumn, batchSize);
+      }
+    } else {
+      if (batch.selectedInUse) {
+        iterateHasNullsSelectionWithAggregationSelection(
+            aggregationBufferSets, aggregateIndex,
+            inputColumn, batchSize, batch.selected);
+      } else {
+        iterateHasNullsWithAggregationSelection(
+            aggregationBufferSets, aggregateIndex,
+            inputColumn, batchSize);
+      }
+    }
+  }
+
+  /** Adds the repeated entry 0 to the buffer of each of the batchSize rows. */
+  private void iterateNoNullsRepeatingWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregrateIndex,
+      ColumnVector inputColumn,
+      int batchSize) {
+
+    for (int i=0; i < batchSize; ++i) {
+      Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregrateIndex,
+          i);
+      valueProcessor.processValue(myagg, inputColumn, 0);
+    }
+  }
+
+  /** For batch position i, adds column entry {@code selection[i]} to buffer i. */
+  private void iterateNoNullsSelectionWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregrateIndex,
+      ColumnVector inputColumn,
+      int[] selection,
+      int batchSize) {
+
+    for (int i=0; i < batchSize; ++i) {
+      int row = selection[i];
+      Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregrateIndex,
+          i);
+      valueProcessor.processValue(myagg, inputColumn, row);
+    }
+  }
+
+  /** For batch position i, adds column entry i to buffer i. */
+  private void iterateNoNullsWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregrateIndex,
+      ColumnVector inputColumn,
+      int batchSize) {
+    for (int i=0; i < batchSize; ++i) {
+      Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregrateIndex,
+          i);
+      valueProcessor.processValue(myagg, inputColumn, i);
+    }
+  }
+
+  /** For batch position i, adds column entry {@code selection[i]} to buffer i unless it is null. */
+  private void iterateHasNullsSelectionWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregrateIndex,
+      ColumnVector inputColumn,
+      int batchSize,
+      int[] selection) {
+
+    for (int i=0; i < batchSize; ++i) {
+      int row = selection[i];
+      if (!inputColumn.isNull[row]) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+            aggregationBufferSets,
+            aggregrateIndex,
+            i);
+        // The buffer is looked up by batch position i, the value by column index row.
+        valueProcessor.processValue(myagg, inputColumn, row);
+      }
+    }
+  }
+
+  /** For batch position i, adds column entry i to buffer i unless it is null. */
+  private void iterateHasNullsWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregrateIndex,
+      ColumnVector inputColumn,
+      int batchSize) {
+
+    for (int i=0; i < batchSize; ++i) {
+      if (!inputColumn.isNull[i]) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+            aggregationBufferSets,
+            aggregrateIndex,
+            i);
+        valueProcessor.processValue(myagg, inputColumn, i);
+      }
+    }
+  }
+
+  /** Returns buffer {@code aggregrateIndex} of {@code aggregationBufferSets[row]}. */
+  private Aggregation getCurrentAggregationBuffer(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregrateIndex,
+      int row) {
+    VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+    Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregrateIndex);
+    return myagg;
+  }
+
+  @Override
+  public void reset(AggregationBuffer agg) throws HiveException {
+    agg.reset();
+  }
+
+  /**
+   * Serializes the buffer's bloom filter.
+   *
+   * @return the shared {@link BytesWritable} of this instance, overwritten on every call
+   * @throws HiveException if serialization fails with an IOException
+   */
+  @Override
+  public Object evaluateOutput(AggregationBuffer agg) throws HiveException {
+    try {
+      Aggregation bfAgg = (Aggregation) agg;
+      byteStream.reset();
+      BloomFilter.serialize(byteStream, bfAgg.bf);
+      byte[] bytes = byteStream.toByteArray();
+      bw.set(bytes, 0, bytes.length);
+      return bw;
+    } catch (IOException err) {
+      throw new HiveException("Error encountered while serializing bloomfilter", err);
+    }
+  }
+
+  @Override
+  public ObjectInspector getOutputObjectInspector() {
+    return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector;
+  }
+
+  /**
+   * Estimates the size of one aggregation buffer from the length of the bit set of a
+   * freshly created filter; that length is computed once and cached in {@code bitSetSize}.
+   */
+  @Override
+  public int getAggregationBufferFixedSize() {
+    if (bitSetSize < 0) {
+      // Not pretty, but we need a way to get the size
+      try {
+        Aggregation agg = (Aggregation) getNewAggregationBuffer();
+        bitSetSize = agg.bf.getBitSet().length;
+      } catch (Exception e) {
+        throw new RuntimeException("Unexpected error while creating AggregationBuffer", e);
+      }
+    }
+
+    // BloomFilter: object(BitSet: object(data: long[]), numBits: int, numHashFunctions: int)
+    JavaDataModel model = JavaDataModel.get();
+    int bloomFilterSize = JavaDataModel.alignUp(model.object() + model.lengthForLongArrayOfSize(bitSetSize),
+        model.memoryAlign());
+    return JavaDataModel.alignUp(
+        model.object() + bloomFilterSize + model.primitive1() + model.primitive1(),
+        model.memoryAlign());
+  }
+
+  /** Reads {@code expectedEntries} from the bloom filter UDAF evaluator of {@code desc}. */
+  @Override
+  public void init(AggregationDesc desc) throws HiveException {
+    GenericUDAFBloomFilterEvaluator udafBloomFilter =
+        (GenericUDAFBloomFilterEvaluator) desc.getGenericUDAFEvaluator();
+    expectedEntries = udafBloomFilter.getExpectedEntries();
+  }
+
+  public VectorExpression getInputExpression() {
+    return inputExpression;
+  }
+
+  public void setInputExpression(VectorExpression inputExpression) {
+    this.inputExpression = inputExpression;
+  }
+
+  public long getExpectedEntries() {
+    return expectedEntries;
+  }
+
+  public void setExpectedEntries(long expectedEntries) {
+    this.expectedEntries = expectedEntries;
+  }
+
+  // Type-specific handling done here
+  /** Adds one entry of a column vector to the bloom filter of an aggregation buffer. */
+  private static abstract class ValueProcessor {
+    abstract protected void processValue(Aggregation myagg, ColumnVector inputColumn, int index);
+  }
+
+  //
+  // Type-specific implementations
+  //
+
+  /** Adds the byte range of a {@link BytesColumnVector} entry. */
+  public static class ValueProcessorBytes extends ValueProcessor {
+    @Override
+    protected void processValue(Aggregation myagg, ColumnVector columnVector, int i) {
+      BytesColumnVector inputColumn = (BytesColumnVector) columnVector;
+      myagg.bf.addBytes(inputColumn.vector[i], inputColumn.start[i], inputColumn.length[i]);
+    }
+  }
+
+  /** Adds the long value of a {@link LongColumnVector} entry. */
+  public static class ValueProcessorLong extends ValueProcessor {
+    @Override
+    protected void processValue(Aggregation myagg, ColumnVector columnVector, int i) {
+      LongColumnVector inputColumn = (LongColumnVector) columnVector;
+      myagg.bf.addLong(inputColumn.vector[i]);
+    }
+  }
+
+  /** Adds the double value of a {@link DoubleColumnVector} entry. */
+  public static class ValueProcessorDouble extends ValueProcessor {
+    @Override
+    protected void processValue(Aggregation myagg, ColumnVector columnVector, int i) {
+      DoubleColumnVector inputColumn = (DoubleColumnVector) columnVector;
+      myagg.bf.addDouble(inputColumn.vector[i]);
+    }
+  }
+
+  /** Adds the byte form of a {@link DecimalColumnVector} entry, written into a reused scratch buffer. */
+  public static class ValueProcessorDecimal extends ValueProcessor {
+    private byte[] scratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES];
+
+    @Override
+    protected void processValue(Aggregation myagg, ColumnVector columnVector, int i) {
+      DecimalColumnVector inputColumn = (DecimalColumnVector) columnVector;
+      // The bytes added run from the index returned by toBytes() to the end of the buffer.
+      int startIdx = inputColumn.vector[i].toBytes(scratchBuffer);
+      myagg.bf.addBytes(scratchBuffer, startIdx, scratchBuffer.length - startIdx);
+    }
+  }
+
+  /** Adds the {@code time} long of a {@link TimestampColumnVector} entry. */
+  public static class ValueProcessorTimestamp extends ValueProcessor {
+    @Override
+    protected void processValue(Aggregation myagg, ColumnVector columnVector, int i) {
+      TimestampColumnVector inputColumn = (TimestampColumnVector) columnVector;
+      myagg.bf.addLong(inputColumn.time[i]);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java
new file mode 100644
index 0000000..ad190b7
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java
@@ -0,0 +1,365 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Arrays;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter.GenericUDAFBloomFilterEvaluator;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.BloomFilter;
+
+/**
+ * Vectorized aggregate that merges serialized bloom filters.
+ *
+ * Every non-null input value is treated as the bytes of a serialized {@link BloomFilter} and is
+ * merged into the aggregation buffer with {@code BloomFilter.mergeBloomFilterBytes()}, so input
+ * filters are never deserialized. The merged filter is emitted as a {@link BytesWritable}.
+ */
+public class VectorUDAFBloomFilterMerge extends VectorAggregateExpression {
+
+  private static final long serialVersionUID = 1L;
+
+  private VectorExpression inputExpression;
+  // Stays negative until init() has read the value from the aggregation descriptor.
+  private long expectedEntries = -1;
+  // Lazily computed by getAggregationBufferFixedSize(); negative means "not computed yet".
+  transient private int aggBufferSize = -1;
+  // Reused as the return value of evaluateOutput().
+  transient private BytesWritable bw = new BytesWritable();
+
+  /**
+   * class for storing the current aggregate value.
+   * The value is kept as the serialized bytes of a bloom filter.
+   */
+  private static final class Aggregation implements AggregationBuffer {
+    private static final long serialVersionUID = 1L;
+
+    byte[] bfBytes;
+
+    /**
+     * Creates a new bloom filter for {@code expectedEntries} and keeps its serialized form.
+     *
+     * @throws IllegalArgumentException if the filter cannot be created or serialized
+     */
+    public Aggregation(long expectedEntries) {
+      try {
+        BloomFilter bf = new BloomFilter(expectedEntries);
+        ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+        BloomFilter.serialize(bytesOut, bf);
+        bfBytes = bytesOut.toByteArray();
+      } catch (Exception err) {
+        throw new IllegalArgumentException("Error creating aggregation buffer", err);
+      }
+    }
+
+    @Override
+    public int getVariableSize() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void reset() {
+      // Do not change the initial bytes which contain NumHashFunctions/NumBits!
+      Arrays.fill(bfBytes, BloomFilter.START_OF_SERIALIZED_LONGS, bfBytes.length, (byte) 0);
+    }
+  }
+
+  public VectorUDAFBloomFilterMerge(VectorExpression inputExpression) {
+    this();
+    this.inputExpression = inputExpression;
+  }
+
+  public VectorUDAFBloomFilterMerge() {
+    super();
+  }
+
+  /**
+   * Creates an aggregation buffer sized for {@code expectedEntries}.
+   *
+   * @throws IllegalStateException if {@link #init(AggregationDesc)} has not set expectedEntries
+   */
+  @Override
+  public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+    if (expectedEntries < 0) {
+      throw new IllegalStateException("expectedEntries not initialized");
+    }
+    return new Aggregation(expectedEntries);
+  }
+
+  /**
+   * Merges every non-null input value of the batch into the single buffer {@code agg}.
+   */
+  @Override
+  public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
+      throws HiveException {
+
+    inputExpression.evaluate(batch);
+
+    ColumnVector inputColumn = batch.cols[this.inputExpression.getOutputColumn()];
+
+    int batchSize = batch.size;
+
+    if (batchSize == 0) {
+      return;
+    }
+
+    Aggregation myagg = (Aggregation) agg;
+
+    if (inputColumn.isRepeating) {
+      // Entry 0 stands for every row, so it is merged once.
+      // noNulls == false alone does not prove entry 0 is null; isNull[0] decides.
+      if (inputColumn.noNulls || !inputColumn.isNull[0]) {
+        processValue(myagg, inputColumn, 0);
+      }
+      return;
+    }
+
+    if (!batch.selectedInUse && inputColumn.noNulls) {
+      iterateNoSelectionNoNulls(myagg, inputColumn, batchSize);
+    }
+    else if (!batch.selectedInUse) {
+      iterateNoSelectionHasNulls(myagg, inputColumn, batchSize);
+    }
+    else if (inputColumn.noNulls){
+      iterateSelectionNoNulls(myagg, inputColumn, batchSize, batch.selected);
+    }
+    else {
+      iterateSelectionHasNulls(myagg, inputColumn, batchSize, batch.selected);
+    }
+  }
+
+  /** Merges entries 0..batchSize-1 without consulting isNull or a selection vector. */
+  private void iterateNoSelectionNoNulls(
+      Aggregation myagg,
+      ColumnVector inputColumn,
+      int batchSize) {
+    for (int i=0; i< batchSize; ++i) {
+      processValue(myagg, inputColumn, i);
+    }
+  }
+
+  /** Merges the entries 0..batchSize-1 whose isNull flag is not set. */
+  private void iterateNoSelectionHasNulls(
+      Aggregation myagg,
+      ColumnVector inputColumn,
+      int batchSize) {
+
+    for (int i=0; i< batchSize; ++i) {
+      if (!inputColumn.isNull[i]) {
+        processValue(myagg, inputColumn, i);
+      }
+    }
+  }
+
+  /** Merges the entries named by the first batchSize slots of {@code selected}. */
+  private void iterateSelectionNoNulls(
+      Aggregation myagg,
+      ColumnVector inputColumn,
+      int batchSize,
+      int[] selected) {
+
+    for (int j=0; j< batchSize; ++j) {
+      int i = selected[j];
+      processValue(myagg, inputColumn, i);
+    }
+  }
+
+  /** Merges the selected entries whose isNull flag is not set. */
+  private void iterateSelectionHasNulls(
+      Aggregation myagg,
+      ColumnVector inputColumn,
+      int batchSize,
+      int[] selected) {
+
+    for (int j=0; j< batchSize; ++j) {
+      int i = selected[j];
+      if (!inputColumn.isNull[i]) {
+        processValue(myagg, inputColumn, i);
+      }
+    }
+  }
+
+  /**
+   * Merges each input value into the aggregation buffer of its own row: batch position
+   * {@code i} uses {@code aggregationBufferSets[i]}, buffer number {@code aggregateIndex}.
+   */
+  @Override
+  public void aggregateInputSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets, int aggregateIndex,
+      VectorizedRowBatch batch) throws HiveException {
+
+    int batchSize = batch.size;
+
+    if (batchSize == 0) {
+      return;
+    }
+
+    inputExpression.evaluate(batch);
+
+    ColumnVector inputColumn = batch.cols[this.inputExpression.getOutputColumn()];
+
+    if (inputColumn.noNulls) {
+      if (inputColumn.isRepeating) {
+        iterateNoNullsRepeatingWithAggregationSelection(
+            aggregationBufferSets, aggregateIndex,
+            inputColumn, batchSize);
+      } else {
+        if (batch.selectedInUse) {
+          iterateNoNullsSelectionWithAggregationSelection(
+              aggregationBufferSets, aggregateIndex,
+              inputColumn, batch.selected, batchSize);
+        } else {
+          iterateNoNullsWithAggregationSelection(
+              aggregationBufferSets, aggregateIndex,
+              inputColumn, batchSize);
+        }
+      }
+    } else {
+      if (inputColumn.isRepeating) {
+        // noNulls == false alone does not prove the repeated entry is null; isNull[0] decides.
+        if (!inputColumn.isNull[0]) {
+          iterateNoNullsRepeatingWithAggregationSelection(
+              aggregationBufferSets, aggregateIndex,
+              inputColumn, batchSize);
+        }
+        // Otherwise every row is null and there is nothing to merge.
+      } else {
+        if (batch.selectedInUse) {
+          iterateHasNullsSelectionWithAggregationSelection(
+              aggregationBufferSets, aggregateIndex,
+              inputColumn, batchSize, batch.selected);
+        } else {
+          iterateHasNullsWithAggregationSelection(
+              aggregationBufferSets, aggregateIndex,
+              inputColumn, batchSize);
+        }
+      }
+    }
+  }
+
+  /** Merges the repeated entry 0 into the buffer of every batch position. */
+  private void iterateNoNullsRepeatingWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregateIndex,
+      ColumnVector inputColumn,
+      int batchSize) {
+
+    for (int i=0; i < batchSize; ++i) {
+      Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregateIndex,
+          i);
+      processValue(myagg, inputColumn, 0);
+    }
+  }
+
+  /**
+   * Merges selected entries: the buffer is looked up by batch position {@code i}, the value
+   * by the selected row index {@code selection[i]}.
+   */
+  private void iterateNoNullsSelectionWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregateIndex,
+      ColumnVector inputColumn,
+      int[] selection,
+      int batchSize) {
+
+    for (int i=0; i < batchSize; ++i) {
+      int row = selection[i];
+      Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregateIndex,
+          i);
+      processValue(myagg, inputColumn, row);
+    }
+  }
+
+  /** Merges entry {@code i} into the buffer of batch position {@code i}. */
+  private void iterateNoNullsWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregateIndex,
+      ColumnVector inputColumn,
+      int batchSize) {
+    for (int i=0; i < batchSize; ++i) {
+      Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets,
+          aggregateIndex,
+          i);
+      processValue(myagg, inputColumn, i);
+    }
+  }
+
+  /**
+   * Merges the selected, non-null entries: the buffer is looked up by batch position
+   * {@code i}, the null flag and the value by the selected row index {@code selection[i]}.
+   */
+  private void iterateHasNullsSelectionWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregateIndex,
+      ColumnVector inputColumn,
+      int batchSize,
+      int[] selection) {
+
+    for (int i=0; i < batchSize; ++i) {
+      int row = selection[i];
+      if (!inputColumn.isNull[row]) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+            aggregationBufferSets,
+            aggregateIndex,
+            i);
+        // The value must come from the selected row, the same index that was null-checked.
+        processValue(myagg, inputColumn, row);
+      }
+    }
+  }
+
+  /** Merges each non-null entry {@code i} into the buffer of batch position {@code i}. */
+  private void iterateHasNullsWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregateIndex,
+      ColumnVector inputColumn,
+      int batchSize) {
+
+    for (int i=0; i < batchSize; ++i) {
+      if (!inputColumn.isNull[i]) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+            aggregationBufferSets,
+            aggregateIndex,
+            i);
+        processValue(myagg, inputColumn, i);
+      }
+    }
+  }
+
+  /** Returns buffer number {@code aggregateIndex} of the buffer row at position {@code row}. */
+  private Aggregation getCurrentAggregationBuffer(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregateIndex,
+      int row) {
+    VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+    Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex);
+    return myagg;
+  }
+
+  @Override
+  public void reset(AggregationBuffer agg) throws HiveException {
+    agg.reset();
+  }
+
+  /**
+   * Returns the merged filter bytes in a {@link BytesWritable}. The same writable instance
+   * is returned by every call.
+   */
+  @Override
+  public Object evaluateOutput(AggregationBuffer agg) throws HiveException {
+    Aggregation bfAgg = (Aggregation) agg;
+    bw.set(bfAgg.bfBytes, 0, bfAgg.bfBytes.length);
+    return bw;
+  }
+
+  @Override
+  public ObjectInspector getOutputObjectInspector() {
+    return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector;
+  }
+
+  /**
+   * Returns the length in bytes of a freshly created aggregation buffer. The value is
+   * computed on first use and cached.
+   */
+  @Override
+  public int getAggregationBufferFixedSize() {
+    if (aggBufferSize < 0) {
+      // Not pretty, but we need a way to get the size
+      try {
+        Aggregation agg = (Aggregation) getNewAggregationBuffer();
+        aggBufferSize = agg.bfBytes.length;
+      } catch (Exception e) {
+        throw new RuntimeException("Unexpected error while creating AggregationBuffer", e);
+      }
+    }
+
+    return aggBufferSize;
+  }
+
+  /** Reads the expected number of entries from the descriptor's bloom filter evaluator. */
+  @Override
+  public void init(AggregationDesc desc) throws HiveException {
+    GenericUDAFBloomFilterEvaluator udafBloomFilter =
+        (GenericUDAFBloomFilterEvaluator) desc.getGenericUDAFEvaluator();
+    expectedEntries = udafBloomFilter.getExpectedEntries();
+  }
+
+  /** Merges the serialized bloom filter at entry {@code i} of the column into {@code myagg}. */
+  void processValue(Aggregation myagg, ColumnVector columnVector, int i) {
+    // columnVector entry is byte array representing serialized BloomFilter.
+    // BloomFilter.mergeBloomFilterBytes() does a simple byte ORing
+    // which should be faster than deserialize/merge.
+    BytesColumnVector inputColumn = (BytesColumnVector) columnVector;
+    BloomFilter.mergeBloomFilterBytes(myagg.bfBytes, 0, myagg.bfBytes.length,
+        inputColumn.vector[i], inputColumn.start[i], inputColumn.length[i]);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index e3d9d7f..439950b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -335,6 +335,7 @@ public class Vectorizer implements PhysicalPlanResolver {
supportedGenericUDFs.add(GenericUDFNvl.class);
supportedGenericUDFs.add(GenericUDFElt.class);
supportedGenericUDFs.add(GenericUDFInitCap.class);
+ supportedGenericUDFs.add(GenericUDFInBloomFilter.class);
// For type casts
supportedGenericUDFs.add(UDFToLong.class);
@@ -368,6 +369,7 @@ public class Vectorizer implements PhysicalPlanResolver {
supportedAggregationUdfs.add("stddev");
supportedAggregationUdfs.add("stddev_pop");
supportedAggregationUdfs.add("stddev_samp");
+ supportedAggregationUdfs.add("bloom_filter");
}
private class VectorTaskColumnInfo {
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java
index fb9a140..deb0f76 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
@@ -72,6 +73,8 @@ public class GenericUDAFBloomFilter implements GenericUDAFResolver2 {
// Bloom filter rest
private ByteArrayOutputStream result = new ByteArrayOutputStream();
+ private transient byte[] scratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES];
+
@Override
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
super.init(m, parameters);
@@ -167,9 +170,10 @@ public class GenericUDAFBloomFilter implements GenericUDAFResolver2 {
bf.addDouble(vDouble);
break;
case DECIMAL:
- HiveDecimal vDecimal = ((HiveDecimalObjectInspector)inputOI).
- getPrimitiveJavaObject(parameters[0]);
- bf.addString(vDecimal.toString());
+ HiveDecimalWritable vDecimal = ((HiveDecimalObjectInspector)inputOI).
+ getPrimitiveWritableObject(parameters[0]);
+ int startIdx = vDecimal.toBytes(scratchBuffer);
+ bf.addBytes(scratchBuffer, startIdx, scratchBuffer.length - startIdx);
break;
case DATE:
DateWritable vDate = ((DateObjectInspector)inputOI).
http://git-wip-us.apache.org/repos/asf/hive/blob/62ebd1ab/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFInBloomFilter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFInBloomFilter.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFInBloomFilter.java
index 1b7de6c..3e6e069 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFInBloomFilter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFInBloomFilter.java
@@ -22,8 +22,11 @@ import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedExpressions;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorInBloomFilterColDynamicValue;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
@@ -41,6 +44,7 @@ import java.sql.Timestamp;
/**
* GenericUDF to lookup a value in BloomFilter
*/
+@VectorizedExpressions({VectorInBloomFilterColDynamicValue.class})
public class GenericUDFInBloomFilter extends GenericUDF {
private static final Logger LOG = LoggerFactory.getLogger(GenericUDFInBloomFilter.class);
@@ -48,6 +52,7 @@ public class GenericUDFInBloomFilter extends GenericUDF {
private transient ObjectInspector bloomFilterObjectInspector;
private transient BloomFilter bloomFilter;
private transient boolean initializedBloomFilter;
+ private transient byte[] scratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES];
@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
@@ -133,9 +138,10 @@ public class GenericUDFInBloomFilter extends GenericUDF {
get(arguments[0].get());
return bloomFilter.testDouble(vDouble);
case DECIMAL:
- HiveDecimal vDecimal = ((HiveDecimalObjectInspector) valObjectInspector).
- getPrimitiveJavaObject(arguments[0].get());
- return bloomFilter.testString(vDecimal.toString());
+ HiveDecimalWritable vDecimal = ((HiveDecimalObjectInspector) valObjectInspector).
+ getPrimitiveWritableObject(arguments[0].get());
+ int startIdx = vDecimal.toBytes(scratchBuffer);
+ return bloomFilter.testBytes(scratchBuffer, startIdx, scratchBuffer.length - startIdx);
case DATE:
DateWritable vDate = ((DateObjectInspector) valObjectInspector).
getPrimitiveWritableObject(arguments[0].get());