Posted to commits@flink.apache.org by tw...@apache.org on 2016/10/04 12:36:38 UTC

[1/2] flink git commit: [FLINK-4068] [table] Move constant computations out of code-generated functions

Repository: flink
Updated Branches:
  refs/heads/master 171d10930 -> f00e1e7c5


[FLINK-4068] [table] Move constant computations out of code-generated functions

This closes #2560.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a7113394
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a7113394
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a7113394

Branch: refs/heads/master
Commit: a71133945e66ec471b4cb07da0693d545c40923c
Parents: 171d109
Author: Jark Wu <wu...@alibaba-inc.com>
Authored: Wed Sep 28 10:31:59 2016 +0800
Committer: twalthr <tw...@apache.org>
Committed: Tue Oct 4 12:02:36 2016 +0200

----------------------------------------------------------------------
 .../flink/api/table/BatchTableEnvironment.scala |  48 +++++----
 .../flink/api/table/FlinkRelBuilder.scala       |   5 +-
 .../api/table/StreamTableEnvironment.scala      |  40 ++++---
 .../flink/api/table/TableEnvironment.scala      |   5 +-
 .../api/table/plan/logical/operators.scala      |   3 +-
 .../flink/api/table/plan/nodes/FlinkCalc.scala  |   2 +-
 .../flink/api/scala/batch/ExplainTest.scala     |   4 +-
 .../api/table/BatchTableEnvironmentTest.scala   | 102 ++++++++++++++++++
 .../api/table/StreamTableEnvironmentTest.scala  | 106 +++++++++++++++++++
 .../src/test/scala/resources/testJoin0.out      |   2 +-
 .../src/test/scala/resources/testJoin1.out      |   2 +-
 11 files changed, 277 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
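
What the change does, in a nutshell: the planner is given a RexExecutor so that constant subexpressions are evaluated once during optimization instead of on every record inside the code-generated functions. Below is a minimal, self-contained sketch of that reduction step, using only Calcite's public API (the object name ConstantReductionSketch is hypothetical and not part of the commit):

    import java.math.BigDecimal
    import java.util.Collections

    import org.apache.calcite.jdbc.JavaTypeFactoryImpl
    import org.apache.calcite.rex.{RexBuilder, RexExecutorImpl, RexNode}
    import org.apache.calcite.schema.Schemas
    import org.apache.calcite.sql.fun.SqlStdOperatorTable

    object ConstantReductionSketch {
      def main(args: Array[String]): Unit = {
        val rexBuilder = new RexBuilder(new JavaTypeFactoryImpl())

        // build the constant expression 3 + 4 as a RexNode tree
        val sum = rexBuilder.makeCall(
          SqlStdOperatorTable.PLUS,
          rexBuilder.makeExactLiteral(BigDecimal.valueOf(3)),
          rexBuilder.makeExactLiteral(BigDecimal.valueOf(4)))

        // the same executor that the commit wires into the planner below
        val executor = new RexExecutorImpl(Schemas.createDataContext(null))

        // ask the executor to evaluate the constant expression at plan time
        val reduced = new java.util.ArrayList[RexNode]()
        executor.reduce(rexBuilder, Collections.singletonList(sum), reduced)

        println(reduced.get(0)) // the folded literal 7
      }
    }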


http://git-wip-us.apache.org/repos/asf/flink/blob/a7113394/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
index ad3ff7a..10c2450 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
 import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rel.RelNode
 import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.Programs
 
@@ -228,21 +229,12 @@ abstract class BatchTableEnvironment(
   }
 
   /**
-    * Translates a [[Table]] into a [[DataSet]].
+    * Generates the optimized [[RelNode]] tree from the original relational node tree.
     *
-    * The transformation involves optimizing the relational expression tree as defined by
-    * Table API calls and / or SQL queries and generating corresponding [[DataSet]] operators.
-    *
-    * @param table The root node of the relational expression tree.
-    * @param tpe The [[TypeInformation]] of the resulting [[DataSet]].
-    * @tparam A The type of the resulting [[DataSet]].
-    * @return The [[DataSet]] that corresponds to the translated [[Table]].
+    * @param relNode The original [[RelNode]] tree
+    * @return The optimized [[RelNode]] tree
     */
-  protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataSet[A] = {
-
-    validateType(tpe)
-
-    val relNode = table.getRelNode
+  private[flink] def optimize(relNode: RelNode): RelNode = {
 
     // decorrelate
     val decorPlan = RelDecorrelator.decorrelateQuery(relNode)
@@ -253,8 +245,7 @@ abstract class BatchTableEnvironment(
 
     val dataSetPlan = try {
       optProgram.run(getPlanner, decorPlan, flinkOutputProps)
-    }
-    catch {
+    } catch {
       case e: CannotPlanException =>
         throw new TableException(
           s"Cannot generate a valid execution plan for the given query: \n\n" +
@@ -263,13 +254,32 @@ abstract class BatchTableEnvironment(
             s"Please check the documentation for the set of currently supported SQL features.")
       case t: TableException =>
         throw new TableException(
-        s"Cannot generate a valid execution plan for the given query: \n\n" +
-          s"${RelOptUtil.toString(relNode)}\n" +
-          s"${t.msg}\n" +
-          s"Please check the documentation for the set of currently supported SQL features.")
+          s"Cannot generate a valid execution plan for the given query: \n\n" +
+            s"${RelOptUtil.toString(relNode)}\n" +
+            s"${t.msg}\n" +
+            s"Please check the documentation for the set of currently supported SQL features.")
       case a: AssertionError =>
         throw a.getCause
     }
+    dataSetPlan
+  }
+
+  /**
+    * Translates a [[Table]] into a [[DataSet]].
+    *
+    * The transformation involves optimizing the relational expression tree as defined by
+    * Table API calls and / or SQL queries and generating corresponding [[DataSet]] operators.
+    *
+    * @param table The root node of the relational expression tree.
+    * @param tpe The [[TypeInformation]] of the resulting [[DataSet]].
+    * @tparam A The type of the resulting [[DataSet]].
+    * @return The [[DataSet]] that corresponds to the translated [[Table]].
+    */
+  protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataSet[A] = {
+
+    validateType(tpe)
+
+    val dataSetPlan = optimize(table.getRelNode)
 
     dataSetPlan match {
       case node: DataSetRel =>

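Both table environments now expose the same planning hook, which the new tests call directly via tEnv.optimize(table.getRelNode). As a signature sketch (the trait name RelNodeOptimizer is hypothetical; in the commit the method is defined directly on each environment class):

    package org.apache.flink.api.table

    import org.apache.calcite.rel.RelNode

    trait RelNodeOptimizer {
      // decorrelates the input tree and runs the rule-based optimizer on it,
      // returning the optimized plan without translating it to operators
      private[flink] def optimize(relNode: RelNode): RelNode
    }
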
http://git-wip-us.apache.org/repos/asf/flink/blob/a7113394/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
index 3827f05..34ed4ce 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
@@ -21,8 +21,8 @@ package org.apache.flink.api.table
 import org.apache.calcite.jdbc.CalciteSchema
 import org.apache.calcite.plan.{Context, RelOptCluster, RelOptPlanner, RelOptSchema}
 import org.apache.calcite.prepare.CalciteCatalogReader
-import org.apache.calcite.rex.RexBuilder
-import org.apache.calcite.schema.SchemaPlus
+import org.apache.calcite.rex.{RexExecutorImpl, RexBuilder}
+import org.apache.calcite.schema.{Schemas, SchemaPlus}
 import org.apache.calcite.tools.Frameworks.PlannerAction
 import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RelBuilder}
 
@@ -66,6 +66,7 @@ object FlinkRelBuilder {
       }
     })
     val planner = clusters(0).getPlanner
+    planner.setExecutor(config.getExecutor)
     val defaultRelOptSchema = relOptSchemas(0).asInstanceOf[CalciteCatalogReader]
 
     // create Flink type factory

http://git-wip-us.apache.org/repos/asf/flink/blob/a7113394/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
index e3e5751..44d90ac 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
@@ -22,10 +22,12 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
 import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rel.RelNode
 import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.Programs
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.table.expressions.Expression
 import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode}
 import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamConvention, DataStreamRel}
@@ -228,22 +230,12 @@ abstract class StreamTableEnvironment(
   }
 
   /**
-    * Translates a [[Table]] into a [[DataStream]].
-    *
-    * The transformation involves optimizing the relational expression tree as defined by
-    * Table API calls and / or SQL queries and generating corresponding [[DataStream]] operators.
+    * Generates the optimized [[RelNode]] tree from the original relational node tree.
     *
-    * @param table The root node of the relational expression tree.
-    * @param tpe The [[TypeInformation]] of the resulting [[DataStream]].
-    * @tparam A The type of the resulting [[DataStream]].
-    * @return The [[DataStream]] that corresponds to the translated [[Table]].
+    * @param relNode The root node of the relational expression tree.
+    * @return The optimized [[RelNode]] tree
     */
-  protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataStream[A] = {
-
-    validateType(tpe)
-
-    val relNode = table.getRelNode
-
+  private[flink] def optimize(relNode: RelNode): RelNode = {
     // decorrelate
     val decorPlan = RelDecorrelator.decorrelateQuery(relNode)
 
@@ -262,6 +254,26 @@ abstract class StreamTableEnvironment(
             s"This exception indicates that the query uses an unsupported SQL feature.\n" +
             s"Please check the documentation for the set of currently supported SQL features.")
     }
+    dataStreamPlan
+  }
+
+
+  /**
+    * Translates a [[Table]] into a [[DataStream]].
+    *
+    * The transformation involves optimizing the relational expression tree as defined by
+    * Table API calls and / or SQL queries and generating corresponding [[DataStream]] operators.
+    *
+    * @param table The root node of the relational expression tree.
+    * @param tpe The [[TypeInformation]] of the resulting [[DataStream]].
+    * @tparam A The type of the resulting [[DataStream]].
+    * @return The [[DataStream]] that corresponds to the translated [[Table]].
+    */
+  protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataStream[A] = {
+
+    validateType(tpe)
+
+    val dataStreamPlan = optimize(table.getRelNode)
 
     dataStreamPlan match {
       case node: DataStreamRel =>

http://git-wip-us.apache.org/repos/asf/flink/blob/a7113394/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
index f56df0c..02204b1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
@@ -24,7 +24,8 @@ import java.util.concurrent.atomic.AtomicInteger
 import org.apache.calcite.config.Lex
 import org.apache.calcite.plan.RelOptPlanner
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.schema.SchemaPlus
+import org.apache.calcite.rex.RexExecutorImpl
+import org.apache.calcite.schema.{Schemas, SchemaPlus}
 import org.apache.calcite.schema.impl.AbstractTable
 import org.apache.calcite.sql.SqlOperatorTable
 import org.apache.calcite.sql.parser.SqlParser
@@ -77,6 +78,8 @@ abstract class TableEnvironment(val config: TableConfig) {
     .costFactory(new DataSetCostFactory)
     .typeSystem(new FlinkTypeSystem)
     .operatorTable(sqlOperatorTable)
+    // set the executor to evaluate constant expressions
+    .executor(new RexExecutorImpl(Schemas.createDataContext(null)))
     .build
 
   // the builder for Calcite RelNodes, Calcite's representation of a relational expression tree.

http://git-wip-us.apache.org/repos/asf/flink/blob/a7113394/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
index ccdab85..066e9d6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
@@ -87,7 +87,8 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend
       // Calcite's RelBuilder does not translate identity projects even if they rename fields.
       //   Add a projection ourselves (will be automatically removed by translation rules).
       val project = LogicalProject.create(relBuilder.peek(),
-        projectList.map(_.toRexNode(relBuilder)).asJava,
+        // avoid AS call
+        projectList.map(_.asInstanceOf[Alias].child.toRexNode(relBuilder)).asJava,
         projectList.map(_.name).asJava)
       relBuilder.build()  // pop previous relNode
       relBuilder.push(project)

http://git-wip-us.apache.org/repos/asf/flink/blob/a7113394/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCalc.scala
index aa5492f..d5f8010 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCalc.scala
@@ -134,7 +134,7 @@ trait FlinkCalc {
     val proj = calcProgram.getProjectList.asScala.toList
     val inFields = calcProgram.getInputRowType.getFieldNames.asScala.toList
     val localExprs = calcProgram.getExprList.asScala.toList
-    val outFields = calcProgram.getInputRowType.getFieldNames.asScala.toList
+    val outFields = calcProgram.getOutputRowType.getFieldNames.asScala.toList
 
     proj
       .map(expression(_, inFields, Some(localExprs)))

http://git-wip-us.apache.org/repos/asf/flink/blob/a7113394/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ExplainTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ExplainTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ExplainTest.scala
index ab70ec5..9d00dda 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ExplainTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ExplainTest.scala
@@ -72,7 +72,7 @@ class ExplainTest
     val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
     val source = scala.io.Source.fromFile(testFilePath +
       "../../src/test/scala/resources/testJoin0.out").mkString.replaceAll("\\r\\n", "\n")
-    assertEquals(result, source)
+    assertEquals(source, result)
   }
 
   @Test
@@ -87,7 +87,7 @@ class ExplainTest
     val result = tEnv.explain(table, true).replaceAll("\\r\\n", "\n")
     val source = scala.io.Source.fromFile(testFilePath +
       "../../src/test/scala/resources/testJoin1.out").mkString.replaceAll("\\r\\n", "\n")
-    assertEquals(result, source)
+    assertEquals(source, result)
   }
 
   @Test

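For context on the two assertion fixes above: JUnit's contract is assertEquals(expected, actual), so the swapped arguments produced misleading failure messages rather than wrong results. A small illustrative test (hypothetical, not part of the commit):

    import org.junit.Assert.assertEquals
    import org.junit.Test

    class AssertArgumentOrderTest {

      @Test
      def testExpectedComesFirst(): Unit = {
        val result = "plan" // stands in for tEnv.explain(table)
        // JUnit renders failures as "expected:<x> but was:<y>", so the
        // reference value must be passed first for the message to make sense
        assertEquals("plan", result)
      }
    }
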
http://git-wip-us.apache.org/repos/asf/flink/blob/a7113394/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/BatchTableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/BatchTableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/BatchTableEnvironmentTest.scala
new file mode 100644
index 0000000..0344dee
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/BatchTableEnvironmentTest.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.junit.Assert._
+import org.junit.Test
+
+
+class BatchTableEnvironmentTest {
+
+  @Test
+  def testReduceExpressionForSQL(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val sqlQuery = "SELECT " +
+      "(3+4)+a, " +
+      "b+(1+2), " +
+      "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
+      "TRIM(BOTH ' STRING '),  " +
+      "'test' || 'string', " +
+      "NULLIF(1, 1), " +
+      "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
+      "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)),  " +
+      "1 IS NULL, " +
+      "'TEST' LIKE '%EST', " +
+      "FLOOR(2.5), " +
+      "'TEST' IN ('west', 'TEST', 'rest') " +
+      "FROM MyTable WHERE a>(1+7)"
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("MyTable", ds)
+
+    val table = tEnv.sql(sqlQuery)
+
+    val optimized = tEnv.optimize(table.getRelNode)
+    val optimizedString = optimized.toString
+    assertTrue(optimizedString.contains(">(_1, 8)"))
+    assertTrue(optimizedString.contains("+(7, _1) AS EXPR$0"))
+    assertTrue(optimizedString.contains("+(_2, 3) AS EXPR$1"))
+    assertTrue(optimizedString.contains("'b' AS EXPR$2"))
+    assertTrue(optimizedString.contains("'STRING' AS EXPR$3"))
+    assertTrue(optimizedString.contains("'teststring' AS EXPR$4"))
+    assertTrue(optimizedString.contains("null AS EXPR$5"))
+    assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS EXPR$6"))
+    assertTrue(optimizedString.contains("19 AS EXPR$7"))
+    assertTrue(optimizedString.contains("false AS EXPR$8"))
+    assertTrue(optimizedString.contains("true AS EXPR$9"))
+    assertTrue(optimizedString.contains("2 AS EXPR$10"))
+    assertTrue(optimizedString.contains("true AS EXPR$11"))
+  }
+
+  @Test
+  def testReduceExpressionForTableAPI(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+
+    val table = ds
+      .where('a > (1+7))
+      .select((3+4).toExpr + 6,
+              (11 === 1) ? ("a", "b"),
+              " STRING ".trim,
+              "test" + "string",
+              "1990-10-14 23:00:00.123".toTimestamp + 10.day + 1.second,
+              1.isNull,
+              "TEST".like("%EST"),
+              2.5.toExpr.floor())
+
+
+    val optimized = tEnv.optimize(table.getRelNode)
+    val optimizedString = optimized.toString
+    assertTrue(optimizedString.contains(">(_1, 8)"))
+    assertTrue(optimizedString.contains("13 AS _c0"))
+    assertTrue(optimizedString.contains("'b' AS _c1"))
+    assertTrue(optimizedString.contains("'STRING' AS _c2"))
+    assertTrue(optimizedString.contains("'teststring' AS _c3"))
+    assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS _c4"))
+    assertTrue(optimizedString.contains("false AS _c5"))
+    assertTrue(optimizedString.contains("true AS _c6"))
+    assertTrue(optimizedString.contains("2E0 AS _c7"))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7113394/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/StreamTableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/StreamTableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/StreamTableEnvironmentTest.scala
new file mode 100644
index 0000000..52bf9ac
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/StreamTableEnvironmentTest.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+import org.apache.flink.api.scala.stream.utils.StreamTestData
+import org.apache.flink.api.scala.table._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.junit.Test
+import org.junit.Assert._
+
+
+
+class StreamTableEnvironmentTest extends StreamingMultipleProgramsTestBase{
+
+  @Test
+  def testReduceExpression(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val sqlQuery = "SELECT STREAM " +
+      "(3+4)+a, " +
+      "b+(1+2), " +
+      "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
+      "TRIM(BOTH ' STRING '),  " +
+      "'test' || 'string', " +
+      "NULLIF(1, 1), " +
+      "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
+      "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)),  " +
+      "1 IS NULL, " +
+      "'TEST' LIKE '%EST', " +
+      "FLOOR(2.5), " +
+      "'TEST' IN ('west', 'TEST', 'rest') " +
+      "FROM MyTable WHERE a>(1+7)"
+
+    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("MyTable", t)
+
+    val table = tEnv.sql(sqlQuery)
+
+    val optimized = tEnv.optimize(table.getRelNode)
+    val optimizedString = optimized.toString
+    assertTrue(optimizedString.contains(">(_1, 8)"))
+    assertTrue(optimizedString.contains("+(7, _1) AS EXPR$0"))
+    assertTrue(optimizedString.contains("+(_2, 3) AS EXPR$1"))
+    assertTrue(optimizedString.contains("'b' AS EXPR$2"))
+    assertTrue(optimizedString.contains("'STRING' AS EXPR$3"))
+    assertTrue(optimizedString.contains("'teststring' AS EXPR$4"))
+    assertTrue(optimizedString.contains("null AS EXPR$5"))
+    assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS EXPR$6"))
+    assertTrue(optimizedString.contains("19 AS EXPR$7"))
+    assertTrue(optimizedString.contains("false AS EXPR$8"))
+    assertTrue(optimizedString.contains("true AS EXPR$9"))
+    assertTrue(optimizedString.contains("2 AS EXPR$10"))
+    assertTrue(optimizedString.contains("true AS EXPR$11"))
+  }
+
+  @Test
+  def testReduceExpressionForTableAPI(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+
+    val table = t
+      .where('a > (1+7))
+      .select((3+4).toExpr + 6,
+              (11 === 1) ? ("a", "b"),
+              " STRING ".trim,
+              "test" + "string",
+              "1990-10-14 23:00:00.123".toTimestamp + 10.day + 1.second,
+              1.isNull,
+              "TEST".like("%EST"),
+              2.5.toExpr.floor())
+
+
+    val optimized = tEnv.optimize(table.getRelNode)
+    val optimizedString = optimized.toString
+    assertTrue(optimizedString.contains(">(_1, 8)"))
+    assertTrue(optimizedString.contains("13 AS _c0"))
+    assertTrue(optimizedString.contains("'b' AS _c1"))
+    assertTrue(optimizedString.contains("'STRING' AS _c2"))
+    assertTrue(optimizedString.contains("'teststring' AS _c3"))
+    assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS _c4"))
+    assertTrue(optimizedString.contains("false AS _c5"))
+    assertTrue(optimizedString.contains("true AS _c6"))
+    assertTrue(optimizedString.contains("2E0 AS _c7"))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7113394/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out b/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out
index f71ea9f..11961ef 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out
@@ -36,7 +36,7 @@ Stage 6 : Data Source
 			Partitioning : RANDOM_PARTITIONED
 
 			Stage 1 : FlatMap
-				content : select: (a, c AS b)
+				content : select: (a, c)
 				ship_strategy : Forward
 				exchange_mode : PIPELINED
 				driver_strategy : FlatMap

http://git-wip-us.apache.org/repos/asf/flink/blob/a7113394/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out b/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out
index f117cd9..c6e8b34 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out
@@ -121,7 +121,7 @@ Stage 6 : Data Source
 			Filter Factor : (none)
 
 			Stage 1 : FlatMap
-				content : select: (a, c AS b)
+				content : select: (a, c)
 				ship_strategy : Forward
 				exchange_mode : PIPELINED
 				driver_strategy : FlatMap


[2/2] flink git commit: [FLINK-4068] [table] Reduce expression also for filter/project

Posted by tw...@apache.org.
[FLINK-4068] [table] Reduce expression also for filter/project


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f00e1e7c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f00e1e7c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f00e1e7c

Branch: refs/heads/master
Commit: f00e1e7c5578caf52eaffc5cbdd102589c13f52d
Parents: a711339
Author: twalthr <tw...@apache.org>
Authored: Fri Sep 30 17:30:44 2016 +0200
Committer: twalthr <tw...@apache.org>
Committed: Tue Oct 4 14:30:48 2016 +0200

----------------------------------------------------------------------
 .../flink/api/table/FlinkTypeFactory.scala      |  12 +-
 .../flink/api/table/FlinkTypeSystem.scala       |   9 +
 .../api/table/plan/rules/FlinkRuleSets.scala    |   7 +
 .../api/table/BatchTableEnvironmentTest.scala   | 102 -----
 .../api/table/ExpressionReductionTest.scala     | 400 +++++++++++++++++++
 .../api/table/StreamTableEnvironmentTest.scala  | 106 -----
 6 files changed, 427 insertions(+), 209 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f00e1e7c/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
index 581ecde..77eb907 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
@@ -22,7 +22,7 @@ import org.apache.calcite.avatica.util.TimeUnit
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem}
 import org.apache.calcite.sql.SqlIntervalQualifier
-import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.sql.`type`.{BasicSqlType, SqlTypeName, SqlTypeUtil}
 import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.calcite.sql.parser.SqlParserPos
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
@@ -68,6 +68,16 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
     }
   }
 
+  override def createSqlType(typeName: SqlTypeName, precision: Int): RelDataType = {
+    // it might happen that inferred VARCHAR types overflow as we set them to Int.MaxValue
+    // always set those to default value
+    if (typeName == VARCHAR && precision < 0) {
+      createSqlType(typeName, getTypeSystem.getDefaultPrecision(typeName))
+    } else {
+      super.createSqlType(typeName, precision)
+    }
+  }
+
   private def createAdvancedType(typeInfo: TypeInformation[_]): RelDataType = typeInfo match {
     // TODO add specific RelDataTypes
     // for PrimitiveArrayTypeInfo, ObjectArrayTypeInfo, CompositeType

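The createSqlType guard above is needed because FlinkTypeSystem (next file) reports Int.MaxValue as the default VARCHAR precision, so precision arithmetic during type inference can wrap around to a negative Int. A tiny hypothetical illustration of the overflow:

    object VarcharPrecisionOverflow {
      def main(args: Array[String]): Unit = {
        val defaultPrecision = Int.MaxValue
        // e.g. the inferred precision of a concatenation of two maximal VARCHARs
        val inferred = defaultPrecision + defaultPrecision
        println(inferred) // -2: the Int wrapped around, hence the "precision < 0" check
      }
    }
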
http://git-wip-us.apache.org/repos/asf/flink/blob/f00e1e7c/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeSystem.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeSystem.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeSystem.scala
index df6022a..2df043f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeSystem.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeSystem.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.api.table
 
 import org.apache.calcite.rel.`type`.RelDataTypeSystemImpl
+import org.apache.calcite.sql.`type`.SqlTypeName
 
 /**
   * Custom type system for Flink.
@@ -33,4 +34,12 @@ class FlinkTypeSystem extends RelDataTypeSystemImpl {
   // half should be enough for all use cases
   override def getMaxNumericPrecision: Int = Int.MaxValue / 2
 
+  override def getDefaultPrecision(typeName: SqlTypeName): Int = typeName match {
+    // by default all VARCHARs can have the Java default length
+    case SqlTypeName.VARCHAR =>
+      Int.MaxValue
+    case _ =>
+      super.getDefaultPrecision(typeName)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f00e1e7c/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
index ddfa578..7d915e6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
@@ -75,6 +75,8 @@ object FlinkRuleSets {
     SortRemoveRule.INSTANCE,
 
     // simplify expressions rules
+    ReduceExpressionsRule.FILTER_INSTANCE,
+    ReduceExpressionsRule.PROJECT_INSTANCE,
     ReduceExpressionsRule.CALC_INSTANCE,
     ReduceExpressionsRule.JOIN_INSTANCE,
 
@@ -113,6 +115,9 @@ object FlinkRuleSets {
   val DATASTREAM_OPT_RULES: RuleSet = RuleSets.ofList(
 
       RemoveDeltaRule.INSTANCE,
+
+      // convert a logical table scan to a relational expression
+      TableScanRule.INSTANCE,
       EnumerableToLogicalTableScan.INSTANCE,
 
       // calc rules
@@ -133,6 +138,8 @@ object FlinkRuleSets {
       ProjectRemoveRule.INSTANCE,
 
       // simplify expressions rules
+      ReduceExpressionsRule.FILTER_INSTANCE,
+      ReduceExpressionsRule.PROJECT_INSTANCE,
       ReduceExpressionsRule.CALC_INSTANCE,
 
       // merge and push unions rules

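The core of this commit is the pair of new rule instances: constant folding now also fires on plain Filter and Project nodes, which is what lets WHERE clauses and projections be reduced before code generation. A condensed sketch of just the reduction-related rules (hypothetical object name; in the commit they live inside the larger rule sets of FlinkRuleSets):

    import org.apache.calcite.rel.rules.ReduceExpressionsRule
    import org.apache.calcite.tools.{RuleSet, RuleSets}

    object ReductionRules {
      val rules: RuleSet = RuleSets.ofList(
        ReduceExpressionsRule.FILTER_INSTANCE,   // new: fold constants in Filter
        ReduceExpressionsRule.PROJECT_INSTANCE,  // new: fold constants in Project
        ReduceExpressionsRule.CALC_INSTANCE,
        ReduceExpressionsRule.JOIN_INSTANCE)
    }
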
http://git-wip-us.apache.org/repos/asf/flink/blob/f00e1e7c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/BatchTableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/BatchTableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/BatchTableEnvironmentTest.scala
deleted file mode 100644
index 0344dee..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/BatchTableEnvironmentTest.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table
-
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.junit.Assert._
-import org.junit.Test
-
-
-class BatchTableEnvironmentTest {
-
-  @Test
-  def testReduceExpressionForSQL(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val sqlQuery = "SELECT " +
-      "(3+4)+a, " +
-      "b+(1+2), " +
-      "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
-      "TRIM(BOTH ' STRING '),  " +
-      "'test' || 'string', " +
-      "NULLIF(1, 1), " +
-      "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
-      "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)),  " +
-      "1 IS NULL, " +
-      "'TEST' LIKE '%EST', " +
-      "FLOOR(2.5), " +
-      "'TEST' IN ('west', 'TEST', 'rest') " +
-      "FROM MyTable WHERE a>(1+7)"
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("MyTable", ds)
-
-    val table = tEnv.sql(sqlQuery)
-
-    val optimized = tEnv.optimize(table.getRelNode)
-    val optimizedString = optimized.toString
-    assertTrue(optimizedString.contains(">(_1, 8)"))
-    assertTrue(optimizedString.contains("+(7, _1) AS EXPR$0"))
-    assertTrue(optimizedString.contains("+(_2, 3) AS EXPR$1"))
-    assertTrue(optimizedString.contains("'b' AS EXPR$2"))
-    assertTrue(optimizedString.contains("'STRING' AS EXPR$3"))
-    assertTrue(optimizedString.contains("'teststring' AS EXPR$4"))
-    assertTrue(optimizedString.contains("null AS EXPR$5"))
-    assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS EXPR$6"))
-    assertTrue(optimizedString.contains("19 AS EXPR$7"))
-    assertTrue(optimizedString.contains("false AS EXPR$8"))
-    assertTrue(optimizedString.contains("true AS EXPR$9"))
-    assertTrue(optimizedString.contains("2 AS EXPR$10"))
-    assertTrue(optimizedString.contains("true AS EXPR$11"))
-  }
-
-  @Test
-  def testReduceExpressionForTableAPI(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
-
-    val table = ds
-      .where('a > (1+7))
-      .select((3+4).toExpr + 6,
-              (11 === 1) ? ("a", "b"),
-              " STRING ".trim,
-              "test" + "string",
-              "1990-10-14 23:00:00.123".toTimestamp + 10.day + 1.second,
-              1.isNull,
-              "TEST".like("%EST"),
-              2.5.toExpr.floor())
-
-
-    val optimized = tEnv.optimize(table.getRelNode)
-    val optimizedString = optimized.toString
-    assertTrue(optimizedString.contains(">(_1, 8)"))
-    assertTrue(optimizedString.contains("13 AS _c0"))
-    assertTrue(optimizedString.contains("'b' AS _c1"))
-    assertTrue(optimizedString.contains("'STRING' AS _c2"))
-    assertTrue(optimizedString.contains("'teststring' AS _c3"))
-    assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS _c4"))
-    assertTrue(optimizedString.contains("false AS _c5"))
-    assertTrue(optimizedString.contains("true AS _c6"))
-    assertTrue(optimizedString.contains("2E0 AS _c7"))
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f00e1e7c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala
new file mode 100644
index 0000000..4830b75
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+import org.apache.flink.api.java.{DataSet => JDataSet}
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, createTypeInformation}
+import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream}
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.junit.Assert._
+import org.junit.Test
+import org.mockito.Mockito.{mock, when}
+
+class ExpressionReductionTest {
+
+  private def mockBatchTableEnvironment(): BatchTableEnvironment = {
+    val env = mock(classOf[ExecutionEnvironment])
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds = mock(classOf[DataSet[(Int, Long, String)]])
+    val jDs = mock(classOf[JDataSet[(Int, Long, String)]])
+    when(ds.javaSet).thenReturn(jDs)
+    when(jDs.getType).thenReturn(createTypeInformation[(Int, Long, String)])
+
+    val t = ds.toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("MyTable", t)
+    tEnv
+  }
+
+  private def mockStreamTableEnvironment(): StreamTableEnvironment = {
+    val env = mock(classOf[StreamExecutionEnvironment])
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds = mock(classOf[DataStream[(Int, Long, String)]])
+    val jDs = mock(classOf[JDataStream[(Int, Long, String)]])
+    when(ds.javaStream).thenReturn(jDs)
+    when(jDs.getType).thenReturn(createTypeInformation[(Int, Long, String)])
+
+    val t = ds.toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("MyTable", t)
+    tEnv
+  }
+
+  @Test
+  def testReduceCalcExpressionForBatchSQL(): Unit = {
+    val tEnv = mockBatchTableEnvironment()
+
+    val sqlQuery = "SELECT " +
+      "(3+4)+a, " +
+      "b+(1+2), " +
+      "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
+      "TRIM(BOTH ' STRING '),  " +
+      "'test' || 'string', " +
+      "NULLIF(1, 1), " +
+      "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
+      "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)),  " +
+      "1 IS NULL, " +
+      "'TEST' LIKE '%EST', " +
+      "FLOOR(2.5), " +
+      "'TEST' IN ('west', 'TEST', 'rest'), " +
+      "CAST(TRUE AS VARCHAR) || 'X'" +
+      "FROM MyTable WHERE a>(1+7)"
+
+    val table = tEnv.sql(sqlQuery)
+
+    val optimized = tEnv.optimize(table.getRelNode)
+    val optimizedString = optimized.toString
+    assertTrue(optimizedString.contains(">(_1, 8)"))
+    assertTrue(optimizedString.contains("+(7, _1) AS EXPR$0"))
+    assertTrue(optimizedString.contains("+(_2, 3) AS EXPR$1"))
+    assertTrue(optimizedString.contains("'b' AS EXPR$2"))
+    assertTrue(optimizedString.contains("'STRING' AS EXPR$3"))
+    assertTrue(optimizedString.contains("'teststring' AS EXPR$4"))
+    assertTrue(optimizedString.contains("null AS EXPR$5"))
+    assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS EXPR$6"))
+    assertTrue(optimizedString.contains("19 AS EXPR$7"))
+    assertTrue(optimizedString.contains("false AS EXPR$8"))
+    assertTrue(optimizedString.contains("true AS EXPR$9"))
+    assertTrue(optimizedString.contains("2 AS EXPR$10"))
+    assertTrue(optimizedString.contains("true AS EXPR$11"))
+    assertTrue(optimizedString.contains("'TRUEX' AS EXPR$12"))
+  }
+
+  @Test
+  def testReduceProjectExpressionForBatchSQL(): Unit = {
+    val tEnv = mockBatchTableEnvironment()
+
+    val sqlQuery = "SELECT " +
+      "(3+4)+a, " +
+      "b+(1+2), " +
+      "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
+      "TRIM(BOTH ' STRING '),  " +
+      "'test' || 'string', " +
+      "NULLIF(1, 1), " +
+      "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
+      "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)),  " +
+      "1 IS NULL, " +
+      "'TEST' LIKE '%EST', " +
+      "FLOOR(2.5), " +
+      "'TEST' IN ('west', 'TEST', 'rest'), " +
+      "CAST(TRUE AS VARCHAR) || 'X'" +
+      "FROM MyTable"
+
+    val table = tEnv.sql(sqlQuery)
+
+    val optimized = tEnv.optimize(table.getRelNode)
+    val optimizedString = optimized.toString
+    assertTrue(optimizedString.contains("+(7, _1) AS EXPR$0"))
+    assertTrue(optimizedString.contains("+(_2, 3) AS EXPR$1"))
+    assertTrue(optimizedString.contains("'b' AS EXPR$2"))
+    assertTrue(optimizedString.contains("'STRING' AS EXPR$3"))
+    assertTrue(optimizedString.contains("'teststring' AS EXPR$4"))
+    assertTrue(optimizedString.contains("null AS EXPR$5"))
+    assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS EXPR$6"))
+    assertTrue(optimizedString.contains("19 AS EXPR$7"))
+    assertTrue(optimizedString.contains("false AS EXPR$8"))
+    assertTrue(optimizedString.contains("true AS EXPR$9"))
+    assertTrue(optimizedString.contains("2 AS EXPR$10"))
+    assertTrue(optimizedString.contains("true AS EXPR$11"))
+    assertTrue(optimizedString.contains("'TRUEX' AS EXPR$12"))
+  }
+
+  @Test
+  def testReduceFilterExpressionForBatchSQL(): Unit = {
+    val tEnv = mockBatchTableEnvironment()
+
+    val sqlQuery = "SELECT " +
+      "*" +
+      "FROM MyTable WHERE a>(1+7)"
+
+    val table = tEnv.sql(sqlQuery)
+
+    val optimized = tEnv.optimize(table.getRelNode)
+    val optimizedString = optimized.toString
+    assertTrue(optimizedString.contains(">(_1, 8)"))
+  }
+
+  @Test
+  def testReduceCalcExpressionForBatchTableAPI(): Unit = {
+    val tEnv = mockBatchTableEnvironment()
+
+    val table = tEnv
+      .scan("MyTable")
+      .where('a > (1 + 7))
+      .select((3 + 4).toExpr + 6,
+              (11 === 1) ? ("a", "b"),
+              " STRING ".trim,
+              "test" + "string",
+              "1990-10-14 23:00:00.123".toTimestamp + 10.day + 1.second,
+              1.isNull,
+              "TEST".like("%EST"),
+              2.5.toExpr.floor(),
+              true.cast(Types.STRING) + "X")
+
+
+    val optimized = tEnv.optimize(table.getRelNode)
+    val optimizedString = optimized.toString
+    assertTrue(optimizedString.contains(">(_1, 8)"))
+    assertTrue(optimizedString.contains("13 AS _c0"))
+    assertTrue(optimizedString.contains("'b' AS _c1"))
+    assertTrue(optimizedString.contains("'STRING' AS _c2"))
+    assertTrue(optimizedString.contains("'teststring' AS _c3"))
+    assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS _c4"))
+    assertTrue(optimizedString.contains("false AS _c5"))
+    assertTrue(optimizedString.contains("true AS _c6"))
+    assertTrue(optimizedString.contains("2E0 AS _c7"))
+    assertTrue(optimizedString.contains("'TRUEX' AS _c8"))
+  }
+
+  @Test
+  def testReduceProjectExpressionForBatchTableAPI(): Unit = {
+    val tEnv = mockBatchTableEnvironment()
+
+    val table = tEnv
+      .scan("MyTable")
+      .select((3 + 4).toExpr + 6,
+              (11 === 1) ? ("a", "b"),
+              " STRING ".trim,
+              "test" + "string",
+              "1990-10-14 23:00:00.123".toTimestamp + 10.day + 1.second,
+              1.isNull,
+              "TEST".like("%EST"),
+              2.5.toExpr.floor(),
+              true.cast(Types.STRING) + "X")
+
+
+    val optimized = tEnv.optimize(table.getRelNode)
+    val optimizedString = optimized.toString
+    assertTrue(optimizedString.contains("13 AS _c0"))
+    assertTrue(optimizedString.contains("'b' AS _c1"))
+    assertTrue(optimizedString.contains("'STRING' AS _c2"))
+    assertTrue(optimizedString.contains("'teststring' AS _c3"))
+    assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS _c4"))
+    assertTrue(optimizedString.contains("false AS _c5"))
+    assertTrue(optimizedString.contains("true AS _c6"))
+    assertTrue(optimizedString.contains("2E0 AS _c7"))
+    assertTrue(optimizedString.contains("'TRUEX' AS _c8"))
+  }
+
+  @Test
+  def testReduceFilterExpressionForBatchTableAPI(): Unit = {
+    val tEnv = mockBatchTableEnvironment()
+
+    val table = tEnv
+      .scan("MyTable")
+      .where('a > (1 + 7))
+
+    val optimized = tEnv.optimize(table.getRelNode)
+    val optimizedString = optimized.toString
+    assertTrue(optimizedString.contains(">(_1, 8)"))
+  }
+
+  @Test
+  def testReduceCalcExpressionForStreamSQL(): Unit = {
+    val tEnv = mockStreamTableEnvironment()
+
+    val sqlQuery = "SELECT STREAM " +
+      "(3+4)+a, " +
+      "b+(1+2), " +
+      "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
+      "TRIM(BOTH ' STRING '),  " +
+      "'test' || 'string', " +
+      "NULLIF(1, 1), " +
+      "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
+      "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)),  " +
+      "1 IS NULL, " +
+      "'TEST' LIKE '%EST', " +
+      "FLOOR(2.5), " +
+      "'TEST' IN ('west', 'TEST', 'rest'), " +
+      "CAST(TRUE AS VARCHAR) || 'X'" +
+      "FROM MyTable WHERE a>(1+7)"
+
+    val table = tEnv.sql(sqlQuery)
+
+    val optimized = tEnv.optimize(table.getRelNode)
+    val optimizedString = optimized.toString
+    assertTrue(optimizedString.contains(">(_1, 8)"))
+    assertTrue(optimizedString.contains("+(7, _1) AS EXPR$0"))
+    assertTrue(optimizedString.contains("+(_2, 3) AS EXPR$1"))
+    assertTrue(optimizedString.contains("'b' AS EXPR$2"))
+    assertTrue(optimizedString.contains("'STRING' AS EXPR$3"))
+    assertTrue(optimizedString.contains("'teststring' AS EXPR$4"))
+    assertTrue(optimizedString.contains("null AS EXPR$5"))
+    assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS EXPR$6"))
+    assertTrue(optimizedString.contains("19 AS EXPR$7"))
+    assertTrue(optimizedString.contains("false AS EXPR$8"))
+    assertTrue(optimizedString.contains("true AS EXPR$9"))
+    assertTrue(optimizedString.contains("2 AS EXPR$10"))
+    assertTrue(optimizedString.contains("true AS EXPR$11"))
+    assertTrue(optimizedString.contains("'TRUEX' AS EXPR$12"))
+  }
+
+  @Test
+  def testReduceProjectExpressionForStreamSQL(): Unit = {
+    val tEnv = mockStreamTableEnvironment()
+
+    val sqlQuery = "SELECT STREAM " +
+      "(3+4)+a, " +
+      "b+(1+2), " +
+      "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
+      "TRIM(BOTH ' STRING '),  " +
+      "'test' || 'string', " +
+      "NULLIF(1, 1), " +
+      "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
+      "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)),  " +
+      "1 IS NULL, " +
+      "'TEST' LIKE '%EST', " +
+      "FLOOR(2.5), " +
+      "'TEST' IN ('west', 'TEST', 'rest'), " +
+      "CAST(TRUE AS VARCHAR) || 'X'" +
+      "FROM MyTable"
+
+    val table = tEnv.sql(sqlQuery)
+
+    val optimized = tEnv.optimize(table.getRelNode)
+    val optimizedString = optimized.toString
+    assertTrue(optimizedString.contains("+(7, a) AS EXPR$0"))
+    assertTrue(optimizedString.contains("+(b, 3) AS EXPR$1"))
+    assertTrue(optimizedString.contains("'b' AS EXPR$2"))
+    assertTrue(optimizedString.contains("'STRING' AS EXPR$3"))
+    assertTrue(optimizedString.contains("'teststring' AS EXPR$4"))
+    assertTrue(optimizedString.contains("null AS EXPR$5"))
+    assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS EXPR$6"))
+    assertTrue(optimizedString.contains("19 AS EXPR$7"))
+    assertTrue(optimizedString.contains("false AS EXPR$8"))
+    assertTrue(optimizedString.contains("true AS EXPR$9"))
+    assertTrue(optimizedString.contains("2 AS EXPR$10"))
+    assertTrue(optimizedString.contains("true AS EXPR$11"))
+    assertTrue(optimizedString.contains("'TRUEX' AS EXPR$12"))
+  }
+
+  @Test
+  def testReduceFilterExpressionForStreamSQL(): Unit = {
+    val tEnv = mockStreamTableEnvironment()
+
+    val sqlQuery = "SELECT STREAM " +
+      "*" +
+      "FROM MyTable WHERE a>(1+7)"
+
+    val table = tEnv.sql(sqlQuery)
+
+    val optimized = tEnv.optimize(table.getRelNode)
+    val optimizedString = optimized.toString
+    assertTrue(optimizedString.contains(">(_1, 8)"))
+  }
+
+  @Test
+  def testReduceCalcExpressionForStreamTableAPI(): Unit = {
+    val tEnv = mockStreamTableEnvironment()
+
+    val table = tEnv
+      .ingest("MyTable")
+      .where('a > (1 + 7))
+      .select((3 + 4).toExpr + 6,
+              (11 === 1) ? ("a", "b"),
+              " STRING ".trim,
+              "test" + "string",
+              "1990-10-14 23:00:00.123".toTimestamp + 10.day + 1.second,
+              1.isNull,
+              "TEST".like("%EST"),
+              2.5.toExpr.floor(),
+              true.cast(Types.STRING) + "X")
+
+
+    val optimized = tEnv.optimize(table.getRelNode)
+    val optimizedString = optimized.toString
+    assertTrue(optimizedString.contains(">(_1, 8)"))
+    assertTrue(optimizedString.contains("13 AS _c0"))
+    assertTrue(optimizedString.contains("'b' AS _c1"))
+    assertTrue(optimizedString.contains("'STRING' AS _c2"))
+    assertTrue(optimizedString.contains("'teststring' AS _c3"))
+    assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS _c4"))
+    assertTrue(optimizedString.contains("false AS _c5"))
+    assertTrue(optimizedString.contains("true AS _c6"))
+    assertTrue(optimizedString.contains("2E0 AS _c7"))
+    assertTrue(optimizedString.contains("'TRUEX' AS _c8"))
+  }
+
+  @Test
+  def testReduceProjectExpressionForStreamTableAPI(): Unit = {
+    val tEnv = mockStreamTableEnvironment()
+
+    val table =  tEnv
+      .ingest("MyTable")
+      .where('a > (1 + 7))
+      .select((3 + 4).toExpr + 6,
+              (11 === 1) ? ("a", "b"),
+              " STRING ".trim,
+              "test" + "string",
+              "1990-10-14 23:00:00.123".toTimestamp + 10.day + 1.second,
+              1.isNull,
+              "TEST".like("%EST"),
+              2.5.toExpr.floor(),
+              true.cast(Types.STRING) + "X")
+
+
+    val optimized = tEnv.optimize(table.getRelNode)
+    val optimizedString = optimized.toString
+    assertTrue(optimizedString.contains(">(_1, 8)"))
+    assertTrue(optimizedString.contains("13 AS _c0"))
+    assertTrue(optimizedString.contains("'b' AS _c1"))
+    assertTrue(optimizedString.contains("'STRING' AS _c2"))
+    assertTrue(optimizedString.contains("'teststring' AS _c3"))
+    assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS _c4"))
+    assertTrue(optimizedString.contains("false AS _c5"))
+    assertTrue(optimizedString.contains("true AS _c6"))
+    assertTrue(optimizedString.contains("2E0 AS _c7"))
+    assertTrue(optimizedString.contains("'TRUEX' AS _c8"))
+  }
+
+  @Test
+  def testReduceFilterExpressionForStreamTableAPI(): Unit = {
+    val tEnv = mockStreamTableEnvironment()
+
+    val table = tEnv
+      .ingest("MyTable")
+      .where('a > (1 + 7))
+
+
+    val optimized = tEnv.optimize(table.getRelNode)
+    val optimizedString = optimized.toString
+    assertTrue(optimizedString.contains(">(_1, 8)"))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f00e1e7c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/StreamTableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/StreamTableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/StreamTableEnvironmentTest.scala
deleted file mode 100644
index 52bf9ac..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/StreamTableEnvironmentTest.scala
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table
-
-import org.apache.flink.api.scala.stream.utils.StreamTestData
-import org.apache.flink.api.scala.table._
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.junit.Test
-import org.junit.Assert._
-
-
-
-class StreamTableEnvironmentTest extends StreamingMultipleProgramsTestBase{
-
-  @Test
-  def testReduceExpression(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val sqlQuery = "SELECT STREAM " +
-      "(3+4)+a, " +
-      "b+(1+2), " +
-      "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
-      "TRIM(BOTH ' STRING '),  " +
-      "'test' || 'string', " +
-      "NULLIF(1, 1), " +
-      "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
-      "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)),  " +
-      "1 IS NULL, " +
-      "'TEST' LIKE '%EST', " +
-      "FLOOR(2.5), " +
-      "'TEST' IN ('west', 'TEST', 'rest') " +
-      "FROM MyTable WHERE a>(1+7)"
-
-    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("MyTable", t)
-
-    val table = tEnv.sql(sqlQuery)
-
-    val optimized = tEnv.optimize(table.getRelNode)
-    val optimizedString = optimized.toString
-    assertTrue(optimizedString.contains(">(_1, 8)"))
-    assertTrue(optimizedString.contains("+(7, _1) AS EXPR$0"))
-    assertTrue(optimizedString.contains("+(_2, 3) AS EXPR$1"))
-    assertTrue(optimizedString.contains("'b' AS EXPR$2"))
-    assertTrue(optimizedString.contains("'STRING' AS EXPR$3"))
-    assertTrue(optimizedString.contains("'teststring' AS EXPR$4"))
-    assertTrue(optimizedString.contains("null AS EXPR$5"))
-    assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS EXPR$6"))
-    assertTrue(optimizedString.contains("19 AS EXPR$7"))
-    assertTrue(optimizedString.contains("false AS EXPR$8"))
-    assertTrue(optimizedString.contains("true AS EXPR$9"))
-    assertTrue(optimizedString.contains("2 AS EXPR$10"))
-    assertTrue(optimizedString.contains("true AS EXPR$11"))
-  }
-
-  @Test
-  def testReduceExpressionForTableAPI(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
-
-    val table = t
-      .where('a > (1+7))
-      .select((3+4).toExpr + 6,
-              (11 === 1) ? ("a", "b"),
-              " STRING ".trim,
-              "test" + "string",
-              "1990-10-14 23:00:00.123".toTimestamp + 10.day + 1.second,
-              1.isNull,
-              "TEST".like("%EST"),
-              2.5.toExpr.floor())
-
-
-    val optimized = tEnv.optimize(table.getRelNode)
-    val optimizedString = optimized.toString
-    assertTrue(optimizedString.contains(">(_1, 8)"))
-    assertTrue(optimizedString.contains("13 AS _c0"))
-    assertTrue(optimizedString.contains("'b' AS _c1"))
-    assertTrue(optimizedString.contains("'STRING' AS _c2"))
-    assertTrue(optimizedString.contains("'teststring' AS _c3"))
-    assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS _c4"))
-    assertTrue(optimizedString.contains("false AS _c5"))
-    assertTrue(optimizedString.contains("true AS _c6"))
-    assertTrue(optimizedString.contains("2E0 AS _c7"))
-  }
-
-}