Posted to commits@flink.apache.org by fh...@apache.org on 2016/11/29 14:42:55 UTC

[1/3] flink git commit: [FLINK-4825] [table] Implement a RexExecutor that uses Flink's code generation.

Repository: flink
Updated Branches:
  refs/heads/master 910f733f5 -> ecfb5b5f6


[FLINK-4825] [table] Implement a RexExecutor that uses Flink's code generation.

This closes #2884
This closes #2874 (closing a PR with public-API-breaking changes)
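
For context, a sketch of the effect (illustrative setup; the table name and data are made up, the API calls mirror the tests below): with the code-generating executor registered, Calcite's ReduceExpressionsRule can fold constant subexpressions at planning time.

  import org.apache.flink.api.scala._
  import org.apache.flink.api.scala.table._
  import org.apache.flink.api.table.TableEnvironment

  val env = ExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env)
  val ds = env.fromElements((1, "x"), (9, "y")).toTable(tEnv, 'a, 'b)
  tEnv.registerTable("MyTable", ds)
  // the constant subexpression 1 + 7 is now evaluated at planning time via
  // generated code, so the optimized plan contains the reduced predicate >(a, 8)
  val result = tEnv.sql("SELECT a FROM MyTable WHERE a > 1 + 7")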


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/db441dec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/db441dec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/db441dec

Branch: refs/heads/master
Commit: db441decb41bf856400766023bfc7de77d6041aa
Parents: 910f733
Author: twalthr <tw...@apache.org>
Authored: Mon Nov 28 12:18:36 2016 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Nov 29 13:14:40 2016 +0100

----------------------------------------------------------------------
 .../flink/api/table/FlinkRelBuilder.scala       |   1 +
 .../flink/api/table/TableEnvironment.scala      |   5 +-
 .../flink/api/table/codegen/Compiler.scala      |  41 +++++++
 .../api/table/codegen/ExpressionReducer.scala   | 117 +++++++++++++++++++
 .../api/table/plan/rules/FlinkRuleSets.scala    |  16 ++-
 .../flink/api/table/runtime/Compiler.scala      |  42 -------
 .../api/table/runtime/FlatJoinRunner.scala      |   1 +
 .../flink/api/table/runtime/FlatMapRunner.scala |   1 +
 .../flink/api/table/runtime/MapRunner.scala     |   1 +
 .../table/runtime/io/ValuesInputFormat.scala    |   2 +-
 .../api/scala/batch/sql/SetOperatorsTest.scala  |   2 +-
 .../api/table/ExpressionReductionTest.scala     |  20 ++--
 .../table/expressions/ScalarOperatorsTest.scala |   2 +-
 .../expressions/utils/ExpressionTestBase.scala  |   3 +-
 14 files changed, 185 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
index ea4eed0..da44ebb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
@@ -80,6 +80,7 @@ object FlinkRelBuilder {
 
     // create context instances with Flink type factory
     val planner = new VolcanoPlanner(Contexts.empty())
+    planner.setExecutor(config.getExecutor)
     planner.addRelTraitDef(ConventionTraitDef.INSTANCE)
     val cluster = RelOptCluster.create(planner, new RexBuilder(typeFactory))
     val calciteSchema = CalciteSchema.from(config.getDefaultSchema)

http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
index e8734f5..7b2b738 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
@@ -25,7 +25,7 @@ import org.apache.calcite.config.Lex
 import org.apache.calcite.plan.RelOptPlanner
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex.RexExecutorImpl
-import org.apache.calcite.schema.{Schemas, SchemaPlus}
+import org.apache.calcite.schema.{SchemaPlus, Schemas}
 import org.apache.calcite.schema.impl.AbstractTable
 import org.apache.calcite.sql.SqlOperatorTable
 import org.apache.calcite.sql.parser.SqlParser
@@ -38,6 +38,7 @@ import org.apache.flink.api.java.{ExecutionEnvironment => JavaBatchExecEnv}
 import org.apache.flink.api.scala.table.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv}
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaBatchExecEnv}
+import org.apache.flink.api.table.codegen.ExpressionReducer
 import org.apache.flink.api.table.expressions.{Alias, Expression, UnresolvedFieldReference}
 import org.apache.flink.api.table.functions.{ScalarFunction, UserDefinedFunction}
 import org.apache.flink.api.table.plan.cost.DataSetCostFactory
@@ -71,7 +72,7 @@ abstract class TableEnvironment(val config: TableConfig) {
     .typeSystem(new FlinkTypeSystem)
     .operatorTable(getSqlOperatorTable)
     // set the executor to evaluate constant expressions
-    .executor(new RexExecutorImpl(Schemas.createDataContext(null)))
+    .executor(new ExpressionReducer(config))
     .build
 
   // the builder for Calcite RelNodes, Calcite's representation of a relational expression tree.

http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Compiler.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Compiler.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Compiler.scala
new file mode 100644
index 0000000..fce13ba
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Compiler.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.codegen
+
+import org.apache.flink.api.common.InvalidProgramException
+import org.codehaus.commons.compiler.CompileException
+import org.codehaus.janino.SimpleCompiler
+
+trait Compiler[T] {
+
+  @throws(classOf[CompileException])
+  def compile(cl: ClassLoader, name: String, code: String): Class[T] = {
+    require(cl != null, "Classloader must not be null.")
+    val compiler = new SimpleCompiler()
+    compiler.setParentClassLoader(cl)
+    try {
+      compiler.cook(code)
+    } catch {
+      case e: CompileException =>
+        throw new InvalidProgramException("Table program cannot be compiled. " +
+          "This is a bug. Please file an issue.", e)
+    }
+    compiler.getClassLoader.loadClass(name).asInstanceOf[Class[T]]
+  }
+}
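
The trait is mixed into the generated-function runners (see the import updates below). A minimal sketch of the pattern, with a hypothetical runner name:

  import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
  import org.apache.flink.api.table.Row
  import org.apache.flink.api.table.codegen.Compiler
  import org.apache.flink.configuration.Configuration

  class CompiledMapRunner(name: String, code: String)
    extends RichMapFunction[Row, Row]
    with Compiler[MapFunction[Row, Row]] {

    private var function: MapFunction[Row, Row] = _

    override def open(parameters: Configuration): Unit = {
      // compile the generated source against the user-code classloader
      // and instantiate it once per task
      val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
      function = clazz.newInstance()
    }

    override def map(in: Row): Row = function.map(in)
  }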

http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionReducer.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionReducer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionReducer.scala
new file mode 100644
index 0000000..74756ef
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionReducer.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.codegen
+
+import java.util
+
+import org.apache.calcite.plan.RelOptPlanner
+import org.apache.calcite.rex.{RexBuilder, RexNode}
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.table.typeutils.{RowTypeInfo, TypeConverter}
+import org.apache.flink.api.table.{FlinkTypeFactory, Row, TableConfig}
+
+import scala.collection.JavaConverters._
+
+/**
+  * Evaluates constant expressions using Flink's [[CodeGenerator]].
+  */
+class ExpressionReducer(config: TableConfig)
+  extends RelOptPlanner.Executor with Compiler[MapFunction[Row, Row]] {
+
+  private val EMPTY_ROW_INFO = TypeConverter.DEFAULT_ROW_TYPE
+  private val EMPTY_ROW = new Row(0)
+
+  override def reduce(
+    rexBuilder: RexBuilder,
+    constExprs: util.List[RexNode],
+    reducedValues: util.List[RexNode]): Unit = {
+
+    val typeFactory = rexBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+
+    val literals = constExprs.asScala.map(e => (e.getType.getSqlTypeName, e)).flatMap {
+
+      // we need to cast here for RexBuilder.makeLiteral
+      case (SqlTypeName.DATE, e) =>
+        Some(
+          rexBuilder.makeCast(typeFactory.createTypeFromTypeInfo(BasicTypeInfo.INT_TYPE_INFO), e)
+        )
+      case (SqlTypeName.TIME, e) =>
+        Some(
+          rexBuilder.makeCast(typeFactory.createTypeFromTypeInfo(BasicTypeInfo.INT_TYPE_INFO), e)
+        )
+      case (SqlTypeName.TIMESTAMP, e) =>
+        Some(
+          rexBuilder.makeCast(typeFactory.createTypeFromTypeInfo(BasicTypeInfo.LONG_TYPE_INFO), e)
+        )
+
+      // we don't support object literals yet, so we skip those constant expressions
+      case (SqlTypeName.ANY, _) | (SqlTypeName.ROW, _) => None
+
+      case (_, e) => Some(e)
+    }
+
+    val literalTypes = literals.map(e => FlinkTypeFactory.toTypeInfo(e.getType))
+    val resultType = new RowTypeInfo(literalTypes)
+
+    // generate MapFunction
+    val generator = new CodeGenerator(config, false, EMPTY_ROW_INFO)
+
+    val result = generator.generateResultExpression(
+      resultType,
+      resultType.getFieldNames,
+      literals)
+
+    val generatedFunction = generator.generateFunction[MapFunction[Row, Row]](
+      "ExpressionReducer",
+      classOf[MapFunction[Row, Row]],
+      s"""
+        |${result.code}
+        |return ${result.resultTerm};
+        |""".stripMargin,
+      resultType.asInstanceOf[TypeInformation[Any]])
+
+    val clazz = compile(getClass.getClassLoader, generatedFunction.name, generatedFunction.code)
+    val function = clazz.newInstance()
+
+    // execute
+    val reduced = function.map(EMPTY_ROW)
+
+    // add the reduced results or keep them unreduced
+    var i = 0
+    var reducedIdx = 0
+    while (i < constExprs.size()) {
+      val unreduced = constExprs.get(i)
+      unreduced.getType.getSqlTypeName match {
+        // we insert the original expression for object literals
+        case SqlTypeName.ANY | SqlTypeName.ROW =>
+          reducedValues.add(unreduced)
+        case _ =>
+          val literal = rexBuilder.makeLiteral(
+            reduced.productElement(reducedIdx),
+            unreduced.getType,
+            true)
+          reducedValues.add(literal)
+          reducedIdx += 1
+      }
+      i += 1
+    }
+  }
+}
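
For orientation, a hedged illustration of the reduce() contract (the values are made up, not from a real planner run): Calcite hands in the constant subexpressions it found and expects one reduced node appended per input, with ANY/ROW expressions passed through unreduced:

  constExprs:    [ +(1, 7), UPPER('hello'), someObject:ANY ]
  reducedValues: [ 8,       'HELLO',        someObject:ANY ]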

http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
index 638deac..5653083 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
@@ -75,11 +75,10 @@ object FlinkRuleSets {
     SortRemoveRule.INSTANCE,
 
     // simplify expressions rules
-    // TODO uncomment if FLINK-4825 is solved
-    // ReduceExpressionsRule.FILTER_INSTANCE,
-    // ReduceExpressionsRule.PROJECT_INSTANCE,
-    // ReduceExpressionsRule.CALC_INSTANCE,
-    // ReduceExpressionsRule.JOIN_INSTANCE,
+    ReduceExpressionsRule.FILTER_INSTANCE,
+    ReduceExpressionsRule.PROJECT_INSTANCE,
+    ReduceExpressionsRule.CALC_INSTANCE,
+    ReduceExpressionsRule.JOIN_INSTANCE,
 
     // prune empty results rules
     PruneEmptyRules.AGGREGATE_INSTANCE,
@@ -137,10 +136,9 @@ object FlinkRuleSets {
       ProjectRemoveRule.INSTANCE,
 
       // simplify expressions rules
-      // TODO uncomment if FLINK-4825 is solved
-      // ReduceExpressionsRule.FILTER_INSTANCE,
-      // ReduceExpressionsRule.PROJECT_INSTANCE,
-      // ReduceExpressionsRule.CALC_INSTANCE,
+      ReduceExpressionsRule.FILTER_INSTANCE,
+      ReduceExpressionsRule.PROJECT_INSTANCE,
+      ReduceExpressionsRule.CALC_INSTANCE,
 
       // merge and push unions rules
       UnionEliminatorRule.INSTANCE,

http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/Compiler.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/Compiler.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/Compiler.scala
deleted file mode 100644
index c5d566e..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/Compiler.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime
-
-import org.apache.flink.api.common.InvalidProgramException
-import org.apache.flink.api.common.functions.Function
-import org.codehaus.commons.compiler.CompileException
-import org.codehaus.janino.SimpleCompiler
-
-trait Compiler[T] {
-
-  @throws(classOf[CompileException])
-  def compile(cl: ClassLoader, name: String, code: String): Class[T] = {
-    require(cl != null, "Classloader must not be null.")
-    val compiler = new SimpleCompiler()
-    compiler.setParentClassLoader(cl)
-    try {
-      compiler.cook(code)
-    } catch {
-      case e: CompileException =>
-        throw new InvalidProgramException("Table program cannot be compiled. " +
-          "This is a bug. Please file an issue.", e)
-    }
-    compiler.getClassLoader.loadClass(name).asInstanceOf[Class[T]]
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala
index c6a8fe8..2e57a0f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala
@@ -21,6 +21,7 @@ package org.apache.flink.api.table.runtime
 import org.apache.flink.api.common.functions.{FlatJoinFunction, RichFlatJoinFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.api.table.codegen.Compiler
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.util.Collector
 import org.slf4j.LoggerFactory

http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala
index 2e942eb..e228e2b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala
@@ -21,6 +21,7 @@ package org.apache.flink.api.table.runtime
 import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.api.table.codegen.Compiler
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.util.Collector
 import org.slf4j.LoggerFactory

http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala
index 944b415..9fd1876 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala
@@ -21,6 +21,7 @@ package org.apache.flink.api.table.runtime
 import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.api.table.codegen.Compiler
 import org.apache.flink.configuration.Configuration
 import org.slf4j.LoggerFactory
 

http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala
index 34bff15..2a4be46 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala
@@ -21,7 +21,7 @@ package org.apache.flink.api.table.runtime.io
 import org.apache.flink.api.common.io.{GenericInputFormat, NonParallelInput}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.api.table.runtime.Compiler
+import org.apache.flink.api.table.codegen.Compiler
 import org.apache.flink.core.io.GenericInputSplit
 import org.slf4j.LoggerFactory
 

http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala
index 5bc6e4a..7b2b497 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala
@@ -54,7 +54,7 @@ class SetOperatorsTest extends TableTestBase {
               term("join", "b_long", "b_int", "b_string", "a_long"),
               term("joinType", "InnerJoin")
             ),
-            term("select", "a_long", "true AS $f0")
+            term("select", "true AS $f0", "a_long")
           ),
           term("groupBy", "a_long"),
           term("select", "a_long", "MIN($f0) AS $f1")

http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala
index 9694687..b8156a2 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala
@@ -21,10 +21,8 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.table.utils.TableTestBase
 import org.apache.flink.api.table.utils.TableTestUtil._
-import org.junit.{Ignore, Test}
+import org.junit.Test
 
-// TODO enable if FLINK-4825 is solved
-@Ignore
 class ExpressionReductionTest extends TableTestBase {
 
   @Test
@@ -64,7 +62,7 @@ class ExpressionReductionTest extends TableTestBase {
         "true AS EXPR$9",
         "2 AS EXPR$10",
         "true AS EXPR$11",
-        "'TRUEX' AS EXPR$12"
+        "'trueX' AS EXPR$12"
       ),
       term("where", ">(a, 8)")
     )
@@ -109,7 +107,7 @@ class ExpressionReductionTest extends TableTestBase {
         "true AS EXPR$9",
         "2 AS EXPR$10",
         "true AS EXPR$11",
-        "'TRUEX' AS EXPR$12"
+        "'trueX' AS EXPR$12"
       )
     )
 
@@ -164,7 +162,7 @@ class ExpressionReductionTest extends TableTestBase {
         "false AS _c5",
         "true AS _c6",
         "2E0 AS _c7",
-        "'TRUEX' AS _c8"
+        "'trueX' AS _c8"
       ),
       term("where", ">(a, 8)")
     )
@@ -200,7 +198,7 @@ class ExpressionReductionTest extends TableTestBase {
         "false AS _c5",
         "true AS _c6",
         "2E0 AS _c7",
-        "'TRUEX' AS _c8"
+        "'trueX' AS _c8"
       )
     )
 
@@ -262,7 +260,7 @@ class ExpressionReductionTest extends TableTestBase {
         "true AS EXPR$9",
         "2 AS EXPR$10",
         "true AS EXPR$11",
-        "'TRUEX' AS EXPR$12"
+        "'trueX' AS EXPR$12"
       ),
       term("where", ">(a, 8)")
     )
@@ -307,7 +305,7 @@ class ExpressionReductionTest extends TableTestBase {
         "true AS EXPR$9",
         "2 AS EXPR$10",
         "true AS EXPR$11",
-        "'TRUEX' AS EXPR$12"
+        "'trueX' AS EXPR$12"
       )
     )
 
@@ -362,7 +360,7 @@ class ExpressionReductionTest extends TableTestBase {
         "false AS _c5",
         "true AS _c6",
         "2E0 AS _c7",
-        "'TRUEX' AS _c8"
+        "'trueX' AS _c8"
       ),
       term("where", ">(a, 8)")
     )
@@ -398,7 +396,7 @@ class ExpressionReductionTest extends TableTestBase {
         "false AS _c5",
         "true AS _c6",
         "2E0 AS _c7",
-        "'TRUEX' AS _c8"
+        "'trueX' AS _c8"
       )
     )
 

http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarOperatorsTest.scala
index 1f5a069..7ad2212 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarOperatorsTest.scala
@@ -129,7 +129,7 @@ class ScalarOperatorsTest extends ExpressionTestBase {
     testSqlApi(
       "CASE 1 WHEN 1, 2 THEN '1 or 2' WHEN 2 THEN 'not possible' WHEN 3, 2 THEN '3' " +
       "ELSE 'none of the above' END",
-      "1 or 2")
+      "1 or 2           ")
     testSqlApi("CASE WHEN 'a'='a' THEN 1 END", "1")
     testSqlApi("CASE 2 WHEN 1 THEN 'a' WHEN 2 THEN 'bcd' END", "bcd")
     testSqlApi("CASE f2 WHEN 1 THEN 11 WHEN 2 THEN 4 ELSE NULL END", "11")

http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala
index d34e335..84b61da 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala
@@ -28,12 +28,11 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.{DataSet => JDataSet}
 import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
 import org.apache.flink.api.table._
-import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedFunction}
+import org.apache.flink.api.table.codegen.{CodeGenerator, Compiler, GeneratedFunction}
 import org.apache.flink.api.table.expressions.{Expression, ExpressionParser}
 import org.apache.flink.api.table.functions.UserDefinedFunction
 import org.apache.flink.api.table.plan.nodes.dataset.{DataSetCalc, DataSetConvention}
 import org.apache.flink.api.table.plan.rules.FlinkRuleSets
-import org.apache.flink.api.table.runtime.Compiler
 import org.apache.flink.api.table.typeutils.RowTypeInfo
 import org.junit.Assert._
 import org.junit.{After, Before}


[2/3] flink git commit: [FLINK-5184] [table] Fix compareSerialized() of RowComparator.

Posted by fh...@apache.org.
[FLINK-5184] [table] Fix compareSerialized() of RowComparator.

The null masks were sized by the number of key-prefix serializers instead of the full row arity, which broke serialized comparison for rows with more fields than key serializers.

This closes #2894
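
The repro boils down to a wide row keyed on a prefix of its fields (distilled from the new test below):

  import org.apache.flink.api.common.ExecutionConfig
  import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
  import org.apache.flink.api.table.typeutils.RowTypeInfo

  // 10 fields, keyed on field 0 only: the serialized null mask spans all
  // 10 fields, while the comparator receives serializers only for the key
  // prefix (types.take(maxIndex + 1)), which the old mask sizing tripped over
  val fieldTypes: Array[TypeInformation[_]] =
    Array.fill(10)(BasicTypeInfo.STRING_TYPE_INFO)
  val typeInfo = new RowTypeInfo(fieldTypes)
  val comparator = typeInfo.createComparator(Array(0), Array(true), 0, new ExecutionConfig())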


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0bb68479
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0bb68479
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0bb68479

Branch: refs/heads/master
Commit: 0bb684797dfb3e03dd4f9761a6bf1eb8ce9d1c0d
Parents: db441de
Author: godfreyhe <go...@163.com>
Authored: Tue Nov 29 19:27:58 2016 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Nov 29 13:19:26 2016 +0100

----------------------------------------------------------------------
 .../api/table/typeutils/RowComparator.scala     | 16 +++-
 .../flink/api/table/typeutils/RowTypeInfo.scala |  1 +
 .../RowComparatorWithManyFieldsTest.scala       | 82 ++++++++++++++++++++
 3 files changed, 95 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0bb68479/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowComparator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowComparator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowComparator.scala
index cc97656..8bbe4d8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowComparator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowComparator.scala
@@ -32,6 +32,8 @@ import org.apache.flink.types.KeyFieldOutOfBoundsException
  * Comparator for [[Row]].
  */
 class RowComparator private (
+    /** the number of fields of the Row */
+    val numberOfFields: Int,
     /** key positions describe which fields are keys in what order */
     val keyPositions: Array[Int],
     /** null-aware comparators for the key fields, in the same order as the key fields */
@@ -43,8 +45,8 @@ class RowComparator private (
   extends CompositeTypeComparator[Row] with Serializable {
 
   // null masks for serialized comparison
-  private val nullMask1 = new Array[Boolean](serializers.length)
-  private val nullMask2 = new Array[Boolean](serializers.length)
+  private val nullMask1 = new Array[Boolean](numberOfFields)
+  private val nullMask2 = new Array[Boolean](numberOfFields)
 
   // cache for the deserialized key field objects
   @transient
@@ -63,10 +65,12 @@ class RowComparator private (
    * Intermediate constructor for creating auxiliary fields.
    */
   def this(
+      numberOfFields: Int,
       keyPositions: Array[Int],
       comparators: Array[NullAwareComparator[Any]],
       serializers: Array[TypeSerializer[Any]]) = {
     this(
+      numberOfFields,
       keyPositions,
       comparators,
       serializers,
@@ -76,6 +80,7 @@ class RowComparator private (
   /**
    * General constructor for RowComparator.
    *
+   * @param numberOfFields the number of fields of the Row
    * @param keyPositions key positions describe which fields are keys in what order
    * @param comparators non-null-aware comparators for the key fields, in the same order as
    *   the key fields
@@ -83,11 +88,13 @@ class RowComparator private (
    * @param orders sorting orders for the fields
    */
   def this(
+      numberOfFields: Int,
       keyPositions: Array[Int],
       comparators: Array[TypeComparator[Any]],
       serializers: Array[TypeSerializer[Any]],
       orders: Array[Boolean]) = {
     this(
+      numberOfFields,
       keyPositions,
       makeNullAware(comparators, orders),
       serializers)
@@ -133,8 +140,8 @@ class RowComparator private (
     val len = serializers.length
     val keyLen = keyPositions.length
 
-    readIntoNullMask(len, firstSource, nullMask1)
-    readIntoNullMask(len, secondSource, nullMask2)
+    readIntoNullMask(numberOfFields, firstSource, nullMask1)
+    readIntoNullMask(numberOfFields, secondSource, nullMask2)
 
     // deserialize
     var i = 0
@@ -217,6 +224,7 @@ class RowComparator private (
     val serializersCopy = serializers.map(_.duplicate())
 
     new RowComparator(
+      numberOfFields,
       keyPositions,
       comparatorsCopy,
       serializersCopy,

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb68479/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeInfo.scala
index 489edca..711bb49 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeInfo.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeInfo.scala
@@ -96,6 +96,7 @@ class RowTypeInfo(fieldTypes: Seq[TypeInformation[_]])
       val maxIndex = logicalKeyFields.max
 
       new RowComparator(
+        getArity,
         logicalKeyFields.toArray,
         fieldComparators.toArray.asInstanceOf[Array[TypeComparator[Any]]],
         types.take(maxIndex + 1).map(_.createSerializer(config).asInstanceOf[TypeSerializer[Any]]),

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb68479/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorWithManyFieldsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorWithManyFieldsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorWithManyFieldsTest.scala
new file mode 100644
index 0000000..33715c1
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorWithManyFieldsTest.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.typeutils
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeutils.{ComparatorTestBase, TypeComparator, TypeSerializer}
+import org.apache.flink.api.table.Row
+import org.apache.flink.util.Preconditions
+import org.junit.Assert._
+
+/**
+  * Tests [[RowComparator]] for wide rows.
+  */
+class RowComparatorWithManyFieldsTest extends ComparatorTestBase[Row] {
+  val numberOfFields = 10
+  val fieldTypes = new Array[TypeInformation[_]](numberOfFields)
+  for (i <- 0 until numberOfFields) {
+    fieldTypes(i) = BasicTypeInfo.STRING_TYPE_INFO
+  }
+  val typeInfo = new RowTypeInfo(fieldTypes)
+
+  val data: Array[Row] = Array(
+    createRow(Array(null, "b0", "c0", "d0", "e0", "f0", "g0", "h0", "i0", "j0")),
+    createRow(Array("a1", "b1", "c1", "d1", "e1", "f1", "g1", "h1", "i1", "j1")),
+    createRow(Array("a2", "b2", "c2", "d2", "e2", "f2", "g2", "h2", "i2", "j2")),
+    createRow(Array("a3", "b3", "c3", "d3", "e3", "f3", "g3", "h3", "i3", "j3"))
+  )
+
+  override protected def deepEquals(message: String, should: Row, is: Row): Unit = {
+    val arity = should.productArity
+    assertEquals(message, arity, is.productArity)
+    var index = 0
+    while (index < arity) {
+      val copiedValue: Any = should.productElement(index)
+      val element: Any = is.productElement(index)
+      assertEquals(message, element, copiedValue)
+      index += 1
+    }
+  }
+
+  override protected def createComparator(ascending: Boolean): TypeComparator[Row] = {
+    typeInfo.createComparator(
+      Array(0),
+      Array(ascending),
+      0,
+      new ExecutionConfig())
+  }
+
+  override protected def createSerializer(): TypeSerializer[Row] = {
+    typeInfo.createSerializer(new ExecutionConfig())
+  }
+
+  override protected def getSortedTestData: Array[Row] = {
+    data
+  }
+
+  override protected def supportsNullKeys: Boolean = true
+
+  private def createRow(values: Array[_]): Row = {
+    Preconditions.checkArgument(values.length == numberOfFields)
+    val r: Row = new Row(numberOfFields)
+    values.zipWithIndex.foreach { case (e, i) => r.setField(i, e) }
+    r
+  }
+}


[3/3] flink git commit: [FLINK-4832] [table] Fix global aggregation of empty tables (Count/Sum = 0).

Posted by fh...@apache.org.
[FLINK-4832] [table] Fix global aggregation of empty tables (Count/Sum = 0).

- The fix injects a union with a null record before the global aggregation.

This closes #2840
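
The semantics in one example (mirroring the new ITCase below, with tEnv and MyTable registered as in the test): a global aggregation over an empty input must still emit exactly one row, with COUNT = 0 and SUM/AVG = null. The injected all-null record guarantees the aggregation sees at least one input row; COUNT counts only non-null values, and SUM/AVG over nothing but nulls yield null.

  // no row matches the predicate, yet SQL requires one result row: null, null, 0
  val result = tEnv.sql("SELECT avg(a), sum(a), count(b) FROM MyTable WHERE a = 4")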


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ecfb5b5f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ecfb5b5f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ecfb5b5f

Branch: refs/heads/master
Commit: ecfb5b5f6fd6bf1555c7240d77dd9aca982f4416
Parents: 0bb6847
Author: Anton Mushin <an...@epam.com>
Authored: Mon Nov 21 15:49:41 2016 +0400
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Nov 29 13:30:51 2016 +0100

----------------------------------------------------------------------
 .../api/table/plan/rules/FlinkRuleSets.scala    |   1 +
 .../rules/dataSet/DataSetAggregateRule.scala    |   6 +
 .../DataSetAggregateWithNullValuesRule.scala    |  96 +++++++
 .../scala/batch/sql/AggregationsITCase.scala    |  39 +++
 .../flink/api/table/AggregationTest.scala       | 261 +++++++++++++++++++
 .../flink/api/table/utils/TableTestBase.scala   |   9 +-
 6 files changed, 410 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ecfb5b5f/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
index 5653083..26c025e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
@@ -98,6 +98,7 @@ object FlinkRuleSets {
 
     // translate to Flink DataSet nodes
     DataSetAggregateRule.INSTANCE,
+    DataSetAggregateWithNullValuesRule.INSTANCE,
     DataSetCalcRule.INSTANCE,
     DataSetJoinRule.INSTANCE,
     DataSetScanRule.INSTANCE,

http://git-wip-us.apache.org/repos/asf/flink/blob/ecfb5b5f/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala
index 72ed27e..0311c48 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala
@@ -37,6 +37,12 @@ class DataSetAggregateRule
   override def matches(call: RelOptRuleCall): Boolean = {
     val agg: LogicalAggregate = call.rel(0).asInstanceOf[LogicalAggregate]
 
+    // non-grouped aggregates need a null row attached to the source data,
+    // which is handled by DataSetAggregateWithNullValuesRule
+    if (agg.getGroupSet.isEmpty) {
+      return false
+    }
+
     // check if we have distinct aggregates
     val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
     if (distinctAggs) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ecfb5b5f/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala
new file mode 100644
index 0000000..54cb8d1
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan._
+import scala.collection.JavaConversions._
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.{LogicalValues, LogicalUnion, LogicalAggregate}
+import org.apache.calcite.rex.RexLiteral
+import org.apache.flink.api.table._
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetAggregate, DataSetConvention}
+
+/**
+  * Rule that injects a [[Row]] with null records into a [[DataSetAggregate]].
+  * It applies only to non-grouped aggregate queries.
+  */
+class DataSetAggregateWithNullValuesRule
+  extends ConverterRule(
+    classOf[LogicalAggregate],
+    Convention.NONE,
+    DataSetConvention.INSTANCE,
+    "DataSetAggregateWithNullValuesRule")
+{
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val agg: LogicalAggregate = call.rel(0).asInstanceOf[LogicalAggregate]
+
+    // grouped aggregates must not have a null row attached;
+    // they are handled by other rules, e.g. [[DataSetAggregateRule]]
+    if (!agg.getGroupSet.isEmpty) {
+      return false
+    }
+
+    // check if we have distinct aggregates
+    val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
+    if (distinctAggs) {
+      throw TableException("DISTINCT aggregates are currently not supported.")
+    }
+
+    // check if we have grouping sets
+    val groupSets = agg.getGroupSets.size() == 0 || agg.getGroupSets.get(0) != agg.getGroupSet
+    if (groupSets || agg.indicator) {
+      throw TableException("GROUPING SETS are currently not supported.")
+    }
+    !distinctAggs && !groupSets && !agg.indicator
+  }
+
+  override def convert(rel: RelNode): RelNode = {
+    val agg: LogicalAggregate = rel.asInstanceOf[LogicalAggregate]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+    val cluster: RelOptCluster = rel.getCluster
+
+    val fieldTypes = agg.getInput.getRowType.getFieldList.map(_.getType)
+    val nullLiterals: ImmutableList[ImmutableList[RexLiteral]] =
+      ImmutableList.of(ImmutableList.copyOf[RexLiteral](
+        for (fieldType <- fieldTypes)
+          yield {
+            cluster.getRexBuilder.
+              makeLiteral(null, fieldType, false).asInstanceOf[RexLiteral]
+          }))
+
+    val logicalValues = LogicalValues.create(cluster, agg.getInput.getRowType, nullLiterals)
+    val logicalUnion = LogicalUnion.create(List(logicalValues, agg.getInput), true)
+
+    new DataSetAggregate(
+      cluster,
+      traitSet,
+      RelOptRule.convert(logicalUnion, DataSetConvention.INSTANCE),
+      agg.getNamedAggCalls,
+      rel.getRowType,
+      agg.getInput.getRowType,
+      agg.getGroupSet.toArray
+    )
+  }
+}
+
+object DataSetAggregateWithNullValuesRule {
+  val INSTANCE: RelOptRule = new DataSetAggregateWithNullValuesRule
+}
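
The resulting plan shape, sketched in the notation of the planner tests below (indentation added for readability):

  DataSetAggregate(select=[AVG(a) AS EXPR$0, SUM(b) AS EXPR$1, COUNT(c) AS EXPR$2])
    DataSetUnion(union=[a, b, c])
      DataSetValues(tuples=[[{ null, null, null }]], values=[a, b, c])
        DataSetScan(table=[[_DataSetTable_0]])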

http://git-wip-us.apache.org/repos/asf/flink/blob/ecfb5b5f/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/AggregationsITCase.scala
index 2dce751..35bb7dc 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/AggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/AggregationsITCase.scala
@@ -258,4 +258,43 @@ class AggregationsITCase(
     // must fail. grouping sets are not supported
     tEnv.sql(sqlQuery).toDataSet[Row]
   }
+
+  @Test
+  def testAggregateEmptyDataSets(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT avg(a), sum(a), count(b) " +
+      "FROM MyTable where a = 4 group by a"
+
+    val sqlQuery2 = "SELECT avg(a), sum(a), count(b) " +
+      "FROM MyTable where a = 4"
+
+    val sqlQuery3 = "SELECT avg(a), sum(a), count(b) " +
+      "FROM MyTable"
+
+    val ds = env.fromElements(
+      (1: Byte, 1: Short),
+      (2: Byte, 2: Short))
+      .toTable(tEnv, 'a, 'b)
+
+    tEnv.registerTable("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery)
+    val result2 = tEnv.sql(sqlQuery2)
+    val result3 = tEnv.sql(sqlQuery3)
+
+    val results = result.toDataSet[Row].collect()
+    val expected = Seq.empty
+    val results2 =  result2.toDataSet[Row].collect()
+    val expected2 = "null,null,0"
+    val results3 = result3.toDataSet[Row].collect()
+    val expected3 = "1,3,2"
+
+    assert(results.equals(expected),
+      "Empty result is expected for grouped set, but actual: " + results)
+    TestBaseUtils.compareResultAsText(results2.asJava, expected2)
+    TestBaseUtils.compareResultAsText(results3.asJava, expected3)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ecfb5b5f/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/AggregationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/AggregationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/AggregationTest.scala
new file mode 100644
index 0000000..6c9d2e8
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/AggregationTest.scala
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.utils.TableTestBase
+import org.apache.flink.api.table.utils.TableTestUtil._
+import org.junit.Test
+
+/**
+  * Tests for aggregation plans.
+  */
+class AggregationTest extends TableTestBase {
+
+  @Test
+  def testAggregateQueryBatchSQL(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+
+    val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM MyTable"
+
+    val setValues = unaryNode(
+      "DataSetValues",
+      batchTableNode(0),
+      tuples(List(null,null,null)),
+      term("values","a","b","c")
+    )
+    val union = unaryNode(
+      "DataSetUnion",
+      setValues,
+      term("union","a","b","c")
+    )
+
+    val aggregate = unaryNode(
+      "DataSetAggregate",
+      union,
+      term("select",
+        "AVG(a) AS EXPR$0",
+        "SUM(b) AS EXPR$1",
+        "COUNT(c) AS EXPR$2")
+    )
+    util.verifySql(sqlQuery, aggregate)
+  }
+
+  @Test
+  def testAggregateWithFilterQueryBatchSQL(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+
+    val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM MyTable WHERE a = 1"
+
+    val calcNode = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select", "a", "b", "c"),
+      term("where", "=(a, 1)")
+    )
+
+    val setValues =  unaryNode(
+        "DataSetValues",
+        calcNode,
+        tuples(List(null,null,null)),
+        term("values","a","b","c")
+    )
+
+    val union = unaryNode(
+      "DataSetUnion",
+      setValues,
+      term("union","a","b","c")
+    )
+
+    val aggregate = unaryNode(
+      "DataSetAggregate",
+      union,
+      term("select",
+        "AVG(a) AS EXPR$0",
+        "SUM(b) AS EXPR$1",
+        "COUNT(c) AS EXPR$2")
+    )
+    util.verifySql(sqlQuery, aggregate)
+  }
+
+  @Test
+  def testAggregateGroupQueryBatchSQL(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+
+    val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM MyTable GROUP BY a"
+
+    val aggregate = unaryNode(
+        "DataSetAggregate",
+        batchTableNode(0),
+        term("groupBy", "a"),
+        term("select",
+          "a",
+          "AVG(a) AS EXPR$0",
+          "SUM(b) AS EXPR$1",
+          "COUNT(c) AS EXPR$2")
+    )
+    val expected = unaryNode(
+        "DataSetCalc",
+        aggregate,
+        term("select",
+          "EXPR$0",
+          "EXPR$1",
+          "EXPR$2")
+    )
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testAggregateGroupWithFilterQueryBatchSQL(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+
+    val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM MyTable WHERE a = 1 GROUP BY a"
+
+    val calcNode = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select","a", "b", "c") ,
+      term("where","=(a, 1)")
+    )
+
+    val aggregate = unaryNode(
+        "DataSetAggregate",
+        calcNode,
+        term("groupBy", "a"),
+        term("select",
+          "a",
+          "AVG(a) AS EXPR$0",
+          "SUM(b) AS EXPR$1",
+          "COUNT(c) AS EXPR$2")
+    )
+    val expected = unaryNode(
+        "DataSetCalc",
+        aggregate,
+        term("select",
+          "EXPR$0",
+          "EXPR$1",
+          "EXPR$2")
+    )
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testAggregateGroupWithFilterTableApi(): Unit = {
+
+    val util = batchTestUtil()
+    val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+
+    val resultTable = sourceTable.groupBy('a)
+      .select('a, 'a.avg, 'b.sum, 'c.count)
+      .where('a === 1)
+
+    val calcNode = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select", "a", "b", "c"),
+      term("where", "=(a, 1)")
+    )
+
+    val expected = unaryNode(
+      "DataSetAggregate",
+      calcNode,
+      term("groupBy", "a"),
+      term("select",
+        "a",
+        "AVG(a) AS TMP_0",
+        "SUM(b) AS TMP_1",
+        "COUNT(c) AS TMP_2")
+    )
+
+    util.verifyTable(resultTable,expected)
+  }
+
+  @Test
+  def testAggregateTableApi(): Unit = {
+    val util = batchTestUtil()
+    val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+    val resultTable = sourceTable.select('a.avg,'b.sum,'c.count)
+
+    val setValues = unaryNode(
+      "DataSetValues",
+      batchTableNode(0),
+      tuples(List(null,null,null)),
+      term("values","a","b","c")
+    )
+    val union = unaryNode(
+      "DataSetUnion",
+      setValues,
+      term("union","a","b","c")
+    )
+
+    val expected = unaryNode(
+      "DataSetAggregate",
+      union,
+      term("select",
+        "AVG(a) AS TMP_0",
+        "SUM(b) AS TMP_1",
+        "COUNT(c) AS TMP_2")
+    )
+    util.verifyTable(resultTable, expected)
+  }
+
+  @Test
+  def testAggregateWithFilterTableApi(): Unit = {
+    val util = batchTestUtil()
+    val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+
+    val resultTable = sourceTable.select('a,'b,'c).where('a === 1)
+      .select('a.avg,'b.sum,'c.count)
+
+    val calcNode = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select", "a", "b", "c"),
+      term("where", "=(a, 1)")
+    )
+
+    val setValues =  unaryNode(
+      "DataSetValues",
+      calcNode,
+      tuples(List(null,null,null)),
+      term("values","a","b","c")
+    )
+
+    val union = unaryNode(
+      "DataSetUnion",
+      setValues,
+      term("union","a","b","c")
+    )
+
+    val expected = unaryNode(
+      "DataSetAggregate",
+      union,
+      term("select",
+        "AVG(a) AS TMP_0",
+        "SUM(b) AS TMP_1",
+        "COUNT(c) AS TMP_2")
+    )
+
+    util.verifyTable(resultTable, expected)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ecfb5b5f/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
index 2ea15a0..539bb61 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
@@ -70,20 +70,25 @@ object TableTestUtil {
   def unaryNode(node: String, input: String, term: String*): String = {
     s"""$node(${term.mkString(", ")})
        |$input
-       |""".stripMargin
+       |""".stripMargin.stripLineEnd
   }
 
   def binaryNode(node: String, left: String, right: String, term: String*): String = {
     s"""$node(${term.mkString(", ")})
        |$left
        |$right
-       |""".stripMargin
+       |""".stripMargin.stripLineEnd
   }
 
   def term(term: AnyRef, value: AnyRef*): String = {
     s"$term=[${value.mkString(", ")}]"
   }
 
+  def tuples(value: List[AnyRef]*): String = {
+    val listValues = value.map(listValue => s"{ ${listValue.mkString(", ")} }")
+    term("tuples", "[" + listValues.mkString(", ") + "]")
+  }
+
   def batchTableNode(idx: Int): String = {
     s"DataSetScan(table=[[_DataSetTable_$idx]])"
   }