You are viewing a plain-text version of this content. The link to the canonical (original) version was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/11/29 14:42:55 UTC
[1/3] flink git commit: [FLINK-4825] [table] Implement a RexExecutor that uses Flink's code generation.
Repository: flink
Updated Branches:
refs/heads/master 910f733f5 -> ecfb5b5f6
[FLINK-4825] [table] Implement a RexExecutor that uses Flink's code generation.
This closes #2884
This closes #2874 (closing PR with Public API breaking changes)
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/db441dec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/db441dec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/db441dec
Branch: refs/heads/master
Commit: db441decb41bf856400766023bfc7de77d6041aa
Parents: 910f733
Author: twalthr <tw...@apache.org>
Authored: Mon Nov 28 12:18:36 2016 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Nov 29 13:14:40 2016 +0100
----------------------------------------------------------------------
.../flink/api/table/FlinkRelBuilder.scala | 1 +
.../flink/api/table/TableEnvironment.scala | 5 +-
.../flink/api/table/codegen/Compiler.scala | 41 +++++++
.../api/table/codegen/ExpressionReducer.scala | 117 +++++++++++++++++++
.../api/table/plan/rules/FlinkRuleSets.scala | 16 ++-
.../flink/api/table/runtime/Compiler.scala | 42 -------
.../api/table/runtime/FlatJoinRunner.scala | 1 +
.../flink/api/table/runtime/FlatMapRunner.scala | 1 +
.../flink/api/table/runtime/MapRunner.scala | 1 +
.../table/runtime/io/ValuesInputFormat.scala | 2 +-
.../api/scala/batch/sql/SetOperatorsTest.scala | 2 +-
.../api/table/ExpressionReductionTest.scala | 20 ++--
.../table/expressions/ScalarOperatorsTest.scala | 2 +-
.../expressions/utils/ExpressionTestBase.scala | 3 +-
14 files changed, 185 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
index ea4eed0..da44ebb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
@@ -80,6 +80,7 @@ object FlinkRelBuilder {
// create context instances with Flink type factory
val planner = new VolcanoPlanner(Contexts.empty())
+ planner.setExecutor(config.getExecutor)
planner.addRelTraitDef(ConventionTraitDef.INSTANCE)
val cluster = RelOptCluster.create(planner, new RexBuilder(typeFactory))
val calciteSchema = CalciteSchema.from(config.getDefaultSchema)
http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
index e8734f5..7b2b738 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
@@ -25,7 +25,7 @@ import org.apache.calcite.config.Lex
import org.apache.calcite.plan.RelOptPlanner
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rex.RexExecutorImpl
-import org.apache.calcite.schema.{Schemas, SchemaPlus}
+import org.apache.calcite.schema.{SchemaPlus, Schemas}
import org.apache.calcite.schema.impl.AbstractTable
import org.apache.calcite.sql.SqlOperatorTable
import org.apache.calcite.sql.parser.SqlParser
@@ -38,6 +38,7 @@ import org.apache.flink.api.java.{ExecutionEnvironment => JavaBatchExecEnv}
import org.apache.flink.api.scala.table.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv}
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaBatchExecEnv}
+import org.apache.flink.api.table.codegen.ExpressionReducer
import org.apache.flink.api.table.expressions.{Alias, Expression, UnresolvedFieldReference}
import org.apache.flink.api.table.functions.{ScalarFunction, UserDefinedFunction}
import org.apache.flink.api.table.plan.cost.DataSetCostFactory
@@ -71,7 +72,7 @@ abstract class TableEnvironment(val config: TableConfig) {
.typeSystem(new FlinkTypeSystem)
.operatorTable(getSqlOperatorTable)
// set the executor to evaluate constant expressions
- .executor(new RexExecutorImpl(Schemas.createDataContext(null)))
+ .executor(new ExpressionReducer(config))
.build
// the builder for Calcite RelNodes, Calcite's representation of a relational expression tree.
http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Compiler.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Compiler.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Compiler.scala
new file mode 100644
index 0000000..fce13ba
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Compiler.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.codegen
+
+import org.apache.flink.api.common.InvalidProgramException
+import org.codehaus.commons.compiler.CompileException
+import org.codehaus.janino.SimpleCompiler
+
+trait Compiler[T] {
+
+ @throws(classOf[CompileException])
+ def compile(cl: ClassLoader, name: String, code: String): Class[T] = {
+ require(cl != null, "Classloader must not be null.")
+ val compiler = new SimpleCompiler()
+ compiler.setParentClassLoader(cl)
+ try {
+ compiler.cook(code)
+ } catch {
+ case e: CompileException =>
+ throw new InvalidProgramException("Table program cannot be compiled. " +
+ "This is a bug. Please file an issue.", e)
+ }
+ compiler.getClassLoader.loadClass(name).asInstanceOf[Class[T]]
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionReducer.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionReducer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionReducer.scala
new file mode 100644
index 0000000..74756ef
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionReducer.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.codegen
+
+import java.util
+
+import org.apache.calcite.plan.RelOptPlanner
+import org.apache.calcite.rex.{RexBuilder, RexNode}
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.table.typeutils.{RowTypeInfo, TypeConverter}
+import org.apache.flink.api.table.{FlinkTypeFactory, Row, TableConfig}
+
+import scala.collection.JavaConverters._
+
+/**
+ * Evaluates constant expressions using Flink's [[CodeGenerator]].
+ */
+class ExpressionReducer(config: TableConfig)
+ extends RelOptPlanner.Executor with Compiler[MapFunction[Row, Row]] {
+
+ private val EMPTY_ROW_INFO = TypeConverter.DEFAULT_ROW_TYPE
+ private val EMPTY_ROW = new Row(0)
+
+ override def reduce(
+ rexBuilder: RexBuilder,
+ constExprs: util.List[RexNode],
+ reducedValues: util.List[RexNode]): Unit = {
+
+ val typeFactory = rexBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+
+ val literals = constExprs.asScala.map(e => (e.getType.getSqlTypeName, e)).flatMap {
+
+ // we need to cast here for RexBuilder.makeLiteral
+ case (SqlTypeName.DATE, e) =>
+ Some(
+ rexBuilder.makeCast(typeFactory.createTypeFromTypeInfo(BasicTypeInfo.INT_TYPE_INFO), e)
+ )
+ case (SqlTypeName.TIME, e) =>
+ Some(
+ rexBuilder.makeCast(typeFactory.createTypeFromTypeInfo(BasicTypeInfo.INT_TYPE_INFO), e)
+ )
+ case (SqlTypeName.TIMESTAMP, e) =>
+ Some(
+ rexBuilder.makeCast(typeFactory.createTypeFromTypeInfo(BasicTypeInfo.LONG_TYPE_INFO), e)
+ )
+
+ // we don't support object literals yet, we skip those constant expressions
+ case (SqlTypeName.ANY, _) | (SqlTypeName.ROW, _) => None
+
+ case (_, e) => Some(e)
+ }
+
+ val literalTypes = literals.map(e => FlinkTypeFactory.toTypeInfo(e.getType))
+ val resultType = new RowTypeInfo(literalTypes)
+
+ // generate MapFunction
+ val generator = new CodeGenerator(config, false, EMPTY_ROW_INFO)
+
+ val result = generator.generateResultExpression(
+ resultType,
+ resultType.getFieldNames,
+ literals)
+
+ val generatedFunction = generator.generateFunction[MapFunction[Row, Row]](
+ "ExpressionReducer",
+ classOf[MapFunction[Row, Row]],
+ s"""
+ |${result.code}
+ |return ${result.resultTerm};
+ |""".stripMargin,
+ resultType.asInstanceOf[TypeInformation[Any]])
+
+ val clazz = compile(getClass.getClassLoader, generatedFunction.name, generatedFunction.code)
+ val function = clazz.newInstance()
+
+ // execute
+ val reduced = function.map(EMPTY_ROW)
+
+ // add the reduced results or keep them unreduced
+ var i = 0
+ var reducedIdx = 0
+ while (i < constExprs.size()) {
+ val unreduced = constExprs.get(i)
+ unreduced.getType.getSqlTypeName match {
+ // we insert the original expression for object literals
+ case SqlTypeName.ANY | SqlTypeName.ROW =>
+ reducedValues.add(unreduced)
+ case _ =>
+ val literal = rexBuilder.makeLiteral(
+ reduced.productElement(reducedIdx),
+ unreduced.getType,
+ true)
+ reducedValues.add(literal)
+ reducedIdx += 1
+ }
+ i += 1
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
index 638deac..5653083 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
@@ -75,11 +75,10 @@ object FlinkRuleSets {
SortRemoveRule.INSTANCE,
// simplify expressions rules
- // TODO uncomment if FLINK-4825 is solved
- // ReduceExpressionsRule.FILTER_INSTANCE,
- // ReduceExpressionsRule.PROJECT_INSTANCE,
- // ReduceExpressionsRule.CALC_INSTANCE,
- // ReduceExpressionsRule.JOIN_INSTANCE,
+ ReduceExpressionsRule.FILTER_INSTANCE,
+ ReduceExpressionsRule.PROJECT_INSTANCE,
+ ReduceExpressionsRule.CALC_INSTANCE,
+ ReduceExpressionsRule.JOIN_INSTANCE,
// prune empty results rules
PruneEmptyRules.AGGREGATE_INSTANCE,
@@ -137,10 +136,9 @@ object FlinkRuleSets {
ProjectRemoveRule.INSTANCE,
// simplify expressions rules
- // TODO uncomment if FLINK-4825 is solved
- // ReduceExpressionsRule.FILTER_INSTANCE,
- // ReduceExpressionsRule.PROJECT_INSTANCE,
- // ReduceExpressionsRule.CALC_INSTANCE,
+ ReduceExpressionsRule.FILTER_INSTANCE,
+ ReduceExpressionsRule.PROJECT_INSTANCE,
+ ReduceExpressionsRule.CALC_INSTANCE,
// merge and push unions rules
UnionEliminatorRule.INSTANCE,
http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/Compiler.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/Compiler.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/Compiler.scala
deleted file mode 100644
index c5d566e..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/Compiler.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime
-
-import org.apache.flink.api.common.InvalidProgramException
-import org.apache.flink.api.common.functions.Function
-import org.codehaus.commons.compiler.CompileException
-import org.codehaus.janino.SimpleCompiler
-
-trait Compiler[T] {
-
- @throws(classOf[CompileException])
- def compile(cl: ClassLoader, name: String, code: String): Class[T] = {
- require(cl != null, "Classloader must not be null.")
- val compiler = new SimpleCompiler()
- compiler.setParentClassLoader(cl)
- try {
- compiler.cook(code)
- } catch {
- case e: CompileException =>
- throw new InvalidProgramException("Table program cannot be compiled. " +
- "This is a bug. Please file an issue.", e)
- }
- compiler.getClassLoader.loadClass(name).asInstanceOf[Class[T]]
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala
index c6a8fe8..2e57a0f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala
@@ -21,6 +21,7 @@ package org.apache.flink.api.table.runtime
import org.apache.flink.api.common.functions.{FlatJoinFunction, RichFlatJoinFunction}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.api.table.codegen.Compiler
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import org.slf4j.LoggerFactory
http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala
index 2e942eb..e228e2b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala
@@ -21,6 +21,7 @@ package org.apache.flink.api.table.runtime
import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.api.table.codegen.Compiler
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import org.slf4j.LoggerFactory
http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala
index 944b415..9fd1876 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala
@@ -21,6 +21,7 @@ package org.apache.flink.api.table.runtime
import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.api.table.codegen.Compiler
import org.apache.flink.configuration.Configuration
import org.slf4j.LoggerFactory
http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala
index 34bff15..2a4be46 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala
@@ -21,7 +21,7 @@ package org.apache.flink.api.table.runtime.io
import org.apache.flink.api.common.io.{GenericInputFormat, NonParallelInput}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.api.table.runtime.Compiler
+import org.apache.flink.api.table.codegen.Compiler
import org.apache.flink.core.io.GenericInputSplit
import org.slf4j.LoggerFactory
http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala
index 5bc6e4a..7b2b497 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala
@@ -54,7 +54,7 @@ class SetOperatorsTest extends TableTestBase {
term("join", "b_long", "b_int", "b_string", "a_long"),
term("joinType", "InnerJoin")
),
- term("select", "a_long", "true AS $f0")
+ term("select", "true AS $f0", "a_long")
),
term("groupBy", "a_long"),
term("select", "a_long", "MIN($f0) AS $f1")
http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala
index 9694687..b8156a2 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala
@@ -21,10 +21,8 @@ import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.utils.TableTestBase
import org.apache.flink.api.table.utils.TableTestUtil._
-import org.junit.{Ignore, Test}
+import org.junit.Test
-// TODO enable if FLINK-4825 is solved
-@Ignore
class ExpressionReductionTest extends TableTestBase {
@Test
@@ -64,7 +62,7 @@ class ExpressionReductionTest extends TableTestBase {
"true AS EXPR$9",
"2 AS EXPR$10",
"true AS EXPR$11",
- "'TRUEX' AS EXPR$12"
+ "'trueX' AS EXPR$12"
),
term("where", ">(a, 8)")
)
@@ -109,7 +107,7 @@ class ExpressionReductionTest extends TableTestBase {
"true AS EXPR$9",
"2 AS EXPR$10",
"true AS EXPR$11",
- "'TRUEX' AS EXPR$12"
+ "'trueX' AS EXPR$12"
)
)
@@ -164,7 +162,7 @@ class ExpressionReductionTest extends TableTestBase {
"false AS _c5",
"true AS _c6",
"2E0 AS _c7",
- "'TRUEX' AS _c8"
+ "'trueX' AS _c8"
),
term("where", ">(a, 8)")
)
@@ -200,7 +198,7 @@ class ExpressionReductionTest extends TableTestBase {
"false AS _c5",
"true AS _c6",
"2E0 AS _c7",
- "'TRUEX' AS _c8"
+ "'trueX' AS _c8"
)
)
@@ -262,7 +260,7 @@ class ExpressionReductionTest extends TableTestBase {
"true AS EXPR$9",
"2 AS EXPR$10",
"true AS EXPR$11",
- "'TRUEX' AS EXPR$12"
+ "'trueX' AS EXPR$12"
),
term("where", ">(a, 8)")
)
@@ -307,7 +305,7 @@ class ExpressionReductionTest extends TableTestBase {
"true AS EXPR$9",
"2 AS EXPR$10",
"true AS EXPR$11",
- "'TRUEX' AS EXPR$12"
+ "'trueX' AS EXPR$12"
)
)
@@ -362,7 +360,7 @@ class ExpressionReductionTest extends TableTestBase {
"false AS _c5",
"true AS _c6",
"2E0 AS _c7",
- "'TRUEX' AS _c8"
+ "'trueX' AS _c8"
),
term("where", ">(a, 8)")
)
@@ -398,7 +396,7 @@ class ExpressionReductionTest extends TableTestBase {
"false AS _c5",
"true AS _c6",
"2E0 AS _c7",
- "'TRUEX' AS _c8"
+ "'trueX' AS _c8"
)
)
http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarOperatorsTest.scala
index 1f5a069..7ad2212 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarOperatorsTest.scala
@@ -129,7 +129,7 @@ class ScalarOperatorsTest extends ExpressionTestBase {
testSqlApi(
"CASE 1 WHEN 1, 2 THEN '1 or 2' WHEN 2 THEN 'not possible' WHEN 3, 2 THEN '3' " +
"ELSE 'none of the above' END",
- "1 or 2")
+ "1 or 2 ")
testSqlApi("CASE WHEN 'a'='a' THEN 1 END", "1")
testSqlApi("CASE 2 WHEN 1 THEN 'a' WHEN 2 THEN 'bcd' END", "bcd")
testSqlApi("CASE f2 WHEN 1 THEN 11 WHEN 2 THEN 4 ELSE NULL END", "11")
http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala
index d34e335..84b61da 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala
@@ -28,12 +28,11 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.{DataSet => JDataSet}
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.table._
-import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedFunction}
+import org.apache.flink.api.table.codegen.{CodeGenerator, Compiler, GeneratedFunction}
import org.apache.flink.api.table.expressions.{Expression, ExpressionParser}
import org.apache.flink.api.table.functions.UserDefinedFunction
import org.apache.flink.api.table.plan.nodes.dataset.{DataSetCalc, DataSetConvention}
import org.apache.flink.api.table.plan.rules.FlinkRuleSets
-import org.apache.flink.api.table.runtime.Compiler
import org.apache.flink.api.table.typeutils.RowTypeInfo
import org.junit.Assert._
import org.junit.{After, Before}
[2/3] flink git commit: [FLINK-5184] [table] Fix compareSerialized() of RowComparator.
Posted by fh...@apache.org.
[FLINK-5184] [table] Fix compareSerialized() of RowComparator.
This closes #2894
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0bb68479
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0bb68479
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0bb68479
Branch: refs/heads/master
Commit: 0bb684797dfb3e03dd4f9761a6bf1eb8ce9d1c0d
Parents: db441de
Author: godfreyhe <go...@163.com>
Authored: Tue Nov 29 19:27:58 2016 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Nov 29 13:19:26 2016 +0100
----------------------------------------------------------------------
.../api/table/typeutils/RowComparator.scala | 16 +++-
.../flink/api/table/typeutils/RowTypeInfo.scala | 1 +
.../RowComparatorWithManyFieldsTest.scala | 82 ++++++++++++++++++++
3 files changed, 95 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0bb68479/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowComparator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowComparator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowComparator.scala
index cc97656..8bbe4d8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowComparator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowComparator.scala
@@ -32,6 +32,8 @@ import org.apache.flink.types.KeyFieldOutOfBoundsException
* Comparator for [[Row]].
*/
class RowComparator private (
+ /** the number of fields of the Row */
+ val numberOfFields: Int,
/** key positions describe which fields are keys in what order */
val keyPositions: Array[Int],
/** null-aware comparators for the key fields, in the same order as the key fields */
@@ -43,8 +45,8 @@ class RowComparator private (
extends CompositeTypeComparator[Row] with Serializable {
// null masks for serialized comparison
- private val nullMask1 = new Array[Boolean](serializers.length)
- private val nullMask2 = new Array[Boolean](serializers.length)
+ private val nullMask1 = new Array[Boolean](numberOfFields)
+ private val nullMask2 = new Array[Boolean](numberOfFields)
// cache for the deserialized key field objects
@transient
@@ -63,10 +65,12 @@ class RowComparator private (
* Intermediate constructor for creating auxiliary fields.
*/
def this(
+ numberOfFields: Int,
keyPositions: Array[Int],
comparators: Array[NullAwareComparator[Any]],
serializers: Array[TypeSerializer[Any]]) = {
this(
+ numberOfFields,
keyPositions,
comparators,
serializers,
@@ -76,6 +80,7 @@ class RowComparator private (
/**
* General constructor for RowComparator.
*
+ * @param numberOfFields the number of fields of the Row
* @param keyPositions key positions describe which fields are keys in what order
* @param comparators non-null-aware comparators for the key fields, in the same order as
* the key fields
@@ -83,11 +88,13 @@ class RowComparator private (
* @param orders sorting orders for the fields
*/
def this(
+ numberOfFields: Int,
keyPositions: Array[Int],
comparators: Array[TypeComparator[Any]],
serializers: Array[TypeSerializer[Any]],
orders: Array[Boolean]) = {
this(
+ numberOfFields,
keyPositions,
makeNullAware(comparators, orders),
serializers)
@@ -133,8 +140,8 @@ class RowComparator private (
val len = serializers.length
val keyLen = keyPositions.length
- readIntoNullMask(len, firstSource, nullMask1)
- readIntoNullMask(len, secondSource, nullMask2)
+ readIntoNullMask(numberOfFields, firstSource, nullMask1)
+ readIntoNullMask(numberOfFields, secondSource, nullMask2)
// deserialize
var i = 0
@@ -217,6 +224,7 @@ class RowComparator private (
val serializersCopy = serializers.map(_.duplicate())
new RowComparator(
+ numberOfFields,
keyPositions,
comparatorsCopy,
serializersCopy,
http://git-wip-us.apache.org/repos/asf/flink/blob/0bb68479/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeInfo.scala
index 489edca..711bb49 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeInfo.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeInfo.scala
@@ -96,6 +96,7 @@ class RowTypeInfo(fieldTypes: Seq[TypeInformation[_]])
val maxIndex = logicalKeyFields.max
new RowComparator(
+ getArity,
logicalKeyFields.toArray,
fieldComparators.toArray.asInstanceOf[Array[TypeComparator[Any]]],
types.take(maxIndex + 1).map(_.createSerializer(config).asInstanceOf[TypeSerializer[Any]]),
http://git-wip-us.apache.org/repos/asf/flink/blob/0bb68479/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorWithManyFieldsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorWithManyFieldsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorWithManyFieldsTest.scala
new file mode 100644
index 0000000..33715c1
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorWithManyFieldsTest.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.typeutils
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeutils.{ComparatorTestBase, TypeComparator, TypeSerializer}
+import org.apache.flink.api.table.Row
+import org.apache.flink.util.Preconditions
+import org.junit.Assert._
+
+/**
+ * Tests [[RowComparator]] on wide rows: every row has ten STRING fields, the comparator keys
+ * on field 0 only, and the first test row has a null key field.
+ */
+class RowComparatorWithManyFieldsTest extends ComparatorTestBase[Row] {
+
+  /** Arity of every test row. */
+  val numberOfFields = 10
+
+  /** Type of every field; all fields are strings. */
+  val fieldTypes: Array[TypeInformation[_]] =
+    Array.fill[TypeInformation[_]](numberOfFields)(BasicTypeInfo.STRING_TYPE_INFO)
+
+  val typeInfo = new RowTypeInfo(fieldTypes)
+
+  /** Rows returned as the sorted test data; row 0 has a null key (field 0). */
+  val data: Array[Row] = Array(
+    createRow(Array(null, "b0", "c0", "d0", "e0", "f0", "g0", "h0", "i0", "j0")),
+    createRow(Array("a1", "b1", "c1", "d1", "e1", "f1", "g1", "h1", "i1", "j1")),
+    createRow(Array("a2", "b2", "c2", "d2", "e2", "f2", "g2", "h2", "i2", "j2")),
+    createRow(Array("a3", "b3", "c3", "d3", "e3", "f3", "g3", "h3", "i3", "j3"))
+  )
+
+  /**
+   * Field-by-field equality check of two rows.
+   *
+   * @param message failure message
+   * @param should  the expected row
+   * @param is      the actual row
+   */
+  override protected def deepEquals(message: String, should: Row, is: Row): Unit = {
+    val arity = should.productArity
+    assertEquals(message, arity, is.productArity)
+    for (index <- 0 until arity) {
+      val expected: Any = should.productElement(index)
+      val actual: Any = is.productElement(index)
+      // JUnit's argument order is (message, expected, actual).
+      assertEquals(message, expected, actual)
+    }
+  }
+
+  /** Comparator on field 0 only, in the requested direction. */
+  override protected def createComparator(ascending: Boolean): TypeComparator[Row] = {
+    typeInfo.createComparator(
+      Array(0),
+      Array(ascending),
+      0,
+      new ExecutionConfig())
+  }
+
+  override protected def createSerializer(): TypeSerializer[Row] = {
+    typeInfo.createSerializer(new ExecutionConfig())
+  }
+
+  override protected def getSortedTestData: Array[Row] = {
+    data
+  }
+
+  override protected def supportsNullKeys: Boolean = true
+
+  /** Builds a row from exactly `numberOfFields` values, in field order. */
+  private def createRow(values: Array[_]): Row = {
+    Preconditions.checkArgument(values.length == numberOfFields)
+    val r: Row = new Row(numberOfFields)
+    values.zipWithIndex.foreach { case (e, i) => r.setField(i, e) }
+    r
+  }
+}
[3/3] flink git commit: [FLINK-4832] [table] Fix global aggregation
of empty tables (Count/Sum = 0).
Posted by fh...@apache.org.
[FLINK-4832] [table] Fix global aggregation of empty tables (Count/Sum = 0).
- The fix injects a union with a null record before the global aggregation.
This closes #2840
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ecfb5b5f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ecfb5b5f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ecfb5b5f
Branch: refs/heads/master
Commit: ecfb5b5f6fd6bf1555c7240d77dd9aca982f4416
Parents: 0bb6847
Author: Anton Mushin <an...@epam.com>
Authored: Mon Nov 21 15:49:41 2016 +0400
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Nov 29 13:30:51 2016 +0100
----------------------------------------------------------------------
.../api/table/plan/rules/FlinkRuleSets.scala | 1 +
.../rules/dataSet/DataSetAggregateRule.scala | 6 +
.../DataSetAggregateWithNullValuesRule.scala | 96 +++++++
.../scala/batch/sql/AggregationsITCase.scala | 39 +++
.../flink/api/table/AggregationTest.scala | 261 +++++++++++++++++++
.../flink/api/table/utils/TableTestBase.scala | 9 +-
6 files changed, 410 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ecfb5b5f/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
index 5653083..26c025e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
@@ -98,6 +98,7 @@ object FlinkRuleSets {
// translate to Flink DataSet nodes
DataSetAggregateRule.INSTANCE,
+ DataSetAggregateWithNullValuesRule.INSTANCE,
DataSetCalcRule.INSTANCE,
DataSetJoinRule.INSTANCE,
DataSetScanRule.INSTANCE,
http://git-wip-us.apache.org/repos/asf/flink/blob/ecfb5b5f/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala
index 72ed27e..0311c48 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala
@@ -37,6 +37,12 @@ class DataSetAggregateRule
override def matches(call: RelOptRuleCall): Boolean = {
val agg: LogicalAggregate = call.rel(0).asInstanceOf[LogicalAggregate]
+ // Non-grouped aggregates need a null row attached to their input;
+ // they are handled by DataSetAggregateWithNullValuesRule instead.
+ if (agg.getGroupSet.isEmpty) {
+ return false
+ }
+
// check if we have distinct aggregates
val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
if (distinctAggs) {
http://git-wip-us.apache.org/repos/asf/flink/blob/ecfb5b5f/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala
new file mode 100644
index 0000000..54cb8d1
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan._
+import scala.collection.JavaConversions._
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.{LogicalValues, LogicalUnion, LogicalAggregate}
+import org.apache.calcite.rex.RexLiteral
+import org.apache.flink.api.table._
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetAggregate, DataSetConvention}
+
+/**
+ * Converts a non-grouped (global) [[LogicalAggregate]] into a [[DataSetAggregate]]. The input
+ * of the new aggregate is the original input unioned with one row in which every field is null.
+ *
+ * The extra row ensures that a global aggregate over an empty input still emits one result row
+ * (e.g. COUNT yields 0, while SUM and AVG yield null). Grouped aggregates are not matched by this
+ * rule and are left to [[DataSetAggregateRule]].
+ *
+ * NOTE(review): this relies on every aggregate function ignoring null inputs. An aggregate that
+ * counts rows regardless of nulls (for example a COUNT(*) translated without an argument column)
+ * would also count the injected row. Confirm how such calls are translated.
+ */
+class DataSetAggregateWithNullValuesRule
+  extends ConverterRule(
+    classOf[LogicalAggregate],
+    Convention.NONE,
+    DataSetConvention.INSTANCE,
+    "DataSetAggregateWithNullValuesRule") {
+
+  /**
+   * Matches only aggregates without grouping keys.
+   *
+   * @throws TableException if a matched aggregate uses DISTINCT or GROUPING SETS
+   */
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val agg: LogicalAggregate = call.rel(0).asInstanceOf[LogicalAggregate]
+
+    if (!agg.getGroupSet.isEmpty) {
+      // Grouped aggregates must not get a null row; DataSetAggregateRule handles them.
+      false
+    } else {
+      if (agg.getAggCallList.exists(_.isDistinct)) {
+        throw TableException("DISTINCT aggregates are currently not supported.")
+      }
+
+      val hasGroupingSets =
+        agg.getGroupSets.size() == 0 || agg.getGroupSets.get(0) != agg.getGroupSet
+      if (hasGroupingSets || agg.indicator) {
+        throw TableException("GROUPING SETS are currently not supported.")
+      }
+
+      // Every unsupported shape has thrown above, so the rule applies.
+      true
+    }
+  }
+
+  /** Builds `DataSetAggregate(UNION ALL(VALUES(null, ..., null), input))`. */
+  override def convert(rel: RelNode): RelNode = {
+    val agg: LogicalAggregate = rel.asInstanceOf[LogicalAggregate]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+    val cluster: RelOptCluster = rel.getCluster
+    val inputRowType = agg.getInput.getRowType
+
+    // A single row holding a NULL literal typed like each input field.
+    val nullRow: ImmutableList[RexLiteral] = ImmutableList.copyOf[RexLiteral](
+      inputRowType.getFieldList.map { field =>
+        cluster.getRexBuilder.makeLiteral(null, field.getType, false).asInstanceOf[RexLiteral]
+      })
+    val nullLiterals: ImmutableList[ImmutableList[RexLiteral]] = ImmutableList.of(nullRow)
+
+    // `all = true`: keep the null row even if the input contains an identical row.
+    val logicalValues = LogicalValues.create(cluster, inputRowType, nullLiterals)
+    val logicalUnion = LogicalUnion.create(List(logicalValues, agg.getInput), true)
+
+    new DataSetAggregate(
+      cluster,
+      traitSet,
+      RelOptRule.convert(logicalUnion, DataSetConvention.INSTANCE),
+      agg.getNamedAggCalls,
+      rel.getRowType,
+      inputRowType,
+      agg.getGroupSet.toArray
+    )
+  }
+}
+
+object DataSetAggregateWithNullValuesRule {
+  val INSTANCE: RelOptRule = new DataSetAggregateWithNullValuesRule
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ecfb5b5f/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/AggregationsITCase.scala
index 2dce751..35bb7dc 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/AggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/AggregationsITCase.scala
@@ -258,4 +258,43 @@ class AggregationsITCase(
// must fail. grouping sets are not supported
tEnv.sql(sqlQuery).toDataSet[Row]
}
+
+  /**
+   * Aggregates over a filter that matches no rows. The grouped query must return nothing, and
+   * the global query must return exactly one row. A global query over all rows is a sanity check.
+   */
+  @Test
+  def testAggregateEmptyDataSets(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    // Two rows; the filter `a = 4` below matches neither of them.
+    val table = env
+      .fromElements((1: Byte, 1: Short), (2: Byte, 2: Short))
+      .toTable(tEnv, 'a, 'b)
+    tEnv.registerTable("MyTable", table)
+
+    val select = "SELECT avg(a), sum(a), count(b) FROM MyTable"
+    val groupedEmpty = tEnv.sql(select + " where a = 4 group by a")
+    val globalEmpty = tEnv.sql(select + " where a = 4")
+    val globalAll = tEnv.sql(select)
+
+    val groupedRows = groupedEmpty.toDataSet[Row].collect()
+    val globalEmptyRows = globalEmpty.toDataSet[Row].collect()
+    val globalAllRows = globalAll.toDataSet[Row].collect()
+
+    assert(groupedRows == Seq.empty,
+      "Empty result is expected for grouped set, but actual: " + groupedRows)
+    TestBaseUtils.compareResultAsText(globalEmptyRows.asJava, "null,null,0")
+    TestBaseUtils.compareResultAsText(globalAllRows.asJava, "1,3,2")
+  }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ecfb5b5f/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/AggregationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/AggregationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/AggregationTest.scala
new file mode 100644
index 0000000..6c9d2e8
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/AggregationTest.scala
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.utils.TableTestBase
+import org.apache.flink.api.table.utils.TableTestUtil._
+import org.junit.Test
+
+/**
+ * Plan tests for aggregations. Non-grouped aggregates must read their input unioned with a
+ * single all-null row, while grouped aggregates read their input directly.
+ */
+class AggregationTest extends TableTestBase {
+
+  @Test
+  def testAggregateQueryBatchSQL(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+
+    val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM MyTable"
+
+    val aggregate = unaryNode(
+      "DataSetAggregate",
+      unionWithNullRow(batchTableNode(0)),
+      term("select",
+        "AVG(a) AS EXPR$0",
+        "SUM(b) AS EXPR$1",
+        "COUNT(c) AS EXPR$2")
+    )
+    util.verifySql(sqlQuery, aggregate)
+  }
+
+  @Test
+  def testAggregateWithFilterQueryBatchSQL(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+
+    val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM MyTable WHERE a = 1"
+
+    val aggregate = unaryNode(
+      "DataSetAggregate",
+      unionWithNullRow(filteredByA1),
+      term("select",
+        "AVG(a) AS EXPR$0",
+        "SUM(b) AS EXPR$1",
+        "COUNT(c) AS EXPR$2")
+    )
+    util.verifySql(sqlQuery, aggregate)
+  }
+
+  @Test
+  def testAggregateGroupQueryBatchSQL(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+
+    val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM MyTable GROUP BY a"
+
+    val aggregate = unaryNode(
+      "DataSetAggregate",
+      batchTableNode(0),
+      term("groupBy", "a"),
+      term("select",
+        "a",
+        "AVG(a) AS EXPR$0",
+        "SUM(b) AS EXPR$1",
+        "COUNT(c) AS EXPR$2")
+    )
+    util.verifySql(sqlQuery, selectSqlAggregates(aggregate))
+  }
+
+  @Test
+  def testAggregateGroupWithFilterQueryBatchSQL(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+
+    val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM MyTable WHERE a = 1 GROUP BY a"
+
+    val aggregate = unaryNode(
+      "DataSetAggregate",
+      filteredByA1,
+      term("groupBy", "a"),
+      term("select",
+        "a",
+        "AVG(a) AS EXPR$0",
+        "SUM(b) AS EXPR$1",
+        "COUNT(c) AS EXPR$2")
+    )
+    util.verifySql(sqlQuery, selectSqlAggregates(aggregate))
+  }
+
+  @Test
+  def testAggregateGroupWithFilterTableApi(): Unit = {
+    val util = batchTestUtil()
+    val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+
+    val resultTable = sourceTable.groupBy('a)
+      .select('a, 'a.avg, 'b.sum, 'c.count)
+      .where('a === 1)
+
+    val expected = unaryNode(
+      "DataSetAggregate",
+      filteredByA1,
+      term("groupBy", "a"),
+      term("select",
+        "a",
+        "AVG(a) AS TMP_0",
+        "SUM(b) AS TMP_1",
+        "COUNT(c) AS TMP_2")
+    )
+    util.verifyTable(resultTable, expected)
+  }
+
+  @Test
+  def testAggregateTableApi(): Unit = {
+    val util = batchTestUtil()
+    val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+    val resultTable = sourceTable.select('a.avg, 'b.sum, 'c.count)
+
+    val expected = unaryNode(
+      "DataSetAggregate",
+      unionWithNullRow(batchTableNode(0)),
+      term("select",
+        "AVG(a) AS TMP_0",
+        "SUM(b) AS TMP_1",
+        "COUNT(c) AS TMP_2")
+    )
+    util.verifyTable(resultTable, expected)
+  }
+
+  @Test
+  def testAggregateWithFilterTableApi(): Unit = {
+    val util = batchTestUtil()
+    val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+
+    val resultTable = sourceTable.select('a, 'b, 'c).where('a === 1)
+      .select('a.avg, 'b.sum, 'c.count)
+
+    val expected = unaryNode(
+      "DataSetAggregate",
+      unionWithNullRow(filteredByA1),
+      term("select",
+        "AVG(a) AS TMP_0",
+        "SUM(b) AS TMP_1",
+        "COUNT(c) AS TMP_2")
+    )
+    util.verifyTable(resultTable, expected)
+  }
+
+  /**
+   * Expected plan of `input` unioned with a single all-null row over fields a, b, c. This is the
+   * shape expected under non-grouped aggregates.
+   */
+  private def unionWithNullRow(input: String): String = {
+    val nullRow = unaryNode(
+      "DataSetValues",
+      input,
+      tuples(List(null, null, null)),
+      term("values", "a", "b", "c")
+    )
+    unaryNode("DataSetUnion", nullRow, term("union", "a", "b", "c"))
+  }
+
+  /** Expected plan of the scan of table 0 filtered by `a = 1`. */
+  private def filteredByA1: String = unaryNode(
+    "DataSetCalc",
+    batchTableNode(0),
+    term("select", "a", "b", "c"),
+    term("where", "=(a, 1)")
+  )
+
+  /** Expected projection keeping only EXPR$0..2, i.e. dropping the grouping key `a`. */
+  private def selectSqlAggregates(input: String): String = unaryNode(
+    "DataSetCalc",
+    input,
+    term("select", "EXPR$0", "EXPR$1", "EXPR$2")
+  )
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ecfb5b5f/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
index 2ea15a0..539bb61 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
@@ -70,20 +70,25 @@ object TableTestUtil {
/**
 * Builds the expected explain string of a plan node with one input. The node name and its
 * comma-separated terms form the first line, followed by the input's plan on the next line.
 * The trailing line break is stripped (stripLineEnd), so nesting one node inside another
 * does not produce blank lines.
 */
def unaryNode(node: String, input: String, term: String*): String = {
s"""$node(${term.mkString(", ")})
|$input
- |""".stripMargin
+ |""".stripMargin.stripLineEnd
}
/**
 * Builds the expected explain string of a plan node with two inputs. The node name and its
 * comma-separated terms form the first line, followed by the left input's plan and then the
 * right input's plan. The trailing line break is stripped (stripLineEnd).
 */
def binaryNode(node: String, left: String, right: String, term: String*): String = {
s"""$node(${term.mkString(", ")})
|$left
|$right
- |""".stripMargin
+ |""".stripMargin.stripLineEnd
}
/** Renders a single plan term as `name=[v1, v2, ...]`. */
def term(term: AnyRef, value: AnyRef*): String = {
s"$term=[${value.mkString(", ")}]"
}
+  /**
+   * Renders the `tuples` term of a DataSetValues node. Each list is one row, printed as
+   * `{ v1, v2 }`, and the rows are joined inside `[...]`.
+   */
+  def tuples(value: List[AnyRef]*): String = {
+    val rows = value
+      .map(row => row.mkString("{ ", ", ", " }"))
+      .mkString("[", ", ", "]")
+    term("tuples", rows)
+  }
+
/** Expected explain string of a scan over the table named `_DataSetTable_<idx>`. */
def batchTableNode(idx: Int): String = {
s"DataSetScan(table=[[_DataSetTable_$idx]])"
}