You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by so...@apache.org on 2018/06/07 23:14:08 UTC
[drill] 01/03: DRILL-6375 : Support for ANY_VALUE aggregate function
This is an automated email from the ASF dual-hosted git repository.
sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit a27a1047b16621f6c3c6c181c97f8713231f6c6c
Author: Gautam Parai <gp...@maprtech.com>
AuthorDate: Tue Apr 3 19:18:31 2018 -0700
DRILL-6375 : Support for ANY_VALUE aggregate function
closes #1256
---
.../apache/drill/exec/expr/fn/HiveFuncHolder.java | 2 +-
.../java-exec/src/main/codegen/data/AggrTypes1.tdd | 46 +++
.../src/main/codegen/data/DecimalAggrTypes1.tdd | 6 +
.../main/codegen/templates/AggrTypeFunctions1.java | 22 +-
.../codegen/templates/ComplexAggrFunctions1.java | 120 ++++++++
.../templates/DateIntervalAggrFunctions1.java | 11 +-
.../Decimal/DecimalAggrTypeFunctions1.java | 97 +++++++
.../codegen/templates/VarCharAggrFunctions1.java | 11 +
.../compile/sig/ConstantExpressionIdentifier.java | 7 +
.../apache/drill/exec/expr/EvaluationVisitor.java | 17 +-
.../drill/exec/expr/fn/AbstractFuncHolder.java | 2 +-
.../drill/exec/expr/fn/DrillAggFuncHolder.java | 27 +-
.../expr/fn/DrillComplexWriterAggFuncHolder.java | 142 ++++++++++
.../apache/drill/exec/expr/fn/DrillFuncHolder.java | 34 ++-
.../drill/exec/expr/fn/FunctionConverter.java | 4 +-
.../apache/drill/exec/expr/fn/impl/Mappify.java | 2 +-
.../drill/exec/expr/fn/impl/MappifyUtility.java | 34 ++-
.../physical/impl/aggregate/StreamingAggBatch.java | 67 ++++-
.../drill/exec/planner/physical/HashAggPrule.java | 14 +-
.../drill/exec/vector/complex/MapUtility.java | 308 ++++++++++++++++++++-
.../physical/impl/agg/TestAggWithAnyValue.java | 149 ++++++++++
.../test/resources/store/json/test_anyvalue.json | 50 ++++
.../codegen/templates/RepeatedValueVectors.java | 1 +
.../src/main/codegen/templates/ValueHolders.java | 11 +-
.../exec/expr/holders/RepeatedListHolder.java | 2 +
.../drill/exec/expr/holders/RepeatedMapHolder.java | 3 +
.../drill/common/expression/parser/ExprLexer.g | 1 +
.../drill/common/expression/parser/ExprParser.g | 5 +
.../common/expression/AnyValueExpression.java | 69 +++++
.../common/expression/ExpressionStringBuilder.java | 8 +
.../common/expression/FunctionCallFactory.java | 8 +
.../expression/visitors/AbstractExprVisitor.java | 6 +
.../expression/visitors/AggregateChecker.java | 6 +
.../visitors/ConditionalExprOptimizer.java | 7 +
.../expression/visitors/ConstantChecker.java | 6 +
.../common/expression/visitors/ExprVisitor.java | 2 +
.../expression/visitors/ExpressionValidator.java | 7 +
37 files changed, 1265 insertions(+), 49 deletions(-)
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/expr/fn/HiveFuncHolder.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/expr/fn/HiveFuncHolder.java
index 8e7b645..80f299e 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/expr/fn/HiveFuncHolder.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/expr/fn/HiveFuncHolder.java
@@ -130,7 +130,7 @@ public class HiveFuncHolder extends AbstractFuncHolder {
* @return workspace variables
*/
@Override
- public JVar[] renderStart(ClassGenerator<?> g, HoldingContainer[] inputVariables){
+ public JVar[] renderStart(ClassGenerator<?> g, HoldingContainer[] inputVariables, FieldReference fieldReference){
JVar[] workspaceJVars = new JVar[5];
workspaceJVars[0] = g.declareClassField("returnOI", g.getModel()._ref(ObjectInspector.class));
diff --git a/exec/java-exec/src/main/codegen/data/AggrTypes1.tdd b/exec/java-exec/src/main/codegen/data/AggrTypes1.tdd
index 202f539..3fb2601 100644
--- a/exec/java-exec/src/main/codegen/data/AggrTypes1.tdd
+++ b/exec/java-exec/src/main/codegen/data/AggrTypes1.tdd
@@ -88,6 +88,52 @@
{inputType: "Interval", outputType: "NullableInterval", runningType: "Interval", major: "Date", initialValue: "0"},
{inputType: "NullableInterval", outputType: "NullableInterval", runningType: "Interval", major: "Date", initialValue: "0"}
]
+ },
+ {className: "AnyValue", funcName: "any_value", types: [
+ {inputType: "Bit", outputType: "NullableBit", runningType: "Bit", major: "Numeric"},
+ {inputType: "Int", outputType: "NullableInt", runningType: "Int", major: "Numeric"},
+ {inputType: "BigInt", outputType: "NullableBigInt", runningType: "BigInt", major: "Numeric"},
+ {inputType: "NullableBit", outputType: "NullableBit", runningType: "Bit", major: "Numeric"},
+ {inputType: "NullableInt", outputType: "NullableInt", runningType: "Int", major: "Numeric"},
+ {inputType: "NullableBigInt", outputType: "NullableBigInt", runningType: "BigInt", major: "Numeric"},
+ {inputType: "Float4", outputType: "NullableFloat4", runningType: "Float4", major: "Numeric"},
+ {inputType: "Float8", outputType: "NullableFloat8", runningType: "Float8", major: "Numeric"},
+ {inputType: "NullableFloat4", outputType: "NullableFloat4", runningType: "Float4", major: "Numeric"},
+ {inputType: "NullableFloat8", outputType: "NullableFloat8", runningType: "Float8", major: "Numeric"},
+ {inputType: "Date", outputType: "NullableDate", runningType: "Date", major: "Date", initialValue: "0"},
+ {inputType: "NullableDate", outputType: "NullableDate", runningType: "Date", major: "Date", initialValue: "0"},
+ {inputType: "TimeStamp", outputType: "NullableTimeStamp", runningType: "TimeStamp", major: "Date", initialValue: "0"},
+ {inputType: "NullableTimeStamp", outputType: "NullableTimeStamp", runningType: "TimeStamp", major: "Date", initialValue: "0"},
+ {inputType: "Time", outputType: "NullableTime", runningType: "Time", major: "Date", initialValue: "0"},
+ {inputType: "NullableTime", outputType: "NullableTime", runningType: "Time", major: "Date", initialValue: "0"},
+ {inputType: "IntervalDay", outputType: "NullableIntervalDay", runningType: "IntervalDay", major: "Date", initialValue: "0"},
+ {inputType: "NullableIntervalDay", outputType: "NullableIntervalDay", runningType: "IntervalDay", major: "Date", initialValue: "0"},
+ {inputType: "IntervalYear", outputType: "NullableIntervalYear", runningType: "IntervalYear", major: "Date", initialValue: "0"},
+ {inputType: "NullableIntervalYear", outputType: "NullableIntervalYear", runningType: "IntervalYear", major: "Date", initialValue: "0"},
+ {inputType: "Interval", outputType: "NullableInterval", runningType: "Interval", major: "Date", initialValue: "0"},
+ {inputType: "NullableInterval", outputType: "NullableInterval", runningType: "Interval", major: "Date", initialValue: "0"},
+ {inputType: "VarChar", outputType: "NullableVarChar", runningType: "VarChar", major: "VarBytes", initialValue: ""},
+ {inputType: "NullableVarChar", outputType: "NullableVarChar", runningType: "VarChar", major: "VarBytes", initialValue: ""},
+ {inputType: "VarBinary", outputType: "NullableVarBinary", runningType: "VarBinary", major: "VarBytes"},
+ {inputType: "NullableVarBinary", outputType: "NullableVarBinary", runningType: "VarBinary", major: "VarBytes"}
+ {inputType: "List", outputType: "List", runningType: "List", major: "Complex"}
+ {inputType: "Map", outputType: "Map", runningType: "Map", major: "Complex"}
+ {inputType: "RepeatedBit", outputType: "RepeatedBit", runningType: "RepeatedBit", major: "Complex"},
+ {inputType: "RepeatedInt", outputType: "RepeatedInt", runningType: "RepeatedInt", major: "Complex"},
+ {inputType: "RepeatedBigInt", outputType: "RepeatedBigInt", runningType: "RepeatedBigInt", major: "Complex"},
+ {inputType: "RepeatedFloat4", outputType: "RepeatedFloat4", runningType: "RepeatedFloat4", major: "Complex"},
+ {inputType: "RepeatedFloat8", outputType: "RepeatedFloat8", runningType: "RepeatedFloat8", major: "Complex"},
+ {inputType: "RepeatedDate", outputType: "RepeatedDate", runningType: "RepeatedDate", major: "Complex"},
+ {inputType: "RepeatedTimeStamp", outputType: "RepeatedTimeStamp", runningType: "RepeatedTimeStamp", major: "Complex"},
+ {inputType: "RepeatedTime", outputType: "RepeatedTime", runningType: "RepeatedTime", major: "Complex"},
+ {inputType: "RepeatedIntervalDay", outputType: "RepeatedIntervalDay", runningType: "RepeatedIntervalDay", major: "Complex"},
+ {inputType: "RepeatedIntervalYear", outputType: "RepeatedIntervalYear", runningType: "RepeatedIntervalYear", major: "Complex"},
+ {inputType: "RepeatedInterval", outputType: "RepeatedInterval", runningType: "RepeatedInterval", major: "Complex"},
+ {inputType: "RepeatedVarChar", outputType: "RepeatedVarChar", runningType: "RepeatedVarChar", major: "Complex"},
+ {inputType: "RepeatedVarBinary", outputType: "RepeatedVarBinary", runningType: "RepeatedVarBinary", major: "Complex"},
+ {inputType: "RepeatedList", outputType: "RepeatedList", runningType: "RepeatedList", major: "Complex"},
+ {inputType: "RepeatedMap", outputType: "RepeatedMap", runningType: "RepeatedMap", major: "Complex"}
+ ]
}
]
}
diff --git a/exec/java-exec/src/main/codegen/data/DecimalAggrTypes1.tdd b/exec/java-exec/src/main/codegen/data/DecimalAggrTypes1.tdd
index 7da2d07..003bbfa 100644
--- a/exec/java-exec/src/main/codegen/data/DecimalAggrTypes1.tdd
+++ b/exec/java-exec/src/main/codegen/data/DecimalAggrTypes1.tdd
@@ -35,6 +35,12 @@
{inputType: "VarDecimal", outputType: "NullableVarDecimal"},
{inputType: "NullableVarDecimal", outputType: "NullableVarDecimal"}
]
+ },
+ {className: "AnyValue", funcName: "any_value", types: [
+ {inputType: "VarDecimal", outputType: "NullableVarDecimal"},
+ {inputType: "NullableVarDecimal", outputType: "NullableVarDecimal"}
+ {inputType: "RepeatedVarDecimal", outputType: "RepeatedVarDecimal"}
+ ]
}
]
}
diff --git a/exec/java-exec/src/main/codegen/templates/AggrTypeFunctions1.java b/exec/java-exec/src/main/codegen/templates/AggrTypeFunctions1.java
index ebf20e5..59d3715 100644
--- a/exec/java-exec/src/main/codegen/templates/AggrTypeFunctions1.java
+++ b/exec/java-exec/src/main/codegen/templates/AggrTypeFunctions1.java
@@ -61,11 +61,11 @@ public static class ${type.inputType}${aggrtype.className} implements DrillAggFu
value = new ${type.runningType}Holder();
nonNullCount = new BigIntHolder();
nonNullCount.value = 0;
- <#if aggrtype.funcName == "sum">
+ <#if aggrtype.funcName == "sum" || aggrtype.funcName == "any_value">
value.value = 0;
<#elseif aggrtype.funcName == "min">
<#if type.runningType?starts_with("Bit")>
- value.value = 1;
+ value.value = 1;
<#elseif type.runningType?starts_with("Int")>
value.value = Integer.MAX_VALUE;
<#elseif type.runningType?starts_with("BigInt")>
@@ -77,7 +77,7 @@ public static class ${type.inputType}${aggrtype.className} implements DrillAggFu
</#if>
<#elseif aggrtype.funcName == "max">
<#if type.runningType?starts_with("Bit")>
- value.value = 0;
+ value.value = 0;
<#elseif type.runningType?starts_with("Int")>
value.value = Integer.MIN_VALUE;
<#elseif type.runningType?starts_with("BigInt")>
@@ -110,19 +110,21 @@ public static class ${type.inputType}${aggrtype.className} implements DrillAggFu
value.value = Float.isNaN(value.value) ? in.value : Math.min(value.value, in.value);
}
<#elseif type.inputType?contains("Float8")>
- if(!Double.isNaN(in.value)) {
- value.value = Double.isNaN(value.value) ? in.value : Math.min(value.value, in.value);
- }
- <#else>
+ if(!Double.isNaN(in.value)) {
+ value.value = Double.isNaN(value.value) ? in.value : Math.min(value.value, in.value);
+ }
+ <#else>
value.value = Math.min(value.value, in.value);
- </#if>
+ </#if>
<#elseif aggrtype.funcName == "max">
value.value = Math.max(value.value, in.value);
<#elseif aggrtype.funcName == "sum">
value.value += in.value;
<#elseif aggrtype.funcName == "count">
value.value++;
- <#else>
+ <#elseif aggrtype.funcName == "any_value">
+ value.value = in.value;
+ <#else>
// TODO: throw an error ?
</#if>
<#if type.inputType?starts_with("Nullable")>
@@ -143,7 +145,7 @@ public static class ${type.inputType}${aggrtype.className} implements DrillAggFu
@Override
public void reset() {
nonNullCount.value = 0;
- <#if aggrtype.funcName == "sum" || aggrtype.funcName == "count">
+ <#if aggrtype.funcName == "sum" || aggrtype.funcName == "count" || aggrtype.funcName == "any_value">
value.value = 0;
<#elseif aggrtype.funcName == "min">
<#if type.runningType?starts_with("Int")>
diff --git a/exec/java-exec/src/main/codegen/templates/ComplexAggrFunctions1.java b/exec/java-exec/src/main/codegen/templates/ComplexAggrFunctions1.java
new file mode 100644
index 0000000..6aa92e3
--- /dev/null
+++ b/exec/java-exec/src/main/codegen/templates/ComplexAggrFunctions1.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+<@pp.dropOutputFile />
+
+
+
+<#list aggrtypes1.aggrtypes as aggrtype>
+<@pp.changeOutputFile name="/org/apache/drill/exec/expr/fn/impl/gaggr/${aggrtype.className}ComplexFunctions.java" />
+
+<#include "/@includes/license.ftl" />
+
+/*
+ * This class is generated using freemarker and the ${.template_name} template.
+ */
+
+<#-- A utility class that is used to generate java code for aggr functions that maintain a single -->
+<#-- running counter to hold the result. This includes: ANY_VALUE. -->
+
+package org.apache.drill.exec.expr.fn.impl.gaggr;
+
+import org.apache.drill.exec.expr.DrillAggFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.expr.annotations.Workspace;
+import org.apache.drill.exec.expr.holders.*;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.exec.vector.complex.MapUtility;
+import org.apache.drill.exec.vector.complex.writer.*;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.*;
+
+@SuppressWarnings("unused")
+// Generated once per entry of aggrtypes1.aggrtypes; only types whose major is "Complex" produce a nested function class.
+public class ${aggrtype.className}ComplexFunctions {
+static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(${aggrtype.className}ComplexFunctions.class);
+
+<#list aggrtype.types as type>
+<#if type.major == "Complex">
+
+@FunctionTemplate(name = "${aggrtype.funcName}", scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE)
+public static class ${type.inputType}${aggrtype.className} implements DrillAggFunc{
+ @Param ${type.inputType}Holder inHolder;
+ @Workspace BigIntHolder nonNullCount;
+ @Output ComplexWriter writer;
+ // nonNullCount is used as a flag: 0 until add() has run once, 1 afterwards, back to 0 in reset().
+ public void setup() {
+ nonNullCount = new BigIntHolder();
+ nonNullCount.value = 0;
+ }
+
+ @Override
+ public void add() {
+ <#if type.inputType?starts_with("Nullable")>
+ sout: {
+ if (inHolder.isSet == 0) {
+ // nullable input whose value is null: leave the writer and the flag untouched
+ break sout;
+ }
+ </#if>
+ <#if aggrtype.funcName == "any_value">
+ <#if type.runningType?starts_with("Map")>
+ if (nonNullCount.value == 0) {
+ org.apache.drill.exec.expr.fn.impl.MappifyUtility.createMap(inHolder.reader, writer, "any_value");
+ }
+ <#elseif type.runningType?starts_with("RepeatedMap")>
+ if (nonNullCount.value == 0) {
+ org.apache.drill.exec.expr.fn.impl.MappifyUtility.createRepeatedMapOrList(inHolder.reader, writer, "any_value");
+ }
+ <#elseif type.runningType?starts_with("List")>
+ if (nonNullCount.value == 0) {
+ org.apache.drill.exec.expr.fn.impl.MappifyUtility.createList(inHolder.reader, writer, "any_value");
+ }
+ <#elseif type.runningType?starts_with("RepeatedList")>
+ if (nonNullCount.value == 0) {
+ org.apache.drill.exec.expr.fn.impl.MappifyUtility.createRepeatedMapOrList(inHolder.reader, writer, "any_value");
+ }
+ <#elseif type.runningType?starts_with("Repeated")>
+ if (nonNullCount.value == 0) {
+ org.apache.drill.exec.expr.fn.impl.MappifyUtility.createList(inHolder.reader, writer, "any_value");
+ }
+ </#if>
+ </#if>
+ nonNullCount.value = 1; // later add() calls skip the MappifyUtility call until reset()
+ <#if type.inputType?starts_with("Nullable")>
+ } // end of sout block
+ </#if>
+ }
+
+ @Override
+ public void output() {
+ // Intentionally empty: add() already passed the input reader and the ComplexWriter to MappifyUtility.
+ }
+
+ @Override
+ public void reset() {
+ <#if aggrtype.funcName == "any_value">
+ nonNullCount.value = 0;
+ </#if>
+ }
+}
+</#if>
+</#list>
+}
+</#list>
\ No newline at end of file
diff --git a/exec/java-exec/src/main/codegen/templates/DateIntervalAggrFunctions1.java b/exec/java-exec/src/main/codegen/templates/DateIntervalAggrFunctions1.java
index f526575..8080ea7 100644
--- a/exec/java-exec/src/main/codegen/templates/DateIntervalAggrFunctions1.java
+++ b/exec/java-exec/src/main/codegen/templates/DateIntervalAggrFunctions1.java
@@ -131,7 +131,16 @@ public static class ${type.inputType}${aggrtype.className} implements DrillAggFu
</#if>
<#elseif aggrtype.funcName == "count">
value.value++;
- <#else>
+ <#elseif aggrtype.funcName == "any_value">
+ <#if type.outputType?ends_with("Interval")>
+ value.days = in.days;
+ value.months = in.months;
+ value.milliseconds = in.milliseconds;
+ <#elseif type.outputType?ends_with("IntervalDay")>
+ value.days = in.days;
+ value.milliseconds = in.milliseconds;
+ </#if>
+ <#else>
// TODO: throw an error ?
</#if>
<#if type.inputType?starts_with("Nullable")>
diff --git a/exec/java-exec/src/main/codegen/templates/Decimal/DecimalAggrTypeFunctions1.java b/exec/java-exec/src/main/codegen/templates/Decimal/DecimalAggrTypeFunctions1.java
index 6b23f92..7f4ca15 100644
--- a/exec/java-exec/src/main/codegen/templates/Decimal/DecimalAggrTypeFunctions1.java
+++ b/exec/java-exec/src/main/codegen/templates/Decimal/DecimalAggrTypeFunctions1.java
@@ -39,6 +39,8 @@ import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
import org.apache.drill.exec.expr.annotations.Output;
import org.apache.drill.exec.expr.annotations.Param;
import org.apache.drill.exec.expr.annotations.Workspace;
+import org.apache.drill.exec.vector.complex.writer.*;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.*;
import javax.inject.Inject;
import io.netty.buffer.DrillBuf;
import org.apache.drill.exec.expr.holders.*;
@@ -124,6 +126,101 @@ public class Decimal${aggrtype.className}Functions {
nonNullCount.value = 0;
}
}
+ <#elseif aggrtype.funcName.contains("any_value") && type.inputType?starts_with("Repeated")>
+ @FunctionTemplate(name = "${aggrtype.funcName}",
+ scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE,
+ returnType = FunctionTemplate.ReturnType.DECIMAL_AGGREGATE)
+ public static class ${type.inputType}${aggrtype.className} implements DrillAggFunc { // any_value variant for input types starting with "Repeated"
+ @Param ${type.inputType}Holder in;
+ @Output ComplexWriter writer;
+ @Workspace BigIntHolder nonNullCount; // flag: 0 until add() has run once, then 1
+
+ public void setup() {
+ nonNullCount = new BigIntHolder();
+ }
+
+ @Override
+ public void add() {
+ if (nonNullCount.value == 0) { // only while the flag is still 0, i.e. the first add() since setup()/reset()
+ org.apache.drill.exec.expr.fn.impl.MappifyUtility.createList(in.reader, writer, "any_value");
+ }
+ nonNullCount.value = 1;
+ }
+
+ @Override
+ public void output() { // intentionally empty: add() already passed the reader and the ComplexWriter to MappifyUtility
+ }
+
+ @Override
+ public void reset() {
+ nonNullCount.value = 0;
+ }
+ }
+ <#elseif aggrtype.funcName.contains("any_value")>
+ @FunctionTemplate(name = "${aggrtype.funcName}",
+ scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE,
+ returnType = FunctionTemplate.ReturnType.DECIMAL_AGGREGATE)
+ public static class ${type.inputType}${aggrtype.className} implements DrillAggFunc { // any_value variant for the remaining (non-Repeated) decimal inputs
+ @Param ${type.inputType}Holder in;
+ @Inject DrillBuf buffer;
+ @Workspace ObjectHolder value;
+ @Workspace IntHolder scale;
+ @Workspace IntHolder precision;
+ @Output ${type.outputType}Holder out;
+ @Workspace BigIntHolder nonNullCount;
+ // NOTE(review): scale and precision are not assigned in setup(); presumably the framework instantiates them - confirm.
+ public void setup() {
+ value = new ObjectHolder();
+ value.obj = java.math.BigDecimal.ZERO;
+ nonNullCount = new BigIntHolder();
+ }
+
+ @Override
+ public void add() {
+ <#if type.inputType?starts_with("Nullable")>
+ sout: {
+ if (in.isSet == 0) {
+ // nullable input whose value is null: keep the current state unchanged
+ break sout;
+ }
+ </#if>
+ if (nonNullCount.value == 0) { // capture the value, scale and precision only while the flag is still 0
+ value.obj=org.apache.drill.exec.util.DecimalUtility
+ .getBigDecimalFromDrillBuf(in.buffer,in.start,in.end-in.start,in.scale);
+ scale.value = in.scale;
+ precision.value = in.precision;
+ }
+ nonNullCount.value = 1;
+ <#if type.inputType?starts_with("Nullable")>
+ } // end of sout block
+ </#if>
+ }
+
+ @Override
+ public void output() {
+ if (nonNullCount.value > 0) {
+ out.isSet = 1;
+ byte[] bytes = ((java.math.BigDecimal)value.obj).unscaledValue().toByteArray(); // unscaled digits; the scale is reported separately below
+ int len = bytes.length;
+ out.start = 0;
+ out.buffer = buffer.reallocIfNeeded(len);
+ out.buffer.setBytes(0, bytes);
+ out.end = len;
+ out.scale = scale.value;
+ out.precision = precision.value;
+ } else {
+ out.isSet = 0; // add() never captured a value, so the output is marked as not set
+ }
+ }
+
+ @Override
+ public void reset() {
+ scale.value = 0;
+ precision.value = 0;
+ value.obj = null; // output() reads value.obj only when nonNullCount > 0, which add() sets only after it has had the chance to assign it
+ nonNullCount.value = 0;
+ }
+ }
<#elseif aggrtype.funcName == "max" || aggrtype.funcName == "min">
@FunctionTemplate(name = "${aggrtype.funcName}",
diff --git a/exec/java-exec/src/main/codegen/templates/VarCharAggrFunctions1.java b/exec/java-exec/src/main/codegen/templates/VarCharAggrFunctions1.java
index a5afce9..de5d705 100644
--- a/exec/java-exec/src/main/codegen/templates/VarCharAggrFunctions1.java
+++ b/exec/java-exec/src/main/codegen/templates/VarCharAggrFunctions1.java
@@ -90,6 +90,16 @@ public static class ${type.inputType}${aggrtype.className} implements DrillAggFu
break sout;
}
</#if>
+ <#if aggrtype.className == "AnyValue">
+ if (nonNullCount.value == 0) {
+ nonNullCount.value = 1;
+ int inputLength = in.end - in.start;
+ org.apache.drill.exec.expr.fn.impl.DrillByteArray tmp = (org.apache.drill.exec.expr.fn.impl.DrillByteArray) value.obj;
+ byte[] tempArray = new byte[inputLength];
+ in.buffer.getBytes(in.start, tempArray, 0, inputLength);
+ tmp.setBytes(tempArray);
+ }
+ <#else>
nonNullCount.value = 1;
org.apache.drill.exec.expr.fn.impl.DrillByteArray tmp = (org.apache.drill.exec.expr.fn.impl.DrillByteArray) value.obj;
int cmp = 0;
@@ -121,6 +131,7 @@ public static class ${type.inputType}${aggrtype.className} implements DrillAggFu
tmp.setBytes(tempArray);
}
}
+ </#if>
<#if type.inputType?starts_with("Nullable")>
} // end of sout block
</#if>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/sig/ConstantExpressionIdentifier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/sig/ConstantExpressionIdentifier.java
index d764663..0175d51 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/sig/ConstantExpressionIdentifier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/sig/ConstantExpressionIdentifier.java
@@ -22,6 +22,7 @@ import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;
+import org.apache.drill.common.expression.AnyValueExpression;
import org.apache.drill.common.expression.BooleanOperator;
import org.apache.drill.common.expression.CastExpression;
import org.apache.drill.common.expression.ConvertExpression;
@@ -235,6 +236,12 @@ public class ConstantExpressionIdentifier implements ExprVisitor<Boolean, Identi
}
@Override
+ public Boolean visitAnyValueExpression(AnyValueExpression e,
+ IdentityHashMap<LogicalExpression, Object> value) throws RuntimeException {
+ return e.getInput().accept(this, value); // delegate to the wrapped input: the result is whatever this visitor returns for it
+ }
+
+ @Override
public Boolean visitParameter(ValueExpressions.ParameterExpression e, IdentityHashMap<LogicalExpression, Object> value) throws RuntimeException {
return false;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
index 64cfe66..4486972 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.Set;
import java.util.Stack;
+import org.apache.drill.common.expression.AnyValueExpression;
import org.apache.drill.common.expression.BooleanOperator;
import org.apache.drill.common.expression.CastExpression;
import org.apache.drill.common.expression.ConvertExpression;
@@ -181,7 +182,7 @@ public class EvaluationVisitor {
AbstractFuncHolder holder = (AbstractFuncHolder) holderExpr.getHolder();
- JVar[] workspaceVars = holder.renderStart(generator, null);
+ JVar[] workspaceVars = holder.renderStart(generator, null, holderExpr.getFieldReference());
if (holder.isNested()) {
generator.getMappingSet().enterChild();
@@ -456,8 +457,7 @@ public class EvaluationVisitor {
generator.getEvalBlock().add(eval);
} else {
- JExpression vector = e.isSuperReader() ? vv1.component(componentVariable) : vv1;
- JExpression expr = vector.invoke("getReader");
+ JExpression expr = vv1.invoke("getReader");
PathSegment seg = e.getReadPath();
JVar isNull = null;
@@ -713,6 +713,17 @@ public class EvaluationVisitor {
return fc.accept(this, value);
}
+ @Override
+ public HoldingContainer visitAnyValueExpression(AnyValueExpression e, ClassGenerator<?> value)
+ throws RuntimeException {
+ // Rewrite the expression as a one-argument FunctionCall named AnyValueExpression.ANY_VALUE and visit that call instead.
+ List<LogicalExpression> newArgs = Lists.newArrayList();
+ newArgs.add(e.getInput()); // input_expr: the sole argument of the call
+
+ FunctionCall fc = new FunctionCall(AnyValueExpression.ANY_VALUE, newArgs, e.getPosition());
+ return fc.accept(this, value);
+ }
+
private HoldingContainer visitBooleanAnd(BooleanOperator op,
ClassGenerator<?> generator) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/AbstractFuncHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/AbstractFuncHolder.java
index 4902260..7dd58ac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/AbstractFuncHolder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/AbstractFuncHolder.java
@@ -32,7 +32,7 @@ import com.sun.codemodel.JVar;
public abstract class AbstractFuncHolder implements FuncHolder {
- public abstract JVar[] renderStart(ClassGenerator<?> g, HoldingContainer[] inputVariables);
+ public abstract JVar[] renderStart(ClassGenerator<?> g, HoldingContainer[] inputVariables, FieldReference fieldReference);
public void renderMiddle(ClassGenerator<?> g, HoldingContainer[] inputVariables, JVar[] workspaceJVars) {
// default implementation is add no code
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillAggFuncHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillAggFuncHolder.java
index e1cd96f..1a5df67 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillAggFuncHolder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillAggFuncHolder.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.Types;
@@ -44,19 +45,19 @@ import com.sun.codemodel.JVar;
class DrillAggFuncHolder extends DrillFuncHolder {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillAggFuncHolder.class);
- private String setup() {
+ protected String setup() {
return meth("setup");
}
- private String reset() {
+ protected String reset() {
return meth("reset", false);
}
- private String add() {
+ protected String add() {
return meth("add");
}
- private String output() {
+ protected String output() {
return meth("output");
}
- private String cleanup() {
+ protected String cleanup() {
return meth("cleanup", false);
}
@@ -78,7 +79,7 @@ class DrillAggFuncHolder extends DrillFuncHolder {
}
@Override
- public JVar[] renderStart(ClassGenerator<?> g, HoldingContainer[] inputVariables) {
+ public JVar[] renderStart(ClassGenerator<?> g, HoldingContainer[] inputVariables, FieldReference fieldReference) {
if (!g.getMappingSet().isHashAggMapping()) { //Declare workspace vars for non-hash-aggregation.
JVar[] workspaceJVars = declareWorkspaceVariables(g);
generateBody(g, BlockType.SETUP, setup(), null, workspaceJVars, true);
@@ -128,12 +129,20 @@ class DrillAggFuncHolder extends DrillFuncHolder {
@Override
public HoldingContainer renderEnd(ClassGenerator<?> classGenerator, HoldingContainer[] inputVariables,
JVar[] workspaceJVars, FieldReference fieldReference) {
- HoldingContainer out = classGenerator.declare(getReturnType(), false);
+ HoldingContainer out = null;
+ JVar internalOutput = null;
+ if (getReturnType().getMinorType() != TypeProtos.MinorType.LATE) {
+ out = classGenerator.declare(getReturnType(), false);
+ }
JBlock sub = new JBlock();
+ if (getReturnType().getMinorType() != TypeProtos.MinorType.LATE) {
+ internalOutput = sub.decl(JMod.FINAL, classGenerator.getHolderType(getReturnType()), getReturnValue().getName(), JExpr._new(classGenerator.getHolderType(getReturnType())));
+ }
classGenerator.getEvalBlock().add(sub);
- JVar internalOutput = sub.decl(JMod.FINAL, classGenerator.getHolderType(getReturnType()), getReturnValue().getName(), JExpr._new(classGenerator.getHolderType(getReturnType())));
addProtectedBlock(classGenerator, sub, output(), null, workspaceJVars, false);
- sub.assign(out.getHolder(), internalOutput);
+ if (getReturnType().getMinorType() != TypeProtos.MinorType.LATE) {
+ sub.assign(out.getHolder(), internalOutput);
+ }
//hash aggregate uses workspace vectors. Initialization is done in "setup" and does not require "reset" block.
if (!classGenerator.getMappingSet().isHashAggMapping()) {
generateBody(classGenerator, BlockType.RESET, reset(), null, workspaceJVars, false);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterAggFuncHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterAggFuncHolder.java
new file mode 100644
index 0000000..44766bd
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterAggFuncHolder.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.expr.fn;
+
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
+import org.apache.drill.exec.physical.impl.aggregate.StreamingAggBatch;
+import org.apache.drill.exec.physical.impl.aggregate.StreamingAggTemplate;
+import org.apache.drill.exec.record.VectorAccessibleComplexWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
+
+import com.sun.codemodel.JBlock;
+import com.sun.codemodel.JClass;
+import com.sun.codemodel.JExpr;
+import com.sun.codemodel.JExpression;
+import com.sun.codemodel.JInvocation;
+import com.sun.codemodel.JVar;
+import com.sun.codemodel.JMod;
+/**
+ * Function holder for aggregate functions whose output is written through a {@link ComplexWriter}
+ * (isComplexWriterFuncHolder() returns true) instead of being returned in a value holder.
+ * For non-hash-aggregate mappings the generated class gets three extra fields: the writer itself,
+ * the index of the group currently being written, and the index last passed to setPosition().
+ * NOTE(review): renderMiddle() only resolves the outgoing batch class for StreamingAggTemplate;
+ * presumably these functions are never routed to another aggregate operator -- confirm.
+ */
+public class DrillComplexWriterAggFuncHolder extends DrillAggFuncHolder {
+
+ // Complex writer to write out complex data-types e.g. repeated maps/lists
+ private JVar complexWriter;
+ // The index at which to write - important when group-by is present. Implicit assumption that the output indexes
+ // will be sequential starting from 0. i.e. the first group would be written at index 0, second group at index 1
+ // and so on.
+ private JVar writerIdx;
+ private JVar lastWriterIdx; // index last handed to complexWriter.setPosition(); set to -1 in the generated setup block
+ public DrillComplexWriterAggFuncHolder(FunctionAttributes functionAttributes, FunctionInitializer initializer) {
+ super(functionAttributes, initializer);
+ }
+ /**
+  * Marks this holder as one that emits its result through a complex writer.
+  */
+ @Override
+ public boolean isComplexWriterFuncHolder() {
+ return true;
+ }
+ /**
+  * Emits the declarations and setup code that run before aggregation starts.
+  * For a non-hash-aggregate mapping this declares the complexWriter / writerIdx / lastWriterIdx class fields,
+  * initialises them in the setup block (writer obtained from VectorAccessibleComplexWriter.getWriter() on the
+  * outgoing container, writerIdx = 0, lastWriterIdx = -1) and then generates the function's own setup body.
+  * For a hash-aggregate mapping it defers to the parent implementation and leaves the three fields unset.
+  *
+  * @param classGenerator generator receiving the emitted code
+  * @param inputVariables not used on the non-hash path; passed through to the parent otherwise
+  * @param fieldReference reference whose root segment names the output vector; "col" is used when it is null
+  * @return the workspace variables declared for the function
+  */
+ @Override
+ public JVar[] renderStart(ClassGenerator<?> classGenerator, HoldingContainer[] inputVariables, FieldReference fieldReference) {
+ if (!classGenerator.getMappingSet().isHashAggMapping()) { //Declare workspace vars for non-hash-aggregation.
+ JInvocation container = classGenerator.getMappingSet().getOutgoing().invoke("getOutgoingContainer");
+
+ complexWriter = classGenerator.declareClassField("complexWriter", classGenerator.getModel()._ref(ComplexWriter.class));
+ writerIdx = classGenerator.declareClassField("writerIdx", classGenerator.getModel()._ref(int.class));
+ lastWriterIdx = classGenerator.declareClassField("lastWriterIdx", classGenerator.getModel()._ref(int.class));
+ //Default name is "col", if not passed in a reference name for the output vector.
+ String refName = fieldReference == null ? "col" : fieldReference.getRootSegment().getPath();
+ JClass cwClass = classGenerator.getModel().ref(VectorAccessibleComplexWriter.class);
+ classGenerator.getSetupBlock().assign(complexWriter, cwClass.staticInvoke("getWriter").arg(refName).arg(container));
+ classGenerator.getSetupBlock().assign(writerIdx, JExpr.lit(0));
+ classGenerator.getSetupBlock().assign(lastWriterIdx, JExpr.lit(-1));
+
+ JVar[] workspaceJVars = declareWorkspaceVariables(classGenerator);
+ generateBody(classGenerator, ClassGenerator.BlockType.SETUP, setup(), null, workspaceJVars, true);
+ return workspaceJVars;
+ } else {
+ return super.renderStart(classGenerator, inputVariables, fieldReference);
+ }
+ }
+ /**
+  * Emits the per-record part of the aggregate (the function's add() body).
+  * Setup block: registers the writer with the outgoing batch via addComplexWriter().
+  * Eval block: repositions the writer only when writerIdx differs from lastWriterIdx, then runs add() inside a
+  * sub-block in which the function's output name is bound to the complex writer.
+  * NOTE(review): complexWriter / writerIdx / lastWriterIdx are only assigned on the non-hash path of
+  * renderStart(); this method uses them unconditionally -- confirm it is never reached for hash aggregation.
+  *
+  * @param classGenerator generator receiving the emitted code
+  * @param inputVariables holders for the function's inputs, made visible to the add() body
+  * @param workspaceJVars workspace variables returned by renderStart()
+  */
+ @Override
+ public void renderMiddle(ClassGenerator<?> classGenerator, HoldingContainer[] inputVariables, JVar[] workspaceJVars) {
+
+ classGenerator.getEvalBlock().directStatement(String.format("//---- start of eval portion of %s function. ----//",
+ getRegisteredNames()[0]));
+
+ JBlock sub = new JBlock(true, true);
+ JBlock topSub = sub;
+ JClass aggBatchClass = null;
+
+ if (classGenerator.getCodeGenerator().getDefinition() == StreamingAggTemplate.TEMPLATE_DEFINITION) {
+ aggBatchClass = classGenerator.getModel().ref(StreamingAggBatch.class);
+ }
+ assert aggBatchClass != null : "ComplexWriterAggFuncHolder should only be used with an Aggregate Operator";
+ // NOTE(review): with assertions disabled a null aggBatchClass is passed to JExpr.cast() below -- confirm this cannot happen.
+ JExpression aggBatch = JExpr.cast(aggBatchClass, classGenerator.getMappingSet().getOutgoing());
+
+ classGenerator.getSetupBlock().add(aggBatch.invoke("addComplexWriter").arg(complexWriter));
+ // Only set the writer if there is a position change. Calling setPosition may cause underlying writers to allocate
+ // new vectors, thereby, losing the previously stored values
+ JBlock condAssignCW = classGenerator.getEvalBlock()._if(lastWriterIdx.ne(writerIdx))._then();
+ condAssignCW.add(complexWriter.invoke("setPosition").arg(writerIdx));
+ condAssignCW.assign(lastWriterIdx, writerIdx);
+ sub.decl(classGenerator.getModel()._ref(ComplexWriter.class), getReturnValue().getName(), complexWriter);
+
+ // add the subblock after the out declaration.
+ classGenerator.getEvalBlock().add(topSub);
+
+ addProtectedBlock(classGenerator, sub, add(), inputVariables, workspaceJVars, false);
+ classGenerator.getEvalBlock().directStatement(String.format("//---- end of eval portion of %s function. ----//",
+ getRegisteredNames()[0]));
+ }
+ /**
+  * Emits the end-of-group part of the aggregate (the function's output() body), followed by the reset body
+  * (non-hash aggregation only) and the cleanup body.
+  * When the return type is not LATE, an output holder is declared, a final local holder named after the
+  * function's return value is created for output() to fill, and the local is copied into the output holder.
+  * When the return type is LATE, no holder is declared; writerIdx is incremented by one ahead of the output()
+  * body so that the next group is written at the following position.
+  *
+  * @param classGenerator generator receiving the emitted code
+  * @param inputVariables not used by this method
+  * @param workspaceJVars workspace variables returned by renderStart()
+  * @param fieldReference not used by this method
+  * @return the declared output holder, or null when the return type is LATE
+  */
+ @Override
+ public HoldingContainer renderEnd(ClassGenerator<?> classGenerator, HoldingContainer[] inputVariables,
+ JVar[] workspaceJVars, FieldReference fieldReference) {
+ HoldingContainer out = null;
+ JVar internalOutput = null;
+ if (getReturnType().getMinorType() != TypeProtos.MinorType.LATE) {
+ out = classGenerator.declare(getReturnType(), false);
+ }
+ JBlock sub = new JBlock();
+ if (getReturnType().getMinorType() != TypeProtos.MinorType.LATE) {
+ internalOutput = sub.decl(JMod.FINAL, classGenerator.getHolderType(getReturnType()), getReturnValue().getName(),
+ JExpr._new(classGenerator.getHolderType(getReturnType())));
+ }
+ classGenerator.getEvalBlock().add(sub);
+ if (getReturnType().getMinorType() == TypeProtos.MinorType.LATE) {
+ sub.assignPlus(writerIdx, JExpr.lit(1));
+ }
+ addProtectedBlock(classGenerator, sub, output(), null, workspaceJVars, false);
+ if (getReturnType().getMinorType() != TypeProtos.MinorType.LATE) {
+ sub.assign(out.getHolder(), internalOutput);
+ }
+ //hash aggregate uses workspace vectors. Initialization is done in "setup" and does not require "reset" block.
+ if (!classGenerator.getMappingSet().isHashAggMapping()) {
+ generateBody(classGenerator, ClassGenerator.BlockType.RESET, reset(), null, workspaceJVars, false);
+ }
+ generateBody(classGenerator, ClassGenerator.BlockType.CLEANUP, cleanup(), null, workspaceJVars, false);
+
+ return out;
+ }
+}
+
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java
index 9df5305..240ff27 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java
@@ -23,8 +23,10 @@ import java.util.List;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.FunctionHolderExpression;
import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
@@ -36,6 +38,9 @@ import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
import org.apache.drill.exec.expr.DrillFuncHolderExpr;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling;
+import org.apache.drill.exec.expr.holders.ListHolder;
+import org.apache.drill.exec.expr.holders.MapHolder;
+import org.apache.drill.exec.expr.holders.RepeatedMapHolder;
import org.apache.drill.exec.ops.UdfUtilities;
import org.apache.drill.exec.vector.complex.reader.FieldReader;
@@ -80,7 +85,7 @@ public abstract class DrillFuncHolder extends AbstractFuncHolder {
}
@Override
- public JVar[] renderStart(ClassGenerator<?> g, HoldingContainer[] inputVariables) {
+ public JVar[] renderStart(ClassGenerator<?> g, HoldingContainer[] inputVariables, FieldReference fieldReference) {
return declareWorkspaceVariables(g);
}
@@ -186,12 +191,35 @@ public abstract class DrillFuncHolder extends AbstractFuncHolder {
ValueReference parameter = attributes.getParameters()[i];
HoldingContainer inputVariable = inputVariables[i];
- if (parameter.isFieldReader() && ! inputVariable.isReader() && ! Types.isComplex(inputVariable.getMajorType()) && inputVariable.getMinorType() != MinorType.UNION) {
+ if (parameter.isFieldReader() && ! inputVariable.isReader()
+ && ! Types.isComplex(inputVariable.getMajorType()) && inputVariable.getMinorType() != MinorType.UNION) {
JType singularReaderClass = g.getModel()._ref(TypeHelper.getHolderReaderImpl(inputVariable.getMajorType().getMinorType(),
inputVariable.getMajorType().getMode()));
JType fieldReadClass = g.getModel()._ref(FieldReader.class);
sub.decl(fieldReadClass, parameter.getName(), JExpr._new(singularReaderClass).arg(inputVariable.getHolder()));
- } else {
+ } else if (!parameter.isFieldReader() && inputVariable.isReader() && Types.isComplex(parameter.getType())) {
+ // For complex data-types (repeated maps/lists) the input to the aggregate will be a FieldReader. However, aggregate
+ // functions like ANY_VALUE, will assume the input to be a RepeatedMapHolder etc. Generate boilerplate code, to map
+ // from FieldReader to respective Holder.
+ if (parameter.getType().getMinorType() == MinorType.MAP) {
+ JType holderClass;
+ if (parameter.getType().getMode() == TypeProtos.DataMode.REPEATED) {
+ holderClass = g.getModel()._ref(RepeatedMapHolder.class);
+ JVar holderVar = sub.decl(holderClass, parameter.getName(), JExpr._new(holderClass));
+ sub.assign(holderVar.ref("reader"), inputVariable.getHolder());
+ } else {
+ holderClass = g.getModel()._ref(MapHolder.class);
+ JVar holderVar = sub.decl(holderClass, parameter.getName(), JExpr._new(holderClass));
+ sub.assign(holderVar.ref("reader"), inputVariable.getHolder());
+ }
+ } else if (parameter.getType().getMinorType() == MinorType.LIST) {
+ //TODO: Add support for REPEATED LISTs
+ JType holderClass = g.getModel()._ref(ListHolder.class);
+ JVar holderVar = sub.decl(holderClass, parameter.getName(), JExpr._new(holderClass));
+ sub.assign(holderVar.ref("reader"), inputVariable.getHolder());
+ }
+ }
+ else {
sub.decl(inputVariable.getHolder().type(), parameter.getName(), inputVariable.getHolder());
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java
index ca5605a..b5a2f07 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java
@@ -181,7 +181,9 @@ public class FunctionConverter {
switch (template.scope()) {
case POINT_AGGREGATE:
- return new DrillAggFuncHolder(functionAttributes, initializer);
+ return outputField.isComplexWriter() ?
+ new DrillComplexWriterAggFuncHolder(functionAttributes, initializer) :
+ new DrillAggFuncHolder(functionAttributes, initializer);
case SIMPLE:
return outputField.isComplexWriter() ?
new DrillComplexWriterFuncHolder(functionAttributes, initializer) :
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Mappify.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Mappify.java
index 703d62e..3db9f5a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Mappify.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Mappify.java
@@ -60,7 +60,7 @@ public class Mappify {
}
public void eval() {
- buffer = org.apache.drill.exec.expr.fn.impl.MappifyUtility.mappify(reader, writer, buffer);
+ buffer = org.apache.drill.exec.expr.fn.impl.MappifyUtility.mappify(reader, writer, buffer, "Mappify/kvgen");
}
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/MappifyUtility.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/MappifyUtility.java
index 3745fe2..b3fca2b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/MappifyUtility.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/MappifyUtility.java
@@ -37,7 +37,7 @@ public class MappifyUtility {
public static final String fieldKey = "key";
public static final String fieldValue = "value";
- public static DrillBuf mappify(FieldReader reader, BaseWriter.ComplexWriter writer, DrillBuf buffer) {
+ public static DrillBuf mappify(FieldReader reader, BaseWriter.ComplexWriter writer, DrillBuf buffer, String caller) {
// Currently we expect single map as input
if (DataMode.REPEATED == reader.getType().getMode() || !(reader.getType().getMinorType() == TypeProtos.MinorType.MAP)) {
throw new DrillRuntimeException("kvgen function only supports Simple maps as input");
@@ -72,7 +72,7 @@ public class MappifyUtility {
mapWriter.varChar(fieldKey).write(vh);
// Write the value to the map
- MapUtility.writeToMapFromReader(fieldReader, mapWriter);
+ MapUtility.writeToMapFromReader(fieldReader, mapWriter, caller);
mapWriter.end();
}
@@ -80,5 +80,35 @@ public class MappifyUtility {
return buffer;
}
+
+  /**
+   * Copies a REPEATED value from {@code reader} into the root list of {@code writer}.
+   *
+   * @param reader source of the value; its data mode must be REPEATED
+   * @param writer complex writer whose root is opened as a list
+   * @param caller name of the calling function, forwarded for use in error messages
+   * @throws DrillRuntimeException if the reader's data mode is not REPEATED
+   */
+  public static void createRepeatedMapOrList(FieldReader reader, BaseWriter.ComplexWriter writer, String caller) {
+    final boolean isRepeated = reader.getType().getMode() == DataMode.REPEATED;
+    if (!isRepeated) {
+      throw new DrillRuntimeException("Do not invoke createRepeatedMapOrList() unless MINOR mode is REPEATED");
+    }
+    MapUtility.writeToListFromReader(reader, writer.rootAsList(), caller);
+  }
+
+  /**
+   * Copies every field of a non-repeated MAP value from {@code reader} into the root map of {@code writer}.
+   * A non-repeated reader whose minor type is not MAP is silently ignored.
+   *
+   * @param reader source of the value; its data mode must not be REPEATED
+   * @param writer complex writer whose root is opened as a map
+   * @param caller name of the calling function, forwarded for use in error messages
+   * @throws DrillRuntimeException if the reader's data mode is REPEATED
+   */
+  public static void createMap(FieldReader reader, BaseWriter.ComplexWriter writer, String caller) {
+    if (reader.getType().getMode() == DataMode.REPEATED) {
+      throw new DrillRuntimeException("Do not invoke createMap() with REPEATED MINOR mode");
+    }
+    if (reader.getType().getMinorType() != TypeProtos.MinorType.MAP) {
+      return;
+    }
+    final BaseWriter.MapWriter rootMap = writer.rootAsMap();
+    // Walk the field names of the map and copy each child value under the same name.
+    for (Iterator<String> names = reader.iterator(); names.hasNext(); ) {
+      final String name = names.next();
+      MapUtility.writeToMapFromReader(reader.reader(name), rootMap, name, caller);
+    }
+  }
+
+  /**
+   * Copies the value at {@code reader} into the root list of {@code writer}.
+   *
+   * @param reader source of the value
+   * @param writer complex writer whose root is opened as a list
+   * @param caller name of the calling function, forwarded for use in error messages
+   */
+  public static void createList(FieldReader reader, BaseWriter.ComplexWriter writer, String caller) {
+    MapUtility.writeToListFromReader(reader, writer.rootAsList(), caller);
+  }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index 34ab97e..caeed50 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -18,7 +18,9 @@
package org.apache.drill.exec.physical.impl.aggregate;
import java.io.IOException;
+import java.util.List;
+import com.google.common.collect.Lists;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.ErrorCollector;
@@ -35,6 +37,7 @@ import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.expr.DrillFuncHolderExpr;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.expr.HoldingContainerExpression;
import org.apache.drill.exec.expr.TypeHelper;
@@ -50,21 +53,26 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.FixedWidthVector;
+import org.apache.drill.exec.vector.UntypedNullHolder;
+import org.apache.drill.exec.vector.UntypedNullVector;
import org.apache.drill.exec.vector.ValueVector;
import com.sun.codemodel.JExpr;
import com.sun.codemodel.JVar;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingAggBatch.class);
private StreamingAggregator aggregator;
private final RecordBatch incoming;
+ private List<BaseWriter.ComplexWriter> complexWriters;
private boolean done = false;
private boolean first = true;
private int recordCount = 0;
@@ -107,6 +115,11 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
}
@Override
+ public VectorContainer getOutgoingContainer() {
+ return this.container;
+ }
+
+ @Override
public void buildSchema() throws SchemaChangeException {
IterOutcome outcome = next(incoming);
switch (outcome) {
@@ -131,6 +144,10 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
for (final VectorWrapper<?> w : container) {
w.getValueVector().allocateNew();
}
+
+ if (complexWriters != null) {
+ container.buildSchema(SelectionVectorMode.NONE);
+ }
}
@Override
@@ -177,7 +194,6 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
throw new IllegalStateException(String.format("unknown outcome %s", outcome));
}
}
-
AggOutcome out = aggregator.doWork();
recordCount = aggregator.getOutputCount();
logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount());
@@ -191,6 +207,11 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
// fall through
case RETURN_OUTCOME:
IterOutcome outcome = aggregator.getOutcome();
+ // With a complex writer expression, vectors are added to the batch at run time,
+ // so the schema has to be rebuilt.
+ if (complexWriters != null) {
+ container.buildSchema(SelectionVectorMode.NONE);
+ }
if (outcome == IterOutcome.NONE && first) {
first = false;
done = true;
@@ -213,6 +234,14 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
}
}
+  /**
+   * Calls allocate() on every registered complex writer; does nothing when no writer list exists.
+   * Meant to run before the incoming batch is processed.
+   */
+  private void allocateComplexWriters() {
+    if (complexWriters == null) {
+      return;
+    }
+    for (final BaseWriter.ComplexWriter complexWriter : complexWriters) {
+      complexWriter.allocate();
+    }
+  }
/**
* Method is invoked when we have a straight aggregate (no group by expression) and our input is empty.
@@ -272,9 +301,15 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
}
}
+  /**
+   * Registers a complex writer with this batch so that it is allocated together with the other writers
+   * and so that the schema is rebuilt once the writer has added its vectors.
+   * The writer list is created on demand: it is otherwise only created while the aggregator is being
+   * built, and a call arriving before that would fail with a NullPointerException.
+   *
+   * @param writer the complex writer to register
+   */
+  public void addComplexWriter(final BaseWriter.ComplexWriter writer) {
+    if (complexWriters == null) {
+      complexWriters = Lists.newArrayList();
+    }
+    complexWriters.add(writer);
+  }
+
private StreamingAggregator createAggregatorInternal() throws SchemaChangeException, ClassTransformationException, IOException{
ClassGenerator<StreamingAggregator> cg = CodeGenerator.getRoot(StreamingAggTemplate.TEMPLATE_DEFINITION, context.getOptions());
cg.getCodeGenerator().plainJavaCapable(true);
+ // Uncomment the following line to debug the generated code.
+ //cg.getCodeGenerator().saveCodeForDebugging(true);
container.clear();
LogicalExpression[] keyExprs = new LogicalExpression[popConfig.getKeys().size()];
@@ -307,12 +342,29 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
continue;
}
- final MaterializedField outputField = MaterializedField.create(ne.getRef().getLastSegment().getNameSegment().getPath(),
- expr.getMajorType());
- @SuppressWarnings("resource")
- ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
- TypedFieldId id = container.add(vector);
- valueExprs[i] = new ValueVectorWriteExpression(id, expr, true);
+ /* Populate the complex writers for complex exprs */
+ if (expr instanceof DrillFuncHolderExpr &&
+ ((DrillFuncHolderExpr) expr).getHolder().isComplexWriterFuncHolder()) {
+ // Need to process ComplexWriter function evaluation.
+ // Lazy initialization of the list of complex writers, if not done yet.
+ if (complexWriters == null) {
+ complexWriters = Lists.newArrayList();
+ } else {
+ complexWriters.clear();
+ }
+ // The reference name will be passed to ComplexWriter, used as the name of the output vector from the writer.
+ ((DrillFuncHolderExpr) expr).getFieldReference(ne.getRef());
+ MaterializedField field = MaterializedField.create(ne.getRef().getAsNamePart().getName(), UntypedNullHolder.TYPE);
+ container.add(new UntypedNullVector(field, container.getAllocator()));
+ valueExprs[i] = expr;
+ } else {
+ final MaterializedField outputField = MaterializedField.create(ne.getRef().getLastSegment().getNameSegment().getPath(),
+ expr.getMajorType());
+ @SuppressWarnings("resource")
+ ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
+ TypedFieldId id = container.add(vector);
+ valueExprs[i] = new ValueVectorWriteExpression(id, expr, true);
+ }
}
if (collector.hasErrors()) {
@@ -331,6 +383,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
container.buildSchema(SelectionVectorMode.NONE);
StreamingAggregator agg = context.getImplementationClass(cg);
agg.setup(oContext, incoming, this);
+ allocateComplexWriters();
return agg;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
index 02dd4de..19499d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.planner.physical;
import com.google.common.collect.Lists;
+import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.drill.exec.planner.logical.DrillAggregateRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
@@ -58,7 +59,8 @@ public class HashAggPrule extends AggPruleBase {
final DrillAggregateRel aggregate = call.rel(0);
final RelNode input = call.rel(1);
- if (aggregate.containsDistinctCall() || aggregate.getGroupCount() == 0) {
+ if (aggregate.containsDistinctCall() || aggregate.getGroupCount() == 0
+ || requiresStreamingAgg(aggregate)) {
// currently, don't use HashAggregate if any of the logical aggrs contains DISTINCT or
// if there are no grouping keys
return;
@@ -101,6 +103,16 @@ public class HashAggPrule extends AggPruleBase {
}
}
+ private boolean requiresStreamingAgg(DrillAggregateRel aggregate) {
+ //If contains ANY_VALUE aggregate, using HashAgg would not work
+ for (AggregateCall agg : aggregate.getAggCallList()) {
+ if (agg.getAggregation().getName().equalsIgnoreCase("any_value")) {
+ return true;
+ }
+ }
+ return false;
+ }
+
private class TwoPhaseSubset extends SubsetTransformer<DrillAggregateRel, InvalidRelException> {
final RelTrait distOnAllKeys;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapUtility.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapUtility.java
index 543a6db..f4d29e9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapUtility.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapUtility.java
@@ -28,14 +28,14 @@ import org.apache.drill.exec.vector.complex.reader.FieldReader;
import org.apache.drill.exec.vector.complex.writer.BaseWriter;
public class MapUtility {
- private final static String TYPE_MISMATCH_ERROR = "Mappify/kvgen does not support heterogeneous value types. All values in the input map must be of the same type. The field [%s] has a differing type [%s].";
+ private final static String TYPE_MISMATCH_ERROR = " does not support heterogeneous value types. All values in the input map must be of the same type. The field [%s] has a differing type [%s].";
/*
* Function to read a value from the field reader, detect the type, construct the appropriate value holder
* and use the value holder to write to the Map.
*/
// TODO : This should be templatized and generated using freemarker
- public static void writeToMapFromReader(FieldReader fieldReader, BaseWriter.MapWriter mapWriter) {
+ public static void writeToMapFromReader(FieldReader fieldReader, BaseWriter.MapWriter mapWriter, String caller) {
try {
MajorType valueMajorType = fieldReader.getType();
MinorType valueMinorType = valueMajorType.getMinorType();
@@ -228,11 +228,311 @@ public class MapUtility {
fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).list());
break;
default:
- throw new DrillRuntimeException(String.format("kvgen does not support input of type: %s", valueMinorType));
+ throw new DrillRuntimeException(String.format(caller
+ + " does not support input of type: %s", valueMinorType));
}
} catch (ClassCastException e) {
final MaterializedField field = fieldReader.getField();
- throw new DrillRuntimeException(String.format(TYPE_MISMATCH_ERROR, field.getName(), field.getType()));
+ throw new DrillRuntimeException(String.format(caller + TYPE_MISMATCH_ERROR, field.getName(), field.getType()));
+ }
+ }
+
+  /**
+   * Copies the value at {@code fieldReader} into {@code mapWriter} under the name {@code fieldName}.
+   * The reader's minor type selects the writer; a REPEATED value is written to a list named
+   * {@code fieldName}, any other value to a scalar (or map) field of that name. LIST values are always
+   * written as a list nested inside the list named {@code fieldName}.
+   *
+   * @param fieldReader reader positioned on the value to copy
+   * @param mapWriter   map writer receiving the value
+   * @param fieldName   name of the field to create in the map
+   * @param caller      name of the calling function; used only as the prefix of error messages
+   * @throws DrillRuntimeException if the minor type is not supported, or if the copy fails with a
+   *         ClassCastException (reported as a type mismatch, with the original exception as cause)
+   */
+  public static void writeToMapFromReader(FieldReader fieldReader, BaseWriter.MapWriter mapWriter,
+      String fieldName, String caller) {
+    try {
+      final MajorType valueMajorType = fieldReader.getType();
+      final MinorType valueMinorType = valueMajorType.getMinorType();
+      final boolean repeated = valueMajorType.getMode() == TypeProtos.DataMode.REPEATED;
+
+      switch (valueMinorType) {
+        case TINYINT:
+          fieldReader.copyAsValue(repeated ? mapWriter.list(fieldName).tinyInt() : mapWriter.tinyInt(fieldName));
+          break;
+        case SMALLINT:
+          fieldReader.copyAsValue(repeated ? mapWriter.list(fieldName).smallInt() : mapWriter.smallInt(fieldName));
+          break;
+        case BIGINT:
+          fieldReader.copyAsValue(repeated ? mapWriter.list(fieldName).bigInt() : mapWriter.bigInt(fieldName));
+          break;
+        case INT:
+          fieldReader.copyAsValue(repeated ? mapWriter.list(fieldName).integer() : mapWriter.integer(fieldName));
+          break;
+        case UINT1:
+          fieldReader.copyAsValue(repeated ? mapWriter.list(fieldName).uInt1() : mapWriter.uInt1(fieldName));
+          break;
+        case UINT2:
+          fieldReader.copyAsValue(repeated ? mapWriter.list(fieldName).uInt2() : mapWriter.uInt2(fieldName));
+          break;
+        case UINT4:
+          fieldReader.copyAsValue(repeated ? mapWriter.list(fieldName).uInt4() : mapWriter.uInt4(fieldName));
+          break;
+        case UINT8:
+          fieldReader.copyAsValue(repeated ? mapWriter.list(fieldName).uInt8() : mapWriter.uInt8(fieldName));
+          break;
+        case DECIMAL9:
+          fieldReader.copyAsValue(repeated ? mapWriter.list(fieldName).decimal9() : mapWriter.decimal9(fieldName));
+          break;
+        case DECIMAL18:
+          fieldReader.copyAsValue(repeated ? mapWriter.list(fieldName).decimal18() : mapWriter.decimal18(fieldName));
+          break;
+        case DECIMAL28SPARSE:
+          fieldReader.copyAsValue(repeated
+              ? mapWriter.list(fieldName).decimal28Sparse()
+              : mapWriter.decimal28Sparse(fieldName));
+          break;
+        case DECIMAL38SPARSE:
+          fieldReader.copyAsValue(repeated
+              ? mapWriter.list(fieldName).decimal38Sparse()
+              : mapWriter.decimal38Sparse(fieldName));
+          break;
+        case VARDECIMAL:
+          fieldReader.copyAsValue(repeated
+              ? mapWriter.list(fieldName).varDecimal(valueMajorType.getScale(), valueMajorType.getPrecision())
+              : mapWriter.varDecimal(fieldName, valueMajorType.getScale(), valueMajorType.getPrecision()));
+          break;
+        case DATE:
+          fieldReader.copyAsValue(repeated ? mapWriter.list(fieldName).date() : mapWriter.date(fieldName));
+          break;
+        case TIME:
+          fieldReader.copyAsValue(repeated ? mapWriter.list(fieldName).time() : mapWriter.time(fieldName));
+          break;
+        case TIMESTAMP:
+          fieldReader.copyAsValue(repeated ? mapWriter.list(fieldName).timeStamp() : mapWriter.timeStamp(fieldName));
+          break;
+        case INTERVAL:
+          fieldReader.copyAsValue(repeated ? mapWriter.list(fieldName).interval() : mapWriter.interval(fieldName));
+          break;
+        case INTERVALDAY:
+          fieldReader.copyAsValue(repeated ? mapWriter.list(fieldName).intervalDay() : mapWriter.intervalDay(fieldName));
+          break;
+        case INTERVALYEAR:
+          fieldReader.copyAsValue(repeated ? mapWriter.list(fieldName).intervalYear() : mapWriter.intervalYear(fieldName));
+          break;
+        case FLOAT4:
+          fieldReader.copyAsValue(repeated ? mapWriter.list(fieldName).float4() : mapWriter.float4(fieldName));
+          break;
+        case FLOAT8:
+          fieldReader.copyAsValue(repeated ? mapWriter.list(fieldName).float8() : mapWriter.float8(fieldName));
+          break;
+        case BIT:
+          fieldReader.copyAsValue(repeated ? mapWriter.list(fieldName).bit() : mapWriter.bit(fieldName));
+          break;
+        case VARCHAR:
+          fieldReader.copyAsValue(repeated ? mapWriter.list(fieldName).varChar() : mapWriter.varChar(fieldName));
+          break;
+        case VARBINARY:
+          fieldReader.copyAsValue(repeated ? mapWriter.list(fieldName).varBinary() : mapWriter.varBinary(fieldName));
+          break;
+        case MAP:
+          fieldReader.copyAsValue(repeated ? mapWriter.list(fieldName).map() : mapWriter.map(fieldName));
+          break;
+        case LIST:
+          // Written as a nested list regardless of the data mode.
+          fieldReader.copyAsValue(mapWriter.list(fieldName).list());
+          break;
+        default:
+          // caller is passed as an argument, not spliced into the pattern, so a '%' in it is harmless.
+          throw new DrillRuntimeException(String.format("%s does not support input of type: %s",
+              caller, valueMinorType));
+      }
+    } catch (ClassCastException e) {
+      final MaterializedField field = fieldReader.getField();
+      // Only TYPE_MISMATCH_ERROR is a format pattern; keep the original exception as the cause.
+      throw new DrillRuntimeException(
+          caller + String.format(TYPE_MISMATCH_ERROR, field.getName(), field.getType()), e);
+    }
+  }
+
+  /**
+   * Copies the value at {@code fieldReader} into {@code listWriter}.
+   * The reader's minor type selects which element writer of the list receives the value; the reader's
+   * data mode is not consulted.
+   *
+   * @param fieldReader reader positioned on the value to copy
+   * @param listWriter  list writer receiving the value
+   * @param caller      name of the calling function; used only as the prefix of error messages
+   * @throws DrillRuntimeException if the minor type is not supported, or if the copy fails with a
+   *         ClassCastException (reported as a type mismatch, with the original exception as cause)
+   */
+  public static void writeToListFromReader(FieldReader fieldReader, BaseWriter.ListWriter listWriter, String caller) {
+    try {
+      final MajorType valueMajorType = fieldReader.getType();
+      final MinorType valueMinorType = valueMajorType.getMinorType();
+
+      switch (valueMinorType) {
+        case TINYINT:
+          fieldReader.copyAsValue(listWriter.tinyInt());
+          break;
+        case SMALLINT:
+          fieldReader.copyAsValue(listWriter.smallInt());
+          break;
+        case BIGINT:
+          fieldReader.copyAsValue(listWriter.bigInt());
+          break;
+        case INT:
+          fieldReader.copyAsValue(listWriter.integer());
+          break;
+        case UINT1:
+          fieldReader.copyAsValue(listWriter.uInt1());
+          break;
+        case UINT2:
+          fieldReader.copyAsValue(listWriter.uInt2());
+          break;
+        case UINT4:
+          fieldReader.copyAsValue(listWriter.uInt4());
+          break;
+        case UINT8:
+          fieldReader.copyAsValue(listWriter.uInt8());
+          break;
+        case DECIMAL9:
+          fieldReader.copyAsValue(listWriter.decimal9());
+          break;
+        case DECIMAL18:
+          fieldReader.copyAsValue(listWriter.decimal18());
+          break;
+        case DECIMAL28SPARSE:
+          fieldReader.copyAsValue(listWriter.decimal28Sparse());
+          break;
+        case DECIMAL38SPARSE:
+          fieldReader.copyAsValue(listWriter.decimal38Sparse());
+          break;
+        case VARDECIMAL:
+          fieldReader.copyAsValue(listWriter.varDecimal(valueMajorType.getScale(), valueMajorType.getPrecision()));
+          break;
+        case DATE:
+          fieldReader.copyAsValue(listWriter.date());
+          break;
+        case TIME:
+          fieldReader.copyAsValue(listWriter.time());
+          break;
+        case TIMESTAMP:
+          fieldReader.copyAsValue(listWriter.timeStamp());
+          break;
+        case INTERVAL:
+          fieldReader.copyAsValue(listWriter.interval());
+          break;
+        case INTERVALDAY:
+          fieldReader.copyAsValue(listWriter.intervalDay());
+          break;
+        case INTERVALYEAR:
+          fieldReader.copyAsValue(listWriter.intervalYear());
+          break;
+        case FLOAT4:
+          fieldReader.copyAsValue(listWriter.float4());
+          break;
+        case FLOAT8:
+          fieldReader.copyAsValue(listWriter.float8());
+          break;
+        case BIT:
+          fieldReader.copyAsValue(listWriter.bit());
+          break;
+        case VARCHAR:
+          fieldReader.copyAsValue(listWriter.varChar());
+          break;
+        case VARBINARY:
+          fieldReader.copyAsValue(listWriter.varBinary());
+          break;
+        case MAP:
+          fieldReader.copyAsValue(listWriter.map());
+          break;
+        case LIST:
+          fieldReader.copyAsValue(listWriter.list());
+          break;
+        default:
+          // caller is passed as an argument, not spliced into the pattern, so a '%' in it is harmless.
+          throw new DrillRuntimeException(String.format("%s function does not support input of type: %s",
+              caller, valueMinorType));
+      }
+    } catch (ClassCastException e) {
+      final MaterializedField field = fieldReader.getField();
+      // Only TYPE_MISMATCH_ERROR is a format pattern; keep the original exception as the cause.
+      throw new DrillRuntimeException(
+          caller + String.format(TYPE_MISMATCH_ERROR, field.getName(), field.getType()), e);
     }
   }
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAggWithAnyValue.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAggWithAnyValue.java
new file mode 100644
index 0000000..37c0b52
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAggWithAnyValue.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.impl.agg;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.exec.physical.config.StreamingAggregate;
+import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase;
+import org.apache.drill.exec.util.JsonStringArrayList;
+import org.apache.drill.test.BaseTestQuery;
+import org.apache.drill.categories.OperatorTest;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.drill.test.TestBuilder;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+
+import java.math.BigDecimal;
+import java.util.List;
+
+@Category(OperatorTest.class)
+@RunWith(Enclosed.class)
+public class TestAggWithAnyValue {
+
+ public static class TestAggWithAnyValueMultipleBatches extends PhysicalOpUnitTestBase {
+
+ @Test
+ public void testStreamAggWithGroupBy() {
+ StreamingAggregate aggConf = new StreamingAggregate(null, parseExprs("age.`max`", "age"), parseExprs("any_value(a)", "any_a"), 2.0f);
+ List<String> inputJsonBatches = Lists.newArrayList(
+ "[{ \"age\": {\"min\":20, \"max\":60}, \"city\": \"San Bruno\", \"de\": \"987654321987654321987654321.10987654321\"," +
+ " \"a\": [{\"b\":50, \"c\":30},{\"b\":70, \"c\":40}], \"m\": [{\"n\": [10, 11, 12]}], \"f\": [{\"g\": {\"h\": [{\"k\": 70}, {\"k\": 80}]}}]," +
+ "\"p\": {\"q\": [21, 22, 23]}" + "}, " +
+ "{ \"age\": {\"min\":20, \"max\":60}, \"city\": \"Castro Valley\", \"de\": \"987654321987654321987654321.12987654321\"," +
+ " \"a\": [{\"b\":60, \"c\":40},{\"b\":80, \"c\":50}], \"m\": [{\"n\": [13, 14, 15]}], \"f\": [{\"g\": {\"h\": [{\"k\": 90}, {\"k\": 100}]}}]," +
+ "\"p\": {\"q\": [24, 25, 26]}" + "}]",
+ "[{ \"age\": {\"min\":43, \"max\":80}, \"city\": \"Palo Alto\", \"de\": \"987654321987654321987654321.00987654321\"," +
+ " \"a\": [{\"b\":10, \"c\":15}, {\"b\":20, \"c\":45}], \"m\": [{\"n\": [1, 2, 3]}], \"f\": [{\"g\": {\"h\": [{\"k\": 10}, {\"k\": 20}]}}]," +
+ "\"p\": {\"q\": [27, 28, 29]}" + "}, " +
+ "{ \"age\": {\"min\":43, \"max\":80}, \"city\": \"San Carlos\", \"de\": \"987654321987654321987654321.11987654321\"," +
+ " \"a\": [{\"b\":30, \"c\":25}, {\"b\":40, \"c\":55}], \"m\": [{\"n\": [4, 5, 6]}], \"f\": [{\"g\": {\"h\": [{\"k\": 30}, {\"k\": 40}]}}]," +
+ "\"p\": {\"q\": [30, 31, 32]}" + "}, " +
+ "{ \"age\": {\"min\":43, \"max\":80}, \"city\": \"Palo Alto\", \"de\": \"987654321987654321987654321.13987654321\"," +
+ " \"a\": [{\"b\":70, \"c\":85}, {\"b\":90, \"c\":145}], \"m\": [{\"n\": [7, 8, 9]}], \"f\": [{\"g\": {\"h\": [{\"k\": 50}, {\"k\": 60}]}}]," +
+ "\"p\": {\"q\": [33, 34, 35]}" + "}]");
+ opTestBuilder()
+ .physicalOperator(aggConf)
+ .inputDataStreamJson(inputJsonBatches)
+ .baselineColumns("age", "any_a")
+ .baselineValues(60l, TestBuilder.listOf(TestBuilder.mapOf("b", 50l, "c", 30l), TestBuilder.mapOf("b", 70l, "c", 40l)))
+ .baselineValues(80l, TestBuilder.listOf(TestBuilder.mapOf("b", 10l, "c", 15l), TestBuilder.mapOf("b", 20l, "c", 45l)))
+ .go();
+ }
+ }
+
+ public static class TestAggWithAnyValueSingleBatch extends BaseTestQuery {
+
+ @Test
+ public void testWithGroupBy() throws Exception {
+ String query = "select t1.age.`max` as age, count(*) as cnt, any_value(t1.a) as any_a, any_value(t1.city) as any_city, " +
+ "any_value(f) as any_f, any_value(m) as any_m, any_value(p) as any_p from cp.`store/json/test_anyvalue.json` t1 group by t1.age.`max`";
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("age", "cnt", "any_a", "any_city", "any_f", "any_m", "any_p")
+ .baselineValues(60l, 2l, TestBuilder.listOf(TestBuilder.mapOf("b", 50l, "c", 30l), TestBuilder.mapOf("b", 70l, "c", 40l)), "San Bruno",
+ TestBuilder.listOf(TestBuilder.mapOf("g", TestBuilder.mapOf("h", TestBuilder.listOf(TestBuilder.mapOf("k", 70l), TestBuilder.mapOf("k", 80l))))),
+ TestBuilder.listOf(TestBuilder.mapOf("n", TestBuilder.listOf(10l, 11l, 12l))),
+ TestBuilder.mapOf("q", TestBuilder.listOf(21l, 22l, 23l)))
+ .baselineValues(80l, 3l, TestBuilder.listOf(TestBuilder.mapOf("b", 10l, "c", 15l), TestBuilder.mapOf("b", 20l, "c", 45l)), "Palo Alto",
+ TestBuilder.listOf(TestBuilder.mapOf("g", TestBuilder.mapOf("h", TestBuilder.listOf(TestBuilder.mapOf("k", 10l), TestBuilder.mapOf("k", 20l))))),
+ TestBuilder.listOf(TestBuilder.mapOf("n", TestBuilder.listOf(1l, 2l, 3l))),
+ TestBuilder.mapOf("q", TestBuilder.listOf(27l, 28l, 29l)))
+ .go();
+ }
+
+ @Test
+ public void testWithoutGroupBy() throws Exception {
+ String query = "select count(*) as cnt, any_value(t1.a) as any_a, any_value(t1.city) as any_city, " +
+ "any_value(f) as any_f, any_value(m) as any_m, any_value(p) as any_p from cp.`store/json/test_anyvalue.json` t1";
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("cnt", "any_a", "any_city", "any_f", "any_m", "any_p")
+ .baselineValues(5l, TestBuilder.listOf(TestBuilder.mapOf("b", 10l, "c", 15l), TestBuilder.mapOf("b", 20l, "c", 45l)), "Palo Alto",
+ TestBuilder.listOf(TestBuilder.mapOf("g", TestBuilder.mapOf("h", TestBuilder.listOf(TestBuilder.mapOf("k", 10l), TestBuilder.mapOf("k", 20l))))),
+ TestBuilder.listOf(TestBuilder.mapOf("n", TestBuilder.listOf(1l, 2l, 3l))),
+ TestBuilder.mapOf("q", TestBuilder.listOf(27l, 28l, 29l)))
+ .go();
+ }
+
+ @Test
+ public void testDecimalWithGroupBy() throws Exception {
+ String query = "select t1.age.`max` as age, any_value(cast(t1.de as decimal(38, 11))) as any_decimal " +
+ "from cp.`store/json/test_anyvalue.json` t1 group by t1.age.`max`";
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("age", "any_decimal")
+ .baselineValues(60l, new BigDecimal("987654321987654321987654321.10987654321"))
+ .baselineValues(80l, new BigDecimal("987654321987654321987654321.00987654321"))
+ .go();
+ }
+
+ @Test
+ public void testRepeatedDecimalWithGroupBy() throws Exception {
+ JsonStringArrayList<BigDecimal> ints = new JsonStringArrayList<>();
+ ints.add(new BigDecimal("999999.999"));
+ ints.add(new BigDecimal("-999999.999"));
+ ints.add(new BigDecimal("0.000"));
+
+ JsonStringArrayList<BigDecimal> longs = new JsonStringArrayList<>();
+ longs.add(new BigDecimal("999999999.999999999"));
+ longs.add(new BigDecimal("-999999999.999999999"));
+ longs.add(new BigDecimal("0.000000000"));
+
+ JsonStringArrayList<BigDecimal> fixedLen = new JsonStringArrayList<>();
+ fixedLen.add(new BigDecimal("999999999999.999999"));
+ fixedLen.add(new BigDecimal("-999999999999.999999"));
+ fixedLen.add(new BigDecimal("0.000000"));
+
+ String query = "select any_value(decimal_int32) as any_dec_32, any_value(decimal_int64) as any_dec_64," +
+ " any_value(decimal_fixedLen) as any_dec_fixed, any_value(decimal_binary) as any_dec_bin" +
+ " from cp.`parquet/repeatedIntLondFixedLenBinaryDecimal.parquet`";
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("any_dec_32", "any_dec_64", "any_dec_fixed", "any_dec_bin")
+ .baselineValues(ints, longs, fixedLen, fixedLen)
+ .go();
+ }
+ }
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/test/resources/store/json/test_anyvalue.json b/exec/java-exec/src/test/resources/store/json/test_anyvalue.json
new file mode 100644
index 0000000..8e7bef9
--- /dev/null
+++ b/exec/java-exec/src/test/resources/store/json/test_anyvalue.json
@@ -0,0 +1,50 @@
+{
+ "age": {"min":43, "max":80},
+ "city": "Palo Alto",
+ "de": "987654321987654321987654321.00987654321",
+ "delist": ["987654321987654321987654321.19987654321", "987654321987654321987654321.20987654321"],
+ "a": [{"b":10, "c":15}, {"b":20, "c":45}],
+ "m": [{"n": [1, 2, 3]}],
+ "f": [{"g": {"h": [{"k": 10}, {"k": 20}]}}],
+ "p": {"q" : [27, 28, 29]}
+}
+{
+ "age": {"min":20, "max":60},
+ "city": "San Bruno",
+ "de": "987654321987654321987654321.10987654321",
+ "delist": ["987654321987654321987654321.17987654321", "987654321987654321987654321.18987654321"],
+ "a": [{"b":50, "c":30},{"b":70, "c":40}],
+ "m": [{"n": [10, 11, 12]}],
+ "f": [{"g": {"h": [{"k": 70}, {"k": 80}]}}],
+ "p": {"q" : [21, 22, 23]}
+}
+{
+ "age": {"min":43, "max":80},
+ "city": "San Carlos",
+ "de": "987654321987654321987654321.11987654321",
+ "delist": ["987654321987654321987654321.11987654321", "987654321987654321987654321.12987654321"],
+ "a": [{"b":30, "c":25}, {"b":40, "c":55}],
+ "m": [{"n": [4, 5, 6]}],
+ "f": [{"g": {"h": [{"k": 30}, {"k": 40}]}}],
+ "p": {"q" : [30, 31, 32]}
+}
+{
+ "age": {"min":20, "max":60},
+ "city": "Castro Valley",
+ "de": "987654321987654321987654321.12987654321",
+ "delist": ["987654321987654321987654321.13987654321", "987654321987654321987654321.14987654321"],
+ "a": [{"b":60, "c":40},{"b":80, "c":50}],
+ "m": [{"n": [13, 14, 15]}],
+ "f": [{"g": {"h": [{"k": 90}, {"k": 100}]}}],
+ "p": {"q" : [24, 25, 26]}
+}
+{
+ "age": {"min":43, "max":80},
+ "city": "Palo Alto",
+ "de": "987654321987654321987654321.13987654321",
+ "delist": ["987654321987654321987654321.15987654321", "987654321987654321987654321.16987654321"],
+ "a": [{"b":70, "c":85}, {"b":90, "c":145}],
+ "m": [{"n": [7, 8, 9]}],
+ "f": [{"g": {"h": [{"k": 50}, {"k": 60}]}}],
+ "p": {"q" : [33, 34, 35]}
+}
\ No newline at end of file
diff --git a/exec/vector/src/main/codegen/templates/RepeatedValueVectors.java b/exec/vector/src/main/codegen/templates/RepeatedValueVectors.java
index 4e6edb5..037332f 100644
--- a/exec/vector/src/main/codegen/templates/RepeatedValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/RepeatedValueVectors.java
@@ -307,6 +307,7 @@ public final class Repeated${minor.class}Vector extends BaseRepeatedValueVector
holder.start = offsets.getAccessor().get(index);
holder.end = offsets.getAccessor().get(index+1);
holder.vector = values;
+ holder.reader = reader;
}
public void get(int index, int positionIndex, ${minor.class}Holder holder) {
diff --git a/exec/vector/src/main/codegen/templates/ValueHolders.java b/exec/vector/src/main/codegen/templates/ValueHolders.java
index 7635895..8562d1b 100644
--- a/exec/vector/src/main/codegen/templates/ValueHolders.java
+++ b/exec/vector/src/main/codegen/templates/ValueHolders.java
@@ -49,16 +49,17 @@ public final class ${className} implements ValueHolder {
/** The first index (inclusive) into the Vector. **/
public int start;
-
+
/** The last index (exclusive) into the Vector. **/
public int end;
-
+
/** The Vector holding the actual values. **/
public ${minor.class}Vector vector;
-
- <#else>
+
+ public FieldReader reader;
+<#else>
public static final int WIDTH = ${type.width};
-
+
<#if mode.name == "Optional">public int isSet;</#if>
<#assign fields = minor.fields!type.fields />
<#list fields as field>
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/expr/holders/RepeatedListHolder.java b/exec/vector/src/main/java/org/apache/drill/exec/expr/holders/RepeatedListHolder.java
index 52f590a..ce7e34d 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/expr/holders/RepeatedListHolder.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/expr/holders/RepeatedListHolder.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.expr.holders;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.vector.complex.ListVector;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
public final class RepeatedListHolder implements ValueHolder{
@@ -36,4 +37,5 @@ public final class RepeatedListHolder implements ValueHolder{
/** The Vector holding the actual values. **/
public ListVector vector;
+ public FieldReader reader;
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/expr/holders/RepeatedMapHolder.java b/exec/vector/src/main/java/org/apache/drill/exec/expr/holders/RepeatedMapHolder.java
index f8acaeb..516d135 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/expr/holders/RepeatedMapHolder.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/expr/holders/RepeatedMapHolder.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.expr.holders;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.vector.complex.MapVector;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
public final class RepeatedMapHolder implements ValueHolder{
@@ -38,4 +39,6 @@ public final class RepeatedMapHolder implements ValueHolder{
/** The Vector holding the actual values. **/
public MapVector vector;
+ public FieldReader reader;
+
}
diff --git a/logical/src/main/antlr3/org/apache/drill/common/expression/parser/ExprLexer.g b/logical/src/main/antlr3/org/apache/drill/common/expression/parser/ExprLexer.g
index 2b497a1..93dba94 100644
--- a/logical/src/main/antlr3/org/apache/drill/common/expression/parser/ExprLexer.g
+++ b/logical/src/main/antlr3/org/apache/drill/common/expression/parser/ExprLexer.g
@@ -37,6 +37,7 @@ When : 'when';
Cast: 'cast';
Convert : 'convert_' ('from' | 'to');
+// Keyword token for any_value calls; only the all-lowercase and all-uppercase spellings match.
+AnyValue : 'any_value' | 'ANY_VALUE';
Nullable: 'nullable';
Repeat: 'repeat';
As: 'as';
diff --git a/logical/src/main/antlr3/org/apache/drill/common/expression/parser/ExprParser.g b/logical/src/main/antlr3/org/apache/drill/common/expression/parser/ExprParser.g
index e73bdea..78a7cc3 100644
--- a/logical/src/main/antlr3/org/apache/drill/common/expression/parser/ExprParser.g
+++ b/logical/src/main/antlr3/org/apache/drill/common/expression/parser/ExprParser.g
@@ -81,6 +81,10 @@ convertCall returns [LogicalExpression e]
{ $e = FunctionCallFactory.createConvert($Convert.text, $String.text, $expression.e, pos($Convert));}
;
+// Matches the AnyValue keyword followed by a parenthesized, optional expression list and
+// builds a function-call expression whose name is the keyword text exactly as it was written.
+anyValueCall returns [LogicalExpression e]
+ : AnyValue OParen exprList? CParen {$e = FunctionCallFactory.createExpression($AnyValue.text, pos($AnyValue), $exprList.listE); }
+ ;
+
castCall returns [LogicalExpression e]
@init{
List<LogicalExpression> exprs = new ArrayList<LogicalExpression>();
@@ -313,6 +317,7 @@ arraySegment returns [PathSegment seg]
lookup returns [LogicalExpression e]
: functionCall {$e = $functionCall.e ;}
| convertCall {$e = $convertCall.e; }
+ | anyValueCall {$e = $anyValueCall.e; }
| castCall {$e = $castCall.e; }
| pathSegment {$e = new SchemaPath($pathSegment.seg, pos($pathSegment.start) ); }
| String {$e = new ValueExpressions.QuotedString($String.text, $String.text.length(), pos($String) ); }
diff --git a/logical/src/main/java/org/apache/drill/common/expression/AnyValueExpression.java b/logical/src/main/java/org/apache/drill/common/expression/AnyValueExpression.java
new file mode 100644
index 0000000..4dff147
--- /dev/null
+++ b/logical/src/main/java/org/apache/drill/common/expression/AnyValueExpression.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.expression;
+
+import org.apache.drill.common.expression.visitors.ExprVisitor;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.Types;
+
+import java.util.Collections;
+import java.util.Iterator;
+
+/**
+ * Logical expression node for {@code any_value(input)}. It wraps exactly one input expression
+ * and reports that input's major type, captured at construction time, as its own.
+ */
+public class AnyValueExpression extends LogicalExpressionBase implements Iterable<LogicalExpression>{
+
+  /** The function name literal, {@code any_value}. */
+  public static final String ANY_VALUE = "any_value";
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AnyValueExpression.class);
+
+  private final LogicalExpression input;
+  private final MajorType type;
+
+  /**
+   * Creates the node around a single input expression.
+   *
+   * @param input the wrapped expression; its major type is read once, here
+   * @param pos position passed on to the base class
+   */
+  public AnyValueExpression(LogicalExpression input, ExpressionPosition pos) {
+    super(pos);
+    this.input = input;
+    this.type = input.getMajorType();
+  }
+
+  /** @return the wrapped input expression */
+  public LogicalExpression getInput() {
+    return input;
+  }
+
+  /** @return the major type taken from the input when this node was constructed */
+  @Override
+  public MajorType getMajorType() {
+    return type;
+  }
+
+  /** Dispatches to {@link ExprVisitor#visitAnyValueExpression}. */
+  @Override
+  public <T, V, E extends Exception> T accept(ExprVisitor<T, V, E> visitor, V value) throws E {
+    return visitor.visitAnyValueExpression(this, value);
+  }
+
+  /** @return an iterator over the single wrapped input expression */
+  @Override
+  public Iterator<LogicalExpression> iterator() {
+    final Iterable<LogicalExpression> onlyInput = Collections.singleton(input);
+    return onlyInput.iterator();
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder text = new StringBuilder("AnyValueExpression [input=");
+    text.append(input);
+    text.append(", type=").append(Types.toString(type)).append("]");
+    return text.toString();
+  }
+}
+
diff --git a/logical/src/main/java/org/apache/drill/common/expression/ExpressionStringBuilder.java b/logical/src/main/java/org/apache/drill/common/expression/ExpressionStringBuilder.java
index f09f887..fb5323b 100644
--- a/logical/src/main/java/org/apache/drill/common/expression/ExpressionStringBuilder.java
+++ b/logical/src/main/java/org/apache/drill/common/expression/ExpressionStringBuilder.java
@@ -252,6 +252,14 @@ public class ExpressionStringBuilder extends AbstractExprVisitor<Void, StringBui
}
@Override
+ public Void visitAnyValueExpression(AnyValueExpression e, StringBuilder sb) throws RuntimeException {
+   // Render the expression under the any_value name (AnyValueExpression.ANY_VALUE), which is
+   // the keyword the expression lexer accepts; the previous "any(" prefix named a different
+   // function than the one this node represents.
+   sb.append(AnyValueExpression.ANY_VALUE).append("(");
+   e.getInput().accept(this, sb);
+   sb.append(")");
+   return null;
+ }
+
+ @Override
public Void visitCastExpression(CastExpression e, StringBuilder sb) throws RuntimeException {
MajorType mt = e.getMajorType();
diff --git a/logical/src/main/java/org/apache/drill/common/expression/FunctionCallFactory.java b/logical/src/main/java/org/apache/drill/common/expression/FunctionCallFactory.java
index 7d9f9a1..513da4c 100644
--- a/logical/src/main/java/org/apache/drill/common/expression/FunctionCallFactory.java
+++ b/logical/src/main/java/org/apache/drill/common/expression/FunctionCallFactory.java
@@ -85,6 +85,14 @@ public class FunctionCallFactory {
return new ConvertExpression(function, conversionType, expr, ep);
}
+ /**
+  * Wraps the given expression in an {@link AnyValueExpression}.
+  *
+  * @param ep position recorded on the new expression
+  * @param expr input expression to wrap
+  * @return the new {@link AnyValueExpression}
+  */
+ public static LogicalExpression createAnyValue(ExpressionPosition ep, LogicalExpression expr) {
+ return new AnyValueExpression(expr, ep);
+ }
+
+ /**
+  * Creates the expression for a named call with the given arguments and no known source
+  * position, by delegating to {@code createExpression(String, ExpressionPosition, List)}.
+  *
+  * @param functionName name of the function to call
+  * @param args argument expressions of the call
+  * @return the expression produced by {@code createExpression}
+  */
+ public static LogicalExpression createAnyValue(String functionName, List<LogicalExpression> args) {
+   return createExpression(functionName, ExpressionPosition.UNKNOWN, args);
+ }
+
public static LogicalExpression createExpression(String functionName, List<LogicalExpression> args){
return createExpression(functionName, ExpressionPosition.UNKNOWN, args);
}
diff --git a/logical/src/main/java/org/apache/drill/common/expression/visitors/AbstractExprVisitor.java b/logical/src/main/java/org/apache/drill/common/expression/visitors/AbstractExprVisitor.java
index 8458968..18483ce 100644
--- a/logical/src/main/java/org/apache/drill/common/expression/visitors/AbstractExprVisitor.java
+++ b/logical/src/main/java/org/apache/drill/common/expression/visitors/AbstractExprVisitor.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.common.expression.visitors;
+import org.apache.drill.common.expression.AnyValueExpression;
import org.apache.drill.common.expression.BooleanOperator;
import org.apache.drill.common.expression.CastExpression;
import org.apache.drill.common.expression.ConvertExpression;
@@ -165,6 +166,11 @@ public abstract class AbstractExprVisitor<T, VAL, EXCEP extends Exception> imple
}
@Override
+ public T visitAnyValueExpression(AnyValueExpression e, VAL value) throws EXCEP {
+ // Default handling: route any_value expressions to visitUnknown; subclasses that need
+ // specific behavior override this method.
+ return visitUnknown(e, value);
+ }
+
+ @Override
public T visitNullConstant(TypedNullConstant e, VAL value) throws EXCEP {
return visitUnknown(e, value);
}
diff --git a/logical/src/main/java/org/apache/drill/common/expression/visitors/AggregateChecker.java b/logical/src/main/java/org/apache/drill/common/expression/visitors/AggregateChecker.java
index 2e6b60b..ac46e42 100644
--- a/logical/src/main/java/org/apache/drill/common/expression/visitors/AggregateChecker.java
+++ b/logical/src/main/java/org/apache/drill/common/expression/visitors/AggregateChecker.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.common.expression.visitors;
+import org.apache.drill.common.expression.AnyValueExpression;
import org.apache.drill.common.expression.BooleanOperator;
import org.apache.drill.common.expression.CastExpression;
import org.apache.drill.common.expression.ConvertExpression;
@@ -177,6 +178,11 @@ public final class AggregateChecker implements ExprVisitor<Boolean, ErrorCollect
}
@Override
+ public Boolean visitAnyValueExpression(AnyValueExpression e, ErrorCollector errors) throws RuntimeException {
+ // The answer for an any_value node is whatever this checker reports for its input; the
+ // wrapper itself adds nothing to the result.
+ // NOTE(review): any_value is itself an aggregate function - confirm that reporting only the
+ // input's result is the intended behavior here.
+ return e.getInput().accept(this, errors);
+ }
+
+ @Override
public Boolean visitDateConstant(DateExpression intExpr, ErrorCollector errors) {
return false;
}
diff --git a/logical/src/main/java/org/apache/drill/common/expression/visitors/ConditionalExprOptimizer.java b/logical/src/main/java/org/apache/drill/common/expression/visitors/ConditionalExprOptimizer.java
index 05e3a73..1b6eab7 100644
--- a/logical/src/main/java/org/apache/drill/common/expression/visitors/ConditionalExprOptimizer.java
+++ b/logical/src/main/java/org/apache/drill/common/expression/visitors/ConditionalExprOptimizer.java
@@ -21,6 +21,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import org.apache.drill.common.expression.AnyValueExpression;
import org.apache.drill.common.expression.BooleanOperator;
import org.apache.drill.common.expression.CastExpression;
import org.apache.drill.common.expression.ConvertExpression;
@@ -100,6 +101,12 @@ public class ConditionalExprOptimizer extends AbstractExprVisitor<LogicalExpress
+ "It should have been converted to FunctionHolderExpression in materialization");
}
+ /**
+  * Always rejects the node: this optimizer does not accept an {@link AnyValueExpression}
+  * and throws {@link UnsupportedOperationException} with an explanatory message.
+  *
+  * NOTE(review): the parameter is named {@code cast} although it is an AnyValueExpression -
+  * presumably carried over from the cast handler; consider renaming it.
+  */
+ @Override
+ public LogicalExpression visitAnyValueExpression(AnyValueExpression cast, Void value) throws RuntimeException {
+ throw new UnsupportedOperationException("AnyValueExpression is not expected here. "
+ + "It should have been converted to FunctionHolderExpression in materialization");
+ }
+
private static Comparator<LogicalExpression> costComparator = new Comparator<LogicalExpression> () {
public int compare(LogicalExpression e1, LogicalExpression e2) {
return e1.getCumulativeCost() <= e2.getCumulativeCost() ? -1 : 1;
diff --git a/logical/src/main/java/org/apache/drill/common/expression/visitors/ConstantChecker.java b/logical/src/main/java/org/apache/drill/common/expression/visitors/ConstantChecker.java
index fbe7d72..a7648ac 100644
--- a/logical/src/main/java/org/apache/drill/common/expression/visitors/ConstantChecker.java
+++ b/logical/src/main/java/org/apache/drill/common/expression/visitors/ConstantChecker.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.common.expression.visitors;
+import org.apache.drill.common.expression.AnyValueExpression;
import org.apache.drill.common.expression.BooleanOperator;
import org.apache.drill.common.expression.CastExpression;
import org.apache.drill.common.expression.ConvertExpression;
@@ -202,6 +203,11 @@ final class ConstantChecker implements ExprVisitor<Boolean, ErrorCollector, Runt
}
@Override
+ public Boolean visitAnyValueExpression(AnyValueExpression e, ErrorCollector value) throws RuntimeException {
+ // The result for an any_value node is simply the result this checker gives for its input.
+ return e.getInput().accept(this, value);
+ }
+
+ @Override
public Boolean visitNullConstant(TypedNullConstant e, ErrorCollector value) throws RuntimeException {
return true;
}
diff --git a/logical/src/main/java/org/apache/drill/common/expression/visitors/ExprVisitor.java b/logical/src/main/java/org/apache/drill/common/expression/visitors/ExprVisitor.java
index c065bc8..cea83d8 100644
--- a/logical/src/main/java/org/apache/drill/common/expression/visitors/ExprVisitor.java
+++ b/logical/src/main/java/org/apache/drill/common/expression/visitors/ExprVisitor.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.common.expression.visitors;
+import org.apache.drill.common.expression.AnyValueExpression;
import org.apache.drill.common.expression.BooleanOperator;
import org.apache.drill.common.expression.CastExpression;
import org.apache.drill.common.expression.ConvertExpression;
@@ -75,4 +76,5 @@ public interface ExprVisitor<T, VAL, EXCEP extends Exception> {
T visitConvertExpression(ConvertExpression e, VAL value) throws EXCEP;
T visitParameter(ParameterExpression e, VAL value) throws EXCEP;
T visitTypedFieldExpr(TypedFieldExpr e, VAL value) throws EXCEP;
+ // Visit callback for AnyValueExpression nodes; invoked from AnyValueExpression.accept.
+ T visitAnyValueExpression(AnyValueExpression e, VAL value) throws EXCEP;
}
diff --git a/logical/src/main/java/org/apache/drill/common/expression/visitors/ExpressionValidator.java b/logical/src/main/java/org/apache/drill/common/expression/visitors/ExpressionValidator.java
index b3074fc..df72a34 100644
--- a/logical/src/main/java/org/apache/drill/common/expression/visitors/ExpressionValidator.java
+++ b/logical/src/main/java/org/apache/drill/common/expression/visitors/ExpressionValidator.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.common.expression.visitors;
+import org.apache.drill.common.expression.AnyValueExpression;
import org.apache.drill.common.expression.BooleanOperator;
import org.apache.drill.common.expression.CastExpression;
import org.apache.drill.common.expression.ConvertExpression;
@@ -239,6 +240,12 @@ public class ExpressionValidator implements ExprVisitor<Void, ErrorCollector, Ru
}
@Override
+ public Void visitAnyValueExpression(AnyValueExpression e, ErrorCollector value)
+ throws RuntimeException {
+ // Nothing is validated on the any_value node itself; validation is passed on to its
+ // input expression with the same error collector.
+ return e.getInput().accept(this, value);
+ }
+
+ @Override
public Void visitParameter(ValueExpressions.ParameterExpression e, ErrorCollector value) throws RuntimeException {
return null;
}
--
To stop receiving notification emails like this one, please contact
sorabh@apache.org.