You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2016/10/04 12:36:38 UTC
[1/2] flink git commit: [FLINK-4068] [table] Move constant
computations out of code-generated
Repository: flink
Updated Branches:
refs/heads/master 171d10930 -> f00e1e7c5
[FLINK-4068] [table] Move constant computations out of code-generated
This closes #2560.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a7113394
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a7113394
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a7113394
Branch: refs/heads/master
Commit: a71133945e66ec471b4cb07da0693d545c40923c
Parents: 171d109
Author: Jark Wu <wu...@alibaba-inc.com>
Authored: Wed Sep 28 10:31:59 2016 +0800
Committer: twalthr <tw...@apache.org>
Committed: Tue Oct 4 12:02:36 2016 +0200
----------------------------------------------------------------------
.../flink/api/table/BatchTableEnvironment.scala | 48 +++++----
.../flink/api/table/FlinkRelBuilder.scala | 5 +-
.../api/table/StreamTableEnvironment.scala | 40 ++++---
.../flink/api/table/TableEnvironment.scala | 5 +-
.../api/table/plan/logical/operators.scala | 3 +-
.../flink/api/table/plan/nodes/FlinkCalc.scala | 2 +-
.../flink/api/scala/batch/ExplainTest.scala | 4 +-
.../api/table/BatchTableEnvironmentTest.scala | 102 ++++++++++++++++++
.../api/table/StreamTableEnvironmentTest.scala | 106 +++++++++++++++++++
.../src/test/scala/resources/testJoin0.out | 2 +-
.../src/test/scala/resources/testJoin1.out | 2 +-
11 files changed, 277 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a7113394/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
index ad3ff7a..10c2450 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicInteger
import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rel.RelNode
import org.apache.calcite.sql2rel.RelDecorrelator
import org.apache.calcite.tools.Programs
@@ -228,21 +229,12 @@ abstract class BatchTableEnvironment(
}
/**
- * Translates a [[Table]] into a [[DataSet]].
+ * Generates the optimized [[RelNode]] tree from the original relational node tree.
*
- * The transformation involves optimizing the relational expression tree as defined by
- * Table API calls and / or SQL queries and generating corresponding [[DataSet]] operators.
- *
- * @param table The root node of the relational expression tree.
- * @param tpe The [[TypeInformation]] of the resulting [[DataSet]].
- * @tparam A The type of the resulting [[DataSet]].
- * @return The [[DataSet]] that corresponds to the translated [[Table]].
+ * @param relNode The original [[RelNode]] tree
+ * @return The optimized [[RelNode]] tree
*/
- protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataSet[A] = {
-
- validateType(tpe)
-
- val relNode = table.getRelNode
+ private[flink] def optimize(relNode: RelNode): RelNode = {
// decorrelate
val decorPlan = RelDecorrelator.decorrelateQuery(relNode)
@@ -253,8 +245,7 @@ abstract class BatchTableEnvironment(
val dataSetPlan = try {
optProgram.run(getPlanner, decorPlan, flinkOutputProps)
- }
- catch {
+ } catch {
case e: CannotPlanException =>
throw new TableException(
s"Cannot generate a valid execution plan for the given query: \n\n" +
@@ -263,13 +254,32 @@ abstract class BatchTableEnvironment(
s"Please check the documentation for the set of currently supported SQL features.")
case t: TableException =>
throw new TableException(
- s"Cannot generate a valid execution plan for the given query: \n\n" +
- s"${RelOptUtil.toString(relNode)}\n" +
- s"${t.msg}\n" +
- s"Please check the documentation for the set of currently supported SQL features.")
+ s"Cannot generate a valid execution plan for the given query: \n\n" +
+ s"${RelOptUtil.toString(relNode)}\n" +
+ s"${t.msg}\n" +
+ s"Please check the documentation for the set of currently supported SQL features.")
case a: AssertionError =>
throw a.getCause
}
+ dataSetPlan
+ }
+
+ /**
+ * Translates a [[Table]] into a [[DataSet]].
+ *
+ * The transformation involves optimizing the relational expression tree as defined by
+ * Table API calls and / or SQL queries and generating corresponding [[DataSet]] operators.
+ *
+ * @param table The root node of the relational expression tree.
+ * @param tpe The [[TypeInformation]] of the resulting [[DataSet]].
+ * @tparam A The type of the resulting [[DataSet]].
+ * @return The [[DataSet]] that corresponds to the translated [[Table]].
+ */
+ protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataSet[A] = {
+
+ validateType(tpe)
+
+ val dataSetPlan = optimize(table.getRelNode)
dataSetPlan match {
case node: DataSetRel =>
http://git-wip-us.apache.org/repos/asf/flink/blob/a7113394/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
index 3827f05..34ed4ce 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
@@ -21,8 +21,8 @@ package org.apache.flink.api.table
import org.apache.calcite.jdbc.CalciteSchema
import org.apache.calcite.plan.{Context, RelOptCluster, RelOptPlanner, RelOptSchema}
import org.apache.calcite.prepare.CalciteCatalogReader
-import org.apache.calcite.rex.RexBuilder
-import org.apache.calcite.schema.SchemaPlus
+import org.apache.calcite.rex.{RexExecutorImpl, RexBuilder}
+import org.apache.calcite.schema.{Schemas, SchemaPlus}
import org.apache.calcite.tools.Frameworks.PlannerAction
import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RelBuilder}
@@ -66,6 +66,7 @@ object FlinkRelBuilder {
}
})
val planner = clusters(0).getPlanner
+ planner.setExecutor(config.getExecutor)
val defaultRelOptSchema = relOptSchemas(0).asInstanceOf[CalciteCatalogReader]
// create Flink type factory
http://git-wip-us.apache.org/repos/asf/flink/blob/a7113394/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
index e3e5751..44d90ac 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
@@ -22,10 +22,12 @@ import java.util.concurrent.atomic.AtomicInteger
import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rel.RelNode
import org.apache.calcite.sql2rel.RelDecorrelator
import org.apache.calcite.tools.Programs
import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
import org.apache.flink.api.table.expressions.Expression
import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode}
import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamConvention, DataStreamRel}
@@ -228,22 +230,12 @@ abstract class StreamTableEnvironment(
}
/**
- * Translates a [[Table]] into a [[DataStream]].
- *
- * The transformation involves optimizing the relational expression tree as defined by
- * Table API calls and / or SQL queries and generating corresponding [[DataStream]] operators.
+ * Generates the optimized [[RelNode]] tree from the original relational node tree.
*
- * @param table The root node of the relational expression tree.
- * @param tpe The [[TypeInformation]] of the resulting [[DataStream]].
- * @tparam A The type of the resulting [[DataStream]].
- * @return The [[DataStream]] that corresponds to the translated [[Table]].
+ * @param relNode The root node of the relational expression tree.
+ * @return The optimized [[RelNode]] tree
*/
- protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataStream[A] = {
-
- validateType(tpe)
-
- val relNode = table.getRelNode
-
+ private[flink] def optimize(relNode: RelNode): RelNode = {
// decorrelate
val decorPlan = RelDecorrelator.decorrelateQuery(relNode)
@@ -262,6 +254,26 @@ abstract class StreamTableEnvironment(
s"This exception indicates that the query uses an unsupported SQL feature.\n" +
s"Please check the documentation for the set of currently supported SQL features.")
}
+ dataStreamPlan
+ }
+
+
+ /**
+ * Translates a [[Table]] into a [[DataStream]].
+ *
+ * The transformation involves optimizing the relational expression tree as defined by
+ * Table API calls and / or SQL queries and generating corresponding [[DataStream]] operators.
+ *
+ * @param table The root node of the relational expression tree.
+ * @param tpe The [[TypeInformation]] of the resulting [[DataStream]].
+ * @tparam A The type of the resulting [[DataStream]].
+ * @return The [[DataStream]] that corresponds to the translated [[Table]].
+ */
+ protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataStream[A] = {
+
+ validateType(tpe)
+
+ val dataStreamPlan = optimize(table.getRelNode)
dataStreamPlan match {
case node: DataStreamRel =>
http://git-wip-us.apache.org/repos/asf/flink/blob/a7113394/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
index f56df0c..02204b1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
@@ -24,7 +24,8 @@ import java.util.concurrent.atomic.AtomicInteger
import org.apache.calcite.config.Lex
import org.apache.calcite.plan.RelOptPlanner
import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.schema.SchemaPlus
+import org.apache.calcite.rex.RexExecutorImpl
+import org.apache.calcite.schema.{Schemas, SchemaPlus}
import org.apache.calcite.schema.impl.AbstractTable
import org.apache.calcite.sql.SqlOperatorTable
import org.apache.calcite.sql.parser.SqlParser
@@ -77,6 +78,8 @@ abstract class TableEnvironment(val config: TableConfig) {
.costFactory(new DataSetCostFactory)
.typeSystem(new FlinkTypeSystem)
.operatorTable(sqlOperatorTable)
+ // set the executor to evaluate constant expressions
+ .executor(new RexExecutorImpl(Schemas.createDataContext(null)))
.build
// the builder for Calcite RelNodes, Calcite's representation of a relational expression tree.
http://git-wip-us.apache.org/repos/asf/flink/blob/a7113394/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
index ccdab85..066e9d6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
@@ -87,7 +87,8 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend
// Calcite's RelBuilder does not translate identity projects even if they rename fields.
// Add a projection ourselves (will be automatically removed by translation rules).
val project = LogicalProject.create(relBuilder.peek(),
- projectList.map(_.toRexNode(relBuilder)).asJava,
+ // avoid AS call
+ projectList.map(_.asInstanceOf[Alias].child.toRexNode(relBuilder)).asJava,
projectList.map(_.name).asJava)
relBuilder.build() // pop previous relNode
relBuilder.push(project)
http://git-wip-us.apache.org/repos/asf/flink/blob/a7113394/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCalc.scala
index aa5492f..d5f8010 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCalc.scala
@@ -134,7 +134,7 @@ trait FlinkCalc {
val proj = calcProgram.getProjectList.asScala.toList
val inFields = calcProgram.getInputRowType.getFieldNames.asScala.toList
val localExprs = calcProgram.getExprList.asScala.toList
- val outFields = calcProgram.getInputRowType.getFieldNames.asScala.toList
+ val outFields = calcProgram.getOutputRowType.getFieldNames.asScala.toList
proj
.map(expression(_, inFields, Some(localExprs)))
http://git-wip-us.apache.org/repos/asf/flink/blob/a7113394/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ExplainTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ExplainTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ExplainTest.scala
index ab70ec5..9d00dda 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ExplainTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ExplainTest.scala
@@ -72,7 +72,7 @@ class ExplainTest
val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
val source = scala.io.Source.fromFile(testFilePath +
"../../src/test/scala/resources/testJoin0.out").mkString.replaceAll("\\r\\n", "\n")
- assertEquals(result, source)
+ assertEquals(source, result)
}
@Test
@@ -87,7 +87,7 @@ class ExplainTest
val result = tEnv.explain(table, true).replaceAll("\\r\\n", "\n")
val source = scala.io.Source.fromFile(testFilePath +
"../../src/test/scala/resources/testJoin1.out").mkString.replaceAll("\\r\\n", "\n")
- assertEquals(result, source)
+ assertEquals(source, result)
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/a7113394/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/BatchTableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/BatchTableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/BatchTableEnvironmentTest.scala
new file mode 100644
index 0000000..0344dee
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/BatchTableEnvironmentTest.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.junit.Assert._
+import org.junit.Test
+
+
+class BatchTableEnvironmentTest {
+
+ @Test
+ def testReduceExpressionForSQL(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val sqlQuery = "SELECT " +
+ "(3+4)+a, " +
+ "b+(1+2), " +
+ "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
+ "TRIM(BOTH ' STRING '), " +
+ "'test' || 'string', " +
+ "NULLIF(1, 1), " +
+ "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
+ "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)), " +
+ "1 IS NULL, " +
+ "'TEST' LIKE '%EST', " +
+ "FLOOR(2.5), " +
+ "'TEST' IN ('west', 'TEST', 'rest') " +
+ "FROM MyTable WHERE a>(1+7)"
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", ds)
+
+ val table = tEnv.sql(sqlQuery)
+
+ val optimized = tEnv.optimize(table.getRelNode)
+ val optimizedString = optimized.toString
+ assertTrue(optimizedString.contains(">(_1, 8)"))
+ assertTrue(optimizedString.contains("+(7, _1) AS EXPR$0"))
+ assertTrue(optimizedString.contains("+(_2, 3) AS EXPR$1"))
+ assertTrue(optimizedString.contains("'b' AS EXPR$2"))
+ assertTrue(optimizedString.contains("'STRING' AS EXPR$3"))
+ assertTrue(optimizedString.contains("'teststring' AS EXPR$4"))
+ assertTrue(optimizedString.contains("null AS EXPR$5"))
+ assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS EXPR$6"))
+ assertTrue(optimizedString.contains("19 AS EXPR$7"))
+ assertTrue(optimizedString.contains("false AS EXPR$8"))
+ assertTrue(optimizedString.contains("true AS EXPR$9"))
+ assertTrue(optimizedString.contains("2 AS EXPR$10"))
+ assertTrue(optimizedString.contains("true AS EXPR$11"))
+ }
+
+ @Test
+ def testReduceExpressionForTableAPI(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+
+ val table = ds
+ .where('a > (1+7))
+ .select((3+4).toExpr + 6,
+ (11 === 1) ? ("a", "b"),
+ " STRING ".trim,
+ "test" + "string",
+ "1990-10-14 23:00:00.123".toTimestamp + 10.day + 1.second,
+ 1.isNull,
+ "TEST".like("%EST"),
+ 2.5.toExpr.floor())
+
+
+ val optimized = tEnv.optimize(table.getRelNode)
+ val optimizedString = optimized.toString
+ assertTrue(optimizedString.contains(">(_1, 8)"))
+ assertTrue(optimizedString.contains("13 AS _c0"))
+ assertTrue(optimizedString.contains("'b' AS _c1"))
+ assertTrue(optimizedString.contains("'STRING' AS _c2"))
+ assertTrue(optimizedString.contains("'teststring' AS _c3"))
+ assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS _c4"))
+ assertTrue(optimizedString.contains("false AS _c5"))
+ assertTrue(optimizedString.contains("true AS _c6"))
+ assertTrue(optimizedString.contains("2E0 AS _c7"))
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7113394/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/StreamTableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/StreamTableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/StreamTableEnvironmentTest.scala
new file mode 100644
index 0000000..52bf9ac
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/StreamTableEnvironmentTest.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+import org.apache.flink.api.scala.stream.utils.StreamTestData
+import org.apache.flink.api.scala.table._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.junit.Test
+import org.junit.Assert._
+
+
+
+class StreamTableEnvironmentTest extends StreamingMultipleProgramsTestBase{
+
+ @Test
+ def testReduceExpression(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val sqlQuery = "SELECT STREAM " +
+ "(3+4)+a, " +
+ "b+(1+2), " +
+ "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
+ "TRIM(BOTH ' STRING '), " +
+ "'test' || 'string', " +
+ "NULLIF(1, 1), " +
+ "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
+ "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)), " +
+ "1 IS NULL, " +
+ "'TEST' LIKE '%EST', " +
+ "FLOOR(2.5), " +
+ "'TEST' IN ('west', 'TEST', 'rest') " +
+ "FROM MyTable WHERE a>(1+7)"
+
+ val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", t)
+
+ val table = tEnv.sql(sqlQuery)
+
+ val optimized = tEnv.optimize(table.getRelNode)
+ val optimizedString = optimized.toString
+ assertTrue(optimizedString.contains(">(_1, 8)"))
+ assertTrue(optimizedString.contains("+(7, _1) AS EXPR$0"))
+ assertTrue(optimizedString.contains("+(_2, 3) AS EXPR$1"))
+ assertTrue(optimizedString.contains("'b' AS EXPR$2"))
+ assertTrue(optimizedString.contains("'STRING' AS EXPR$3"))
+ assertTrue(optimizedString.contains("'teststring' AS EXPR$4"))
+ assertTrue(optimizedString.contains("null AS EXPR$5"))
+ assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS EXPR$6"))
+ assertTrue(optimizedString.contains("19 AS EXPR$7"))
+ assertTrue(optimizedString.contains("false AS EXPR$8"))
+ assertTrue(optimizedString.contains("true AS EXPR$9"))
+ assertTrue(optimizedString.contains("2 AS EXPR$10"))
+ assertTrue(optimizedString.contains("true AS EXPR$11"))
+ }
+
+ @Test
+ def testReduceExpressionForTableAPI(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+
+ val table = t
+ .where('a > (1+7))
+ .select((3+4).toExpr + 6,
+ (11 === 1) ? ("a", "b"),
+ " STRING ".trim,
+ "test" + "string",
+ "1990-10-14 23:00:00.123".toTimestamp + 10.day + 1.second,
+ 1.isNull,
+ "TEST".like("%EST"),
+ 2.5.toExpr.floor())
+
+
+ val optimized = tEnv.optimize(table.getRelNode)
+ val optimizedString = optimized.toString
+ assertTrue(optimizedString.contains(">(_1, 8)"))
+ assertTrue(optimizedString.contains("13 AS _c0"))
+ assertTrue(optimizedString.contains("'b' AS _c1"))
+ assertTrue(optimizedString.contains("'STRING' AS _c2"))
+ assertTrue(optimizedString.contains("'teststring' AS _c3"))
+ assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS _c4"))
+ assertTrue(optimizedString.contains("false AS _c5"))
+ assertTrue(optimizedString.contains("true AS _c6"))
+ assertTrue(optimizedString.contains("2E0 AS _c7"))
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7113394/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out b/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out
index f71ea9f..11961ef 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out
@@ -36,7 +36,7 @@ Stage 6 : Data Source
Partitioning : RANDOM_PARTITIONED
Stage 1 : FlatMap
- content : select: (a, c AS b)
+ content : select: (a, c)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : FlatMap
http://git-wip-us.apache.org/repos/asf/flink/blob/a7113394/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out b/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out
index f117cd9..c6e8b34 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out
@@ -121,7 +121,7 @@ Stage 6 : Data Source
Filter Factor : (none)
Stage 1 : FlatMap
- content : select: (a, c AS b)
+ content : select: (a, c)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : FlatMap
[2/2] flink git commit: [FLINK-4068] [table] Reduce expression also
for filter/project
Posted by tw...@apache.org.
[FLINK-4068] [table] Reduce expression also for filter/project
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f00e1e7c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f00e1e7c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f00e1e7c
Branch: refs/heads/master
Commit: f00e1e7c5578caf52eaffc5cbdd102589c13f52d
Parents: a711339
Author: twalthr <tw...@apache.org>
Authored: Fri Sep 30 17:30:44 2016 +0200
Committer: twalthr <tw...@apache.org>
Committed: Tue Oct 4 14:30:48 2016 +0200
----------------------------------------------------------------------
.../flink/api/table/FlinkTypeFactory.scala | 12 +-
.../flink/api/table/FlinkTypeSystem.scala | 9 +
.../api/table/plan/rules/FlinkRuleSets.scala | 7 +
.../api/table/BatchTableEnvironmentTest.scala | 102 -----
.../api/table/ExpressionReductionTest.scala | 400 +++++++++++++++++++
.../api/table/StreamTableEnvironmentTest.scala | 106 -----
6 files changed, 427 insertions(+), 209 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f00e1e7c/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
index 581ecde..77eb907 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
@@ -22,7 +22,7 @@ import org.apache.calcite.avatica.util.TimeUnit
import org.apache.calcite.jdbc.JavaTypeFactoryImpl
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem}
import org.apache.calcite.sql.SqlIntervalQualifier
-import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.sql.`type`.{BasicSqlType, SqlTypeName, SqlTypeUtil}
import org.apache.calcite.sql.`type`.SqlTypeName._
import org.apache.calcite.sql.parser.SqlParserPos
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
@@ -68,6 +68,16 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
}
}
+ override def createSqlType(typeName: SqlTypeName, precision: Int): RelDataType = {
+ // it might happen that inferred VARCHAR types overflow as we set them to Int.MaxValue
+ // always set those to default value
+ if (typeName == VARCHAR && precision < 0) {
+ createSqlType(typeName, getTypeSystem.getDefaultPrecision(typeName))
+ } else {
+ super.createSqlType(typeName, precision)
+ }
+ }
+
private def createAdvancedType(typeInfo: TypeInformation[_]): RelDataType = typeInfo match {
// TODO add specific RelDataTypes
// for PrimitiveArrayTypeInfo, ObjectArrayTypeInfo, CompositeType
http://git-wip-us.apache.org/repos/asf/flink/blob/f00e1e7c/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeSystem.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeSystem.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeSystem.scala
index df6022a..2df043f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeSystem.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeSystem.scala
@@ -19,6 +19,7 @@
package org.apache.flink.api.table
import org.apache.calcite.rel.`type`.RelDataTypeSystemImpl
+import org.apache.calcite.sql.`type`.SqlTypeName
/**
* Custom type system for Flink.
@@ -33,4 +34,12 @@ class FlinkTypeSystem extends RelDataTypeSystemImpl {
// half should be enough for all use cases
override def getMaxNumericPrecision: Int = Int.MaxValue / 2
+ override def getDefaultPrecision(typeName: SqlTypeName): Int = typeName match {
+ // by default all VARCHARs can have the Java default length
+ case SqlTypeName.VARCHAR =>
+ Int.MaxValue
+ case _ =>
+ super.getDefaultPrecision(typeName)
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f00e1e7c/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
index ddfa578..7d915e6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
@@ -75,6 +75,8 @@ object FlinkRuleSets {
SortRemoveRule.INSTANCE,
// simplify expressions rules
+ ReduceExpressionsRule.FILTER_INSTANCE,
+ ReduceExpressionsRule.PROJECT_INSTANCE,
ReduceExpressionsRule.CALC_INSTANCE,
ReduceExpressionsRule.JOIN_INSTANCE,
@@ -113,6 +115,9 @@ object FlinkRuleSets {
val DATASTREAM_OPT_RULES: RuleSet = RuleSets.ofList(
RemoveDeltaRule.INSTANCE,
+
+ // convert a logical table scan to a relational expression
+ TableScanRule.INSTANCE,
EnumerableToLogicalTableScan.INSTANCE,
// calc rules
@@ -133,6 +138,8 @@ object FlinkRuleSets {
ProjectRemoveRule.INSTANCE,
// simplify expressions rules
+ ReduceExpressionsRule.FILTER_INSTANCE,
+ ReduceExpressionsRule.PROJECT_INSTANCE,
ReduceExpressionsRule.CALC_INSTANCE,
// merge and push unions rules
http://git-wip-us.apache.org/repos/asf/flink/blob/f00e1e7c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/BatchTableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/BatchTableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/BatchTableEnvironmentTest.scala
deleted file mode 100644
index 0344dee..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/BatchTableEnvironmentTest.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table
-
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.junit.Assert._
-import org.junit.Test
-
-
-class BatchTableEnvironmentTest {
-
- @Test
- def testReduceExpressionForSQL(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val sqlQuery = "SELECT " +
- "(3+4)+a, " +
- "b+(1+2), " +
- "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
- "TRIM(BOTH ' STRING '), " +
- "'test' || 'string', " +
- "NULLIF(1, 1), " +
- "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
- "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)), " +
- "1 IS NULL, " +
- "'TEST' LIKE '%EST', " +
- "FLOOR(2.5), " +
- "'TEST' IN ('west', 'TEST', 'rest') " +
- "FROM MyTable WHERE a>(1+7)"
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
- tEnv.registerTable("MyTable", ds)
-
- val table = tEnv.sql(sqlQuery)
-
- val optimized = tEnv.optimize(table.getRelNode)
- val optimizedString = optimized.toString
- assertTrue(optimizedString.contains(">(_1, 8)"))
- assertTrue(optimizedString.contains("+(7, _1) AS EXPR$0"))
- assertTrue(optimizedString.contains("+(_2, 3) AS EXPR$1"))
- assertTrue(optimizedString.contains("'b' AS EXPR$2"))
- assertTrue(optimizedString.contains("'STRING' AS EXPR$3"))
- assertTrue(optimizedString.contains("'teststring' AS EXPR$4"))
- assertTrue(optimizedString.contains("null AS EXPR$5"))
- assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS EXPR$6"))
- assertTrue(optimizedString.contains("19 AS EXPR$7"))
- assertTrue(optimizedString.contains("false AS EXPR$8"))
- assertTrue(optimizedString.contains("true AS EXPR$9"))
- assertTrue(optimizedString.contains("2 AS EXPR$10"))
- assertTrue(optimizedString.contains("true AS EXPR$11"))
- }
-
- @Test
- def testReduceExpressionForTableAPI(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
-
- val table = ds
- .where('a > (1+7))
- .select((3+4).toExpr + 6,
- (11 === 1) ? ("a", "b"),
- " STRING ".trim,
- "test" + "string",
- "1990-10-14 23:00:00.123".toTimestamp + 10.day + 1.second,
- 1.isNull,
- "TEST".like("%EST"),
- 2.5.toExpr.floor())
-
-
- val optimized = tEnv.optimize(table.getRelNode)
- val optimizedString = optimized.toString
- assertTrue(optimizedString.contains(">(_1, 8)"))
- assertTrue(optimizedString.contains("13 AS _c0"))
- assertTrue(optimizedString.contains("'b' AS _c1"))
- assertTrue(optimizedString.contains("'STRING' AS _c2"))
- assertTrue(optimizedString.contains("'teststring' AS _c3"))
- assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS _c4"))
- assertTrue(optimizedString.contains("false AS _c5"))
- assertTrue(optimizedString.contains("true AS _c6"))
- assertTrue(optimizedString.contains("2E0 AS _c7"))
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f00e1e7c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala
new file mode 100644
index 0000000..4830b75
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+import org.apache.flink.api.java.{DataSet => JDataSet}
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, createTypeInformation}
+import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream}
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.junit.Assert._
+import org.junit.Test
+import org.mockito.Mockito.{mock, when}
+
+class ExpressionReductionTest {
+
+ private def mockBatchTableEnvironment(): BatchTableEnvironment = {
+ val env = mock(classOf[ExecutionEnvironment])
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val ds = mock(classOf[DataSet[(Int, Long, String)]])
+ val jDs = mock(classOf[JDataSet[(Int, Long, String)]])
+ when(ds.javaSet).thenReturn(jDs)
+ when(jDs.getType).thenReturn(createTypeInformation[(Int, Long, String)])
+
+ val t = ds.toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", t)
+ tEnv
+ }
+
+ private def mockStreamTableEnvironment(): StreamTableEnvironment = {
+ val env = mock(classOf[StreamExecutionEnvironment])
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val ds = mock(classOf[DataStream[(Int, Long, String)]])
+ val jDs = mock(classOf[JDataStream[(Int, Long, String)]])
+ when(ds.javaStream).thenReturn(jDs)
+ when(jDs.getType).thenReturn(createTypeInformation[(Int, Long, String)])
+
+ val t = ds.toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", t)
+ tEnv
+ }
+
+ @Test
+ def testReduceCalcExpressionForBatchSQL(): Unit = {
+ val tEnv = mockBatchTableEnvironment()
+
+ val sqlQuery = "SELECT " +
+ "(3+4)+a, " +
+ "b+(1+2), " +
+ "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
+ "TRIM(BOTH ' STRING '), " +
+ "'test' || 'string', " +
+ "NULLIF(1, 1), " +
+ "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
+ "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)), " +
+ "1 IS NULL, " +
+ "'TEST' LIKE '%EST', " +
+ "FLOOR(2.5), " +
+ "'TEST' IN ('west', 'TEST', 'rest'), " +
+ "CAST(TRUE AS VARCHAR) || 'X'" +
+ "FROM MyTable WHERE a>(1+7)"
+
+ val table = tEnv.sql(sqlQuery)
+
+ val optimized = tEnv.optimize(table.getRelNode)
+ val optimizedString = optimized.toString
+ assertTrue(optimizedString.contains(">(_1, 8)"))
+ assertTrue(optimizedString.contains("+(7, _1) AS EXPR$0"))
+ assertTrue(optimizedString.contains("+(_2, 3) AS EXPR$1"))
+ assertTrue(optimizedString.contains("'b' AS EXPR$2"))
+ assertTrue(optimizedString.contains("'STRING' AS EXPR$3"))
+ assertTrue(optimizedString.contains("'teststring' AS EXPR$4"))
+ assertTrue(optimizedString.contains("null AS EXPR$5"))
+ assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS EXPR$6"))
+ assertTrue(optimizedString.contains("19 AS EXPR$7"))
+ assertTrue(optimizedString.contains("false AS EXPR$8"))
+ assertTrue(optimizedString.contains("true AS EXPR$9"))
+ assertTrue(optimizedString.contains("2 AS EXPR$10"))
+ assertTrue(optimizedString.contains("true AS EXPR$11"))
+ assertTrue(optimizedString.contains("'TRUEX' AS EXPR$12"))
+ }
+
+ @Test
+ def testReduceProjectExpressionForBatchSQL(): Unit = {
+ val tEnv = mockBatchTableEnvironment()
+
+ val sqlQuery = "SELECT " +
+ "(3+4)+a, " +
+ "b+(1+2), " +
+ "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
+ "TRIM(BOTH ' STRING '), " +
+ "'test' || 'string', " +
+ "NULLIF(1, 1), " +
+ "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
+ "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)), " +
+ "1 IS NULL, " +
+ "'TEST' LIKE '%EST', " +
+ "FLOOR(2.5), " +
+ "'TEST' IN ('west', 'TEST', 'rest'), " +
+ "CAST(TRUE AS VARCHAR) || 'X'" +
+ "FROM MyTable"
+
+ val table = tEnv.sql(sqlQuery)
+
+ val optimized = tEnv.optimize(table.getRelNode)
+ val optimizedString = optimized.toString
+ assertTrue(optimizedString.contains("+(7, _1) AS EXPR$0"))
+ assertTrue(optimizedString.contains("+(_2, 3) AS EXPR$1"))
+ assertTrue(optimizedString.contains("'b' AS EXPR$2"))
+ assertTrue(optimizedString.contains("'STRING' AS EXPR$3"))
+ assertTrue(optimizedString.contains("'teststring' AS EXPR$4"))
+ assertTrue(optimizedString.contains("null AS EXPR$5"))
+ assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS EXPR$6"))
+ assertTrue(optimizedString.contains("19 AS EXPR$7"))
+ assertTrue(optimizedString.contains("false AS EXPR$8"))
+ assertTrue(optimizedString.contains("true AS EXPR$9"))
+ assertTrue(optimizedString.contains("2 AS EXPR$10"))
+ assertTrue(optimizedString.contains("true AS EXPR$11"))
+ assertTrue(optimizedString.contains("'TRUEX' AS EXPR$12"))
+ }
+
+ @Test
+ def testReduceFilterExpressionForBatchSQL(): Unit = {
+ val tEnv = mockBatchTableEnvironment()
+
+ val sqlQuery = "SELECT " +
+ "*" +
+ "FROM MyTable WHERE a>(1+7)"
+
+ val table = tEnv.sql(sqlQuery)
+
+ val optimized = tEnv.optimize(table.getRelNode)
+ val optimizedString = optimized.toString
+ assertTrue(optimizedString.contains(">(_1, 8)"))
+ }
+
+ @Test
+ def testReduceCalcExpressionForBatchTableAPI(): Unit = {
+ val tEnv = mockBatchTableEnvironment()
+
+ val table = tEnv
+ .scan("MyTable")
+ .where('a > (1 + 7))
+ .select((3 + 4).toExpr + 6,
+ (11 === 1) ? ("a", "b"),
+ " STRING ".trim,
+ "test" + "string",
+ "1990-10-14 23:00:00.123".toTimestamp + 10.day + 1.second,
+ 1.isNull,
+ "TEST".like("%EST"),
+ 2.5.toExpr.floor(),
+ true.cast(Types.STRING) + "X")
+
+
+ val optimized = tEnv.optimize(table.getRelNode)
+ val optimizedString = optimized.toString
+ assertTrue(optimizedString.contains(">(_1, 8)"))
+ assertTrue(optimizedString.contains("13 AS _c0"))
+ assertTrue(optimizedString.contains("'b' AS _c1"))
+ assertTrue(optimizedString.contains("'STRING' AS _c2"))
+ assertTrue(optimizedString.contains("'teststring' AS _c3"))
+ assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS _c4"))
+ assertTrue(optimizedString.contains("false AS _c5"))
+ assertTrue(optimizedString.contains("true AS _c6"))
+ assertTrue(optimizedString.contains("2E0 AS _c7"))
+ assertTrue(optimizedString.contains("'TRUEX' AS _c8"))
+ }
+
+ @Test
+ def testReduceProjectExpressionForBatchTableAPI(): Unit = {
+ val tEnv = mockBatchTableEnvironment()
+
+ val table = tEnv
+ .scan("MyTable")
+ .select((3 + 4).toExpr + 6,
+ (11 === 1) ? ("a", "b"),
+ " STRING ".trim,
+ "test" + "string",
+ "1990-10-14 23:00:00.123".toTimestamp + 10.day + 1.second,
+ 1.isNull,
+ "TEST".like("%EST"),
+ 2.5.toExpr.floor(),
+ true.cast(Types.STRING) + "X")
+
+
+ val optimized = tEnv.optimize(table.getRelNode)
+ val optimizedString = optimized.toString
+ assertTrue(optimizedString.contains("13 AS _c0"))
+ assertTrue(optimizedString.contains("'b' AS _c1"))
+ assertTrue(optimizedString.contains("'STRING' AS _c2"))
+ assertTrue(optimizedString.contains("'teststring' AS _c3"))
+ assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS _c4"))
+ assertTrue(optimizedString.contains("false AS _c5"))
+ assertTrue(optimizedString.contains("true AS _c6"))
+ assertTrue(optimizedString.contains("2E0 AS _c7"))
+ assertTrue(optimizedString.contains("'TRUEX' AS _c8"))
+ }
+
+ @Test
+ def testReduceFilterExpressionForBatchTableAPI(): Unit = {
+ val tEnv = mockBatchTableEnvironment()
+
+ val table = tEnv
+ .scan("MyTable")
+ .where('a > (1 + 7))
+
+ val optimized = tEnv.optimize(table.getRelNode)
+ val optimizedString = optimized.toString
+ assertTrue(optimizedString.contains(">(_1, 8)"))
+ }
+
+ @Test
+ def testReduceCalcExpressionForStreamSQL(): Unit = {
+ val tEnv = mockStreamTableEnvironment()
+
+ val sqlQuery = "SELECT STREAM " +
+ "(3+4)+a, " +
+ "b+(1+2), " +
+ "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
+ "TRIM(BOTH ' STRING '), " +
+ "'test' || 'string', " +
+ "NULLIF(1, 1), " +
+ "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
+ "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)), " +
+ "1 IS NULL, " +
+ "'TEST' LIKE '%EST', " +
+ "FLOOR(2.5), " +
+ "'TEST' IN ('west', 'TEST', 'rest'), " +
+ "CAST(TRUE AS VARCHAR) || 'X'" +
+ "FROM MyTable WHERE a>(1+7)"
+
+ val table = tEnv.sql(sqlQuery)
+
+ val optimized = tEnv.optimize(table.getRelNode)
+ val optimizedString = optimized.toString
+ assertTrue(optimizedString.contains(">(_1, 8)"))
+ assertTrue(optimizedString.contains("+(7, _1) AS EXPR$0"))
+ assertTrue(optimizedString.contains("+(_2, 3) AS EXPR$1"))
+ assertTrue(optimizedString.contains("'b' AS EXPR$2"))
+ assertTrue(optimizedString.contains("'STRING' AS EXPR$3"))
+ assertTrue(optimizedString.contains("'teststring' AS EXPR$4"))
+ assertTrue(optimizedString.contains("null AS EXPR$5"))
+ assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS EXPR$6"))
+ assertTrue(optimizedString.contains("19 AS EXPR$7"))
+ assertTrue(optimizedString.contains("false AS EXPR$8"))
+ assertTrue(optimizedString.contains("true AS EXPR$9"))
+ assertTrue(optimizedString.contains("2 AS EXPR$10"))
+ assertTrue(optimizedString.contains("true AS EXPR$11"))
+ assertTrue(optimizedString.contains("'TRUEX' AS EXPR$12"))
+ }
+
+ @Test
+ def testReduceProjectExpressionForStreamSQL(): Unit = {
+ val tEnv = mockStreamTableEnvironment()
+
+ val sqlQuery = "SELECT STREAM " +
+ "(3+4)+a, " +
+ "b+(1+2), " +
+ "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
+ "TRIM(BOTH ' STRING '), " +
+ "'test' || 'string', " +
+ "NULLIF(1, 1), " +
+ "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
+ "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)), " +
+ "1 IS NULL, " +
+ "'TEST' LIKE '%EST', " +
+ "FLOOR(2.5), " +
+ "'TEST' IN ('west', 'TEST', 'rest'), " +
+ "CAST(TRUE AS VARCHAR) || 'X'" +
+ "FROM MyTable"
+
+ val table = tEnv.sql(sqlQuery)
+
+ val optimized = tEnv.optimize(table.getRelNode)
+ val optimizedString = optimized.toString
+ assertTrue(optimizedString.contains("+(7, a) AS EXPR$0"))
+ assertTrue(optimizedString.contains("+(b, 3) AS EXPR$1"))
+ assertTrue(optimizedString.contains("'b' AS EXPR$2"))
+ assertTrue(optimizedString.contains("'STRING' AS EXPR$3"))
+ assertTrue(optimizedString.contains("'teststring' AS EXPR$4"))
+ assertTrue(optimizedString.contains("null AS EXPR$5"))
+ assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS EXPR$6"))
+ assertTrue(optimizedString.contains("19 AS EXPR$7"))
+ assertTrue(optimizedString.contains("false AS EXPR$8"))
+ assertTrue(optimizedString.contains("true AS EXPR$9"))
+ assertTrue(optimizedString.contains("2 AS EXPR$10"))
+ assertTrue(optimizedString.contains("true AS EXPR$11"))
+ assertTrue(optimizedString.contains("'TRUEX' AS EXPR$12"))
+ }
+
+ @Test
+ def testReduceFilterExpressionForStreamSQL(): Unit = {
+ val tEnv = mockStreamTableEnvironment()
+
+ val sqlQuery = "SELECT STREAM " +
+ "*" +
+ "FROM MyTable WHERE a>(1+7)"
+
+ val table = tEnv.sql(sqlQuery)
+
+ val optimized = tEnv.optimize(table.getRelNode)
+ val optimizedString = optimized.toString
+ assertTrue(optimizedString.contains(">(_1, 8)"))
+ }
+
+ @Test
+ def testReduceCalcExpressionForStreamTableAPI(): Unit = {
+ val tEnv = mockStreamTableEnvironment()
+
+ val table = tEnv
+ .ingest("MyTable")
+ .where('a > (1 + 7))
+ .select((3 + 4).toExpr + 6,
+ (11 === 1) ? ("a", "b"),
+ " STRING ".trim,
+ "test" + "string",
+ "1990-10-14 23:00:00.123".toTimestamp + 10.day + 1.second,
+ 1.isNull,
+ "TEST".like("%EST"),
+ 2.5.toExpr.floor(),
+ true.cast(Types.STRING) + "X")
+
+
+ val optimized = tEnv.optimize(table.getRelNode)
+ val optimizedString = optimized.toString
+ assertTrue(optimizedString.contains(">(_1, 8)"))
+ assertTrue(optimizedString.contains("13 AS _c0"))
+ assertTrue(optimizedString.contains("'b' AS _c1"))
+ assertTrue(optimizedString.contains("'STRING' AS _c2"))
+ assertTrue(optimizedString.contains("'teststring' AS _c3"))
+ assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS _c4"))
+ assertTrue(optimizedString.contains("false AS _c5"))
+ assertTrue(optimizedString.contains("true AS _c6"))
+ assertTrue(optimizedString.contains("2E0 AS _c7"))
+ assertTrue(optimizedString.contains("'TRUEX' AS _c8"))
+ }
+
+ @Test
+ def testReduceProjectExpressionForStreamTableAPI(): Unit = {
+ val tEnv = mockStreamTableEnvironment()
+
+ val table = tEnv
+ .ingest("MyTable")
+ .where('a > (1 + 7))
+ .select((3 + 4).toExpr + 6,
+ (11 === 1) ? ("a", "b"),
+ " STRING ".trim,
+ "test" + "string",
+ "1990-10-14 23:00:00.123".toTimestamp + 10.day + 1.second,
+ 1.isNull,
+ "TEST".like("%EST"),
+ 2.5.toExpr.floor(),
+ true.cast(Types.STRING) + "X")
+
+
+ val optimized = tEnv.optimize(table.getRelNode)
+ val optimizedString = optimized.toString
+ assertTrue(optimizedString.contains(">(_1, 8)"))
+ assertTrue(optimizedString.contains("13 AS _c0"))
+ assertTrue(optimizedString.contains("'b' AS _c1"))
+ assertTrue(optimizedString.contains("'STRING' AS _c2"))
+ assertTrue(optimizedString.contains("'teststring' AS _c3"))
+ assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS _c4"))
+ assertTrue(optimizedString.contains("false AS _c5"))
+ assertTrue(optimizedString.contains("true AS _c6"))
+ assertTrue(optimizedString.contains("2E0 AS _c7"))
+ assertTrue(optimizedString.contains("'TRUEX' AS _c8"))
+ }
+
+ @Test
+ def testReduceFilterExpressionForStreamTableAPI(): Unit = {
+ val tEnv = mockStreamTableEnvironment()
+
+ val table = tEnv
+ .ingest("MyTable")
+ .where('a > (1 + 7))
+
+
+ val optimized = tEnv.optimize(table.getRelNode)
+ val optimizedString = optimized.toString
+ assertTrue(optimizedString.contains(">(_1, 8)"))
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f00e1e7c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/StreamTableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/StreamTableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/StreamTableEnvironmentTest.scala
deleted file mode 100644
index 52bf9ac..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/StreamTableEnvironmentTest.scala
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table
-
-import org.apache.flink.api.scala.stream.utils.StreamTestData
-import org.apache.flink.api.scala.table._
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.junit.Test
-import org.junit.Assert._
-
-
-
-class StreamTableEnvironmentTest extends StreamingMultipleProgramsTestBase{
-
- @Test
- def testReduceExpression(): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val sqlQuery = "SELECT STREAM " +
- "(3+4)+a, " +
- "b+(1+2), " +
- "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
- "TRIM(BOTH ' STRING '), " +
- "'test' || 'string', " +
- "NULLIF(1, 1), " +
- "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
- "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)), " +
- "1 IS NULL, " +
- "'TEST' LIKE '%EST', " +
- "FLOOR(2.5), " +
- "'TEST' IN ('west', 'TEST', 'rest') " +
- "FROM MyTable WHERE a>(1+7)"
-
- val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
- tEnv.registerTable("MyTable", t)
-
- val table = tEnv.sql(sqlQuery)
-
- val optimized = tEnv.optimize(table.getRelNode)
- val optimizedString = optimized.toString
- assertTrue(optimizedString.contains(">(_1, 8)"))
- assertTrue(optimizedString.contains("+(7, _1) AS EXPR$0"))
- assertTrue(optimizedString.contains("+(_2, 3) AS EXPR$1"))
- assertTrue(optimizedString.contains("'b' AS EXPR$2"))
- assertTrue(optimizedString.contains("'STRING' AS EXPR$3"))
- assertTrue(optimizedString.contains("'teststring' AS EXPR$4"))
- assertTrue(optimizedString.contains("null AS EXPR$5"))
- assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS EXPR$6"))
- assertTrue(optimizedString.contains("19 AS EXPR$7"))
- assertTrue(optimizedString.contains("false AS EXPR$8"))
- assertTrue(optimizedString.contains("true AS EXPR$9"))
- assertTrue(optimizedString.contains("2 AS EXPR$10"))
- assertTrue(optimizedString.contains("true AS EXPR$11"))
- }
-
- @Test
- def testReduceExpressionForTableAPI(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
-
- val table = t
- .where('a > (1+7))
- .select((3+4).toExpr + 6,
- (11 === 1) ? ("a", "b"),
- " STRING ".trim,
- "test" + "string",
- "1990-10-14 23:00:00.123".toTimestamp + 10.day + 1.second,
- 1.isNull,
- "TEST".like("%EST"),
- 2.5.toExpr.floor())
-
-
- val optimized = tEnv.optimize(table.getRelNode)
- val optimizedString = optimized.toString
- assertTrue(optimizedString.contains(">(_1, 8)"))
- assertTrue(optimizedString.contains("13 AS _c0"))
- assertTrue(optimizedString.contains("'b' AS _c1"))
- assertTrue(optimizedString.contains("'STRING' AS _c2"))
- assertTrue(optimizedString.contains("'teststring' AS _c3"))
- assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS _c4"))
- assertTrue(optimizedString.contains("false AS _c5"))
- assertTrue(optimizedString.contains("true AS _c6"))
- assertTrue(optimizedString.contains("2E0 AS _c7"))
- }
-
-}