You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2016/04/11 13:05:47 UTC
flink git commit: [FLINK-3640] Add support for SQL in DataSet programs
Repository: flink
Updated Branches:
refs/heads/master b368cb2d5 -> ed1e52a10
[FLINK-3640] Add support for SQL in DataSet programs
- add EnumerableToLogicalScan rule
- in order to be able to mix TableAPI and SQL, we need our own copy of PlannerImpl
- create a dummy RelNode in the reset() method, in order to retrieve the RelOptPlanner
This closes #1862
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ed1e52a1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed1e52a1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed1e52a1
Branch: refs/heads/master
Commit: ed1e52a106e9ce3789fab975e34715de2d923bec
Parents: b368cb2
Author: vasia <va...@apache.org>
Authored: Wed Mar 23 19:35:06 2016 +0100
Committer: vasia <va...@apache.org>
Committed: Mon Apr 11 12:27:35 2016 +0200
----------------------------------------------------------------------
.../api/java/table/JavaBatchTranslator.scala | 30 ++-
.../api/table/AbstractTableEnvironment.scala | 15 ++
.../api/table/FlinkCalciteSqlValidator.scala | 47 ++++
.../flink/api/table/FlinkPlannerImpl.scala | 180 +++++++++++++
.../flink/api/table/plan/PlanTranslator.scala | 2 -
.../api/table/plan/TranslationContext.scala | 24 +-
.../rules/EnumerableToLogicalTableScan.scala | 50 ++++
.../api/table/plan/rules/FlinkRuleSets.scala | 1 +
.../api/scala/sql/test/AggregationsITCase.scala | 224 ++++++++++++++++
.../flink/api/scala/sql/test/FilterITCase.scala | 166 ++++++++++++
.../flink/api/scala/sql/test/JoinITCase.scala | 259 +++++++++++++++++++
.../flink/api/scala/sql/test/SelectITCase.scala | 155 +++++++++++
.../api/scala/sql/test/TableWithSQLITCase.scala | 103 ++++++++
.../flink/api/scala/sql/test/UnionITCase.scala | 109 ++++++++
.../api/scala/table/test/FilterITCase.scala | 1 -
15 files changed, 1348 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
index 028711b..4688c82 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
@@ -19,14 +19,14 @@
package org.apache.flink.api.java.table
import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
-import org.apache.calcite.plan.{RelTraitSet, RelOptUtil}
-import org.apache.calcite.rel.{RelCollations, RelNode}
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rel.RelNode
import org.apache.calcite.sql2rel.RelDecorrelator
import org.apache.calcite.tools.Programs
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.{DataSet => JavaDataSet}
import org.apache.flink.api.table.plan._
-import org.apache.flink.api.table.{TableConfig, Table}
+import org.apache.flink.api.table.{FlinkPlannerImpl, TableConfig, Table}
import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetRel}
import org.apache.flink.api.table.plan.rules.FlinkRuleSets
import org.apache.flink.api.table.plan.schema.DataSetTable
@@ -56,7 +56,8 @@ class JavaBatchTranslator(config: TableConfig) extends PlanTranslator {
// create table scan operator
relBuilder.scan(tabName)
- new Table(relBuilder.build(), relBuilder)
+ val relNode = relBuilder.build()
+ new Table(relNode, relBuilder)
}
override def translate[A](lPlan: RelNode)(implicit tpe: TypeInformation[A]): JavaDataSet[A] = {
@@ -69,9 +70,7 @@ class JavaBatchTranslator(config: TableConfig) extends PlanTranslator {
// optimize the logical Flink plan
val optProgram = Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES)
- val flinkOutputProps = RelTraitSet.createEmpty()
- .plus(DataSetConvention.INSTANCE)
- .plus(RelCollations.of()).simplify()
+ val flinkOutputProps = lPlan.getTraitSet.replace(DataSetConvention.INSTANCE).simplify()
val dataSetPlan = try {
optProgram.run(planner, decorPlan, flinkOutputProps)
@@ -97,4 +96,21 @@ class JavaBatchTranslator(config: TableConfig) extends PlanTranslator {
}
+ /**
+   * Parse, validate, and translate a SQL query into a RelNode wrapped in a [[Table]]
+ */
+ def translateSQL(query: String): Table = {
+
+ val frameworkConfig = TranslationContext.getFrameworkConfig
+ val planner = new FlinkPlannerImpl(frameworkConfig, TranslationContext.getPlanner)
+ // parse the sql query
+ val parsed = planner.parse(query)
+ // validate the sql query
+ val validated = planner.validate(parsed)
+ // transform to a relational tree
+ val relational = planner.rel(validated)
+
+ new Table(relational.rel, TranslationContext.getRelBuilder)
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/AbstractTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/AbstractTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/AbstractTableEnvironment.scala
index 4dedc47..a0f162e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/AbstractTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/AbstractTableEnvironment.scala
@@ -19,6 +19,7 @@
package org.apache.flink.api.table
import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.table.JavaBatchTranslator
import org.apache.flink.api.table.expressions.Expression
import org.apache.flink.api.table.plan.TranslationContext
import org.apache.flink.api.table.plan.schema.{DataSetTable, TableTable}
@@ -83,4 +84,18 @@ class AbstractTableEnvironment {
)
TranslationContext.registerTable(dataSetTable, name)
}
+
+ /**
+ * Execute a SQL query and retrieve the result as a [[Table]].
+ * All input [[Table]]s have to be registered in the
+ * [[org.apache.flink.api.java.table.TableEnvironment]] with unique names,
+ * using [[registerTable()]] or
+ * [[org.apache.flink.api.java.table.TableEnvironment.registerDataSet()]]
+ *
+ * @param query the SQL query
+ * @return the result of the SQL query as a [[Table]]
+ */
+ def sql(query: String): Table = {
+ new JavaBatchTranslator(config).translateSQL(query)
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkCalciteSqlValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkCalciteSqlValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkCalciteSqlValidator.scala
new file mode 100644
index 0000000..b1ccc09
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkCalciteSqlValidator.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table
+
+import org.apache.calcite.adapter.java.JavaTypeFactory
+import org.apache.calcite.prepare.CalciteCatalogReader
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.sql.{SqlInsert, SqlOperatorTable}
+import org.apache.calcite.sql.validate.{SqlValidatorImpl, SqlConformance}
+
+/**
+ * This is a copy of Calcite's CalciteSqlValidator to use with [[FlinkPlannerImpl]].
+ */
+class FlinkCalciteSqlValidator(
+ opTab: SqlOperatorTable,
+ catalogReader: CalciteCatalogReader,
+ typeFactory: JavaTypeFactory) extends SqlValidatorImpl(
+ opTab, catalogReader, typeFactory, SqlConformance.DEFAULT) {
+
+ override def getLogicalSourceRowType(
+ sourceRowType: RelDataType,
+ insert: SqlInsert): RelDataType = {
+ typeFactory.asInstanceOf[JavaTypeFactory].toSql(sourceRowType)
+ }
+
+ override def getLogicalTargetRowType(
+ targetRowType: RelDataType,
+ insert: SqlInsert): RelDataType = {
+ typeFactory.asInstanceOf[JavaTypeFactory].toSql(targetRowType)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
new file mode 100644
index 0000000..5a1b3fe
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table
+
+import java.util
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.adapter.java.JavaTypeFactory
+import org.apache.calcite.jdbc.CalciteSchema
+import org.apache.calcite.plan.RelOptTable.ViewExpander
+import org.apache.calcite.plan._
+import org.apache.calcite.prepare.CalciteCatalogReader
+import org.apache.calcite.rel.RelRoot
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex.RexBuilder
+import org.apache.calcite.schema.SchemaPlus
+import org.apache.calcite.sql.parser.{SqlParser, SqlParseException}
+import org.apache.calcite.sql.validate.SqlValidator
+import org.apache.calcite.sql.{SqlNode, SqlOperatorTable}
+import org.apache.calcite.sql2rel.{RelDecorrelator, SqlToRelConverter, SqlRexConvertletTable}
+import org.apache.calcite.tools.{RelConversionException, ValidationException, Frameworks, FrameworkConfig}
+import org.apache.calcite.util.Util
+import scala.collection.JavaConversions._
+
+/** NOTE: this is heavily inspired by Calcite's PlannerImpl.
+  We need it in order to share the planner between the Table API relational plans
+  and the SQL relational plans that are created by the Calcite parser.
+ The only difference is that we initialize the RelOptPlanner planner
+ when instantiating, instead of creating a new one in the ready() method. **/
+class FlinkPlannerImpl(config: FrameworkConfig, var planner: RelOptPlanner) {
+
+ val operatorTable: SqlOperatorTable = config.getOperatorTable
+ /** Holds the trait definitions to be registered with planner. May be null. */
+ val traitDefs: ImmutableList[RelTraitDef[_ <: RelTrait]] = config.getTraitDefs
+ val parserConfig: SqlParser.Config = config.getParserConfig
+ val convertletTable: SqlRexConvertletTable = config.getConvertletTable
+ var defaultSchema: SchemaPlus = config.getDefaultSchema
+
+ var typeFactory: JavaTypeFactory = null
+ var validator: FlinkCalciteSqlValidator = null
+ var validatedSqlNode: SqlNode = null
+ var root: RelRoot = null
+
+ private def ready() {
+ Frameworks.withPlanner(new Frameworks.PlannerAction[Unit] {
+ def apply(
+ cluster: RelOptCluster,
+ relOptSchema: RelOptSchema,
+ rootSchema: SchemaPlus): Unit = {
+
+ Util.discard(rootSchema)
+ typeFactory = cluster.getTypeFactory.asInstanceOf[JavaTypeFactory]
+ if (planner == null) {
+ planner = cluster.getPlanner
+ }
+ }
+ }, config)
+ if (this.traitDefs != null) {
+ planner.clearRelTraitDefs()
+ for (traitDef <- this.traitDefs) {
+ planner.addRelTraitDef(traitDef)
+ }
+ }
+ }
+
+ @throws(classOf[SqlParseException])
+ def parse(sql: String): SqlNode = {
+ ready()
+ val parser: SqlParser = SqlParser.create(sql, parserConfig)
+ val sqlNode: SqlNode = parser.parseStmt
+ sqlNode
+ }
+
+ @throws(classOf[ValidationException])
+ def validate(sqlNode: SqlNode): SqlNode = {
+ validator = new FlinkCalciteSqlValidator(operatorTable, createCatalogReader, typeFactory)
+ validator.setIdentifierExpansion(true)
+ try {
+ validatedSqlNode = validator.validate(sqlNode)
+ }
+ catch {
+ case e: RuntimeException => {
+ throw new ValidationException(e)
+ }
+ }
+ validatedSqlNode
+ }
+
+ @throws(classOf[RelConversionException])
+ def rel(sql: SqlNode): RelRoot = {
+ assert(validatedSqlNode != null)
+ val rexBuilder: RexBuilder = createRexBuilder
+ val cluster: RelOptCluster = RelOptCluster.create(planner, rexBuilder)
+ val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter(
+ new ViewExpanderImpl, validator, createCatalogReader, cluster, convertletTable)
+ sqlToRelConverter.setTrimUnusedFields(false)
+ sqlToRelConverter.enableTableAccessConversion(false)
+ root = sqlToRelConverter.convertQuery(validatedSqlNode, false, true)
+ root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true))
+ root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel))
+ root
+ }
+
+ /** Implements [[org.apache.calcite.plan.RelOptTable.ViewExpander]]
+ * interface for [[org.apache.calcite.tools.Planner]]. */
+ class ViewExpanderImpl extends ViewExpander {
+
+ override def expandView(
+ rowType: RelDataType,
+ queryString: String,
+ schemaPath: util.List[String]): RelRoot = {
+
+ val parser: SqlParser = SqlParser.create(queryString, parserConfig)
+ var sqlNode: SqlNode = null
+ try {
+ sqlNode = parser.parseQuery
+ }
+ catch {
+ case e: SqlParseException =>
+ throw new RuntimeException("parse failed", e)
+ }
+ val catalogReader: CalciteCatalogReader = createCatalogReader.withSchemaPath(schemaPath)
+ val validator: SqlValidator =
+ new FlinkCalciteSqlValidator(operatorTable, catalogReader, typeFactory)
+ validator.setIdentifierExpansion(true)
+ val validatedSqlNode: SqlNode = validator.validate(sqlNode)
+ val rexBuilder: RexBuilder = createRexBuilder
+ val cluster: RelOptCluster = RelOptCluster.create(planner, rexBuilder)
+ val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter(
+ new ViewExpanderImpl, validator, catalogReader, cluster, convertletTable)
+ sqlToRelConverter.setTrimUnusedFields(false)
+ sqlToRelConverter.enableTableAccessConversion(false)
+ root = sqlToRelConverter.convertQuery(validatedSqlNode, true, false)
+ root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true))
+ root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel))
+ FlinkPlannerImpl.this.root
+ }
+ }
+
+ private def createCatalogReader: CalciteCatalogReader = {
+ val rootSchema: SchemaPlus = FlinkPlannerImpl.rootSchema(defaultSchema)
+ new CalciteCatalogReader(
+ CalciteSchema.from(rootSchema),
+ parserConfig.caseSensitive,
+ CalciteSchema.from(defaultSchema).path(null),
+ typeFactory)
+ }
+
+ private def createRexBuilder: RexBuilder = {
+ new RexBuilder(typeFactory)
+ }
+
+}
+
+object FlinkPlannerImpl {
+ private def rootSchema(schema: SchemaPlus): SchemaPlus = {
+ if (schema.getParentSchema == null) {
+ schema
+ }
+ else {
+ rootSchema(schema.getParentSchema)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
index 410c570..2a82dc3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
@@ -73,8 +73,6 @@ abstract class PlanTranslator {
*/
def createTable[A](repr: Representation[A], exprs: Array[Expression]): Table = {
- val inputType = repr.getType()
-
val (fieldNames, fieldIndexes) = TranslationContext.getFieldInfo(repr.getType(), exprs)
createTable(repr, fieldIndexes.toArray, fieldNames.toArray)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
index cd5e2b0..c7d9f18 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
@@ -20,19 +20,20 @@ package org.apache.flink.api.table.plan
import java.util.concurrent.atomic.AtomicInteger
import org.apache.calcite.config.Lex
-import org.apache.calcite.plan.ConventionTraitDef
+import org.apache.calcite.plan.RelOptPlanner
import org.apache.calcite.schema.impl.AbstractTable
import org.apache.calcite.schema.SchemaPlus
import org.apache.calcite.sql.parser.SqlParser
-import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RelBuilder}
+import org.apache.calcite.tools.{Programs, FrameworkConfig, Frameworks, RelBuilder}
import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.api.table.TableException
import org.apache.flink.api.table.expressions.{Naming, UnresolvedFieldReference, Expression}
import org.apache.flink.api.table.plan.cost.DataSetCostFactory
-import org.apache.flink.api.table.plan.schema.DataSetTable
import org.apache.flink.api.table.plan.schema.DataStreamTable
+import org.apache.flink.api.table.plan.rules.FlinkRuleSets
+import org.apache.flink.api.table.plan.schema.DataSetTable
object TranslationContext {
@@ -41,6 +42,7 @@ object TranslationContext {
private var tables: SchemaPlus = null
private var tablesRegistry: Map[String, AbstractTable] = null
private val nameCntr: AtomicInteger = new AtomicInteger(0)
+ private var relOptPlanner: RelOptPlanner = null
reset()
@@ -53,7 +55,10 @@ object TranslationContext {
// configure sql parser
// we use Java lex because back ticks are easier than double quotes in programming
// and cases are preserved
- val parserConfig = SqlParser.configBuilder().setLex(Lex.JAVA).build()
+ val parserConfig = SqlParser
+ .configBuilder()
+ .setLex(Lex.JAVA)
+ .build()
// initialize RelBuilder
frameworkConfig = Frameworks
@@ -61,13 +66,14 @@ object TranslationContext {
.defaultSchema(tables)
.parserConfig(parserConfig)
.costFactory(new DataSetCostFactory)
- .traitDefs(ConventionTraitDef.INSTANCE)
.build
tablesRegistry = Map[String, AbstractTable]()
relBuilder = RelBuilder.create(frameworkConfig)
+ // create a dummy RelNode, in order to retrieve the planner
+ val dummyRelNode = relBuilder.values(Array("dummy"), new Integer(1)).build()
+ relOptPlanner = dummyRelNode.getCluster.getPlanner
nameCntr.set(0)
-
}
/**
@@ -133,6 +139,10 @@ object TranslationContext {
relBuilder
}
+ def getPlanner: RelOptPlanner = {
+ relOptPlanner
+ }
+
def getFrameworkConfig: FrameworkConfig = {
frameworkConfig
}
@@ -207,5 +217,3 @@ object TranslationContext {
(fieldNames.toArray, fieldIndexes.toArray)
}
}
-
-
http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/EnumerableToLogicalTableScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/EnumerableToLogicalTableScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/EnumerableToLogicalTableScan.scala
new file mode 100644
index 0000000..02d2159
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/EnumerableToLogicalTableScan.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules
+
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan
+import org.apache.calcite.plan.{RelOptRuleCall, RelOptRule, RelOptRuleOperand}
+import org.apache.calcite.rel.logical.LogicalTableScan
+
+/**
+ * Rule that converts an EnumerableTableScan into a LogicalTableScan.
+ * We need this rule because Calcite creates an EnumerableTableScan
+ * when parsing a SQL query. We convert it into a LogicalTableScan
+ * so we can merge the optimization process with any plan that might be created
+ * by the Table API.
+ */
+class EnumerableToLogicalTableScan(
+ operand: RelOptRuleOperand,
+ description: String) extends RelOptRule(operand, description) {
+
+ override def onMatch(call: RelOptRuleCall): Unit = {
+ val oldRel = call.rel(0).asInstanceOf[EnumerableTableScan]
+ val table = oldRel.getTable
+ val newRel = LogicalTableScan.create(oldRel.getCluster, table)
+ call.transformTo(newRel)
+ }
+}
+
+object EnumerableToLogicalTableScan {
+ val INSTANCE = new EnumerableToLogicalTableScan(
+ operand(classOf[EnumerableTableScan], any),
+ "EnumerableToLogicalTableScan")
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
index 0324a0e..427530b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
@@ -34,6 +34,7 @@ object FlinkRuleSets {
// convert a logical table scan to a relational expression
TableScanRule.INSTANCE,
+ EnumerableToLogicalTableScan.INSTANCE,
// push a filter into a join
FilterJoinRule.FILTER_ON_JOIN,
http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/AggregationsITCase.scala
new file mode 100644
index 0000000..d577564
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/AggregationsITCase.scala
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.sql.test
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.plan.TranslationContext
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class AggregationsITCase(
+ mode: TestExecutionMode,
+ configMode: TableConfigMode)
+ extends TableProgramsTestBase(mode, configMode) {
+
+ @Test
+ def testAggregationTypes(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = getScalaTableEnvironment
+ TranslationContext.reset()
+
+ val sqlQuery = "SELECT sum(_1), min(_1), max(_1), count(_1), avg(_1) FROM MyTable"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ tEnv.registerDataSet("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "231,1,21,21,11"
+ val results = result.toDataSet[Row](getConfig).collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testTableAggregation(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = getScalaTableEnvironment
+ TranslationContext.reset()
+
+ val sqlQuery = "SELECT sum(_1) FROM MyTable"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "231"
+ val results = result.toDataSet[Row](getConfig).collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testDataSetAggregation(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = getScalaTableEnvironment
+ TranslationContext.reset()
+
+ val sqlQuery = "SELECT sum(_1) FROM MyTable"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ tEnv.registerDataSet("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "231"
+ val results = result.toDataSet[Row](getConfig).collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testWorkingAggregationDataTypes(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = getScalaTableEnvironment
+ TranslationContext.reset()
+
+ val sqlQuery = "SELECT avg(_1), avg(_2), avg(_3), avg(_4), avg(_5), avg(_6), count(_7)" +
+ "FROM MyTable"
+
+ val ds = env.fromElements(
+ (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
+ (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao"))
+ tEnv.registerDataSet("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "1,1,1,1,1.5,1.5,2"
+ val results = result.toDataSet[Row](getConfig).collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testTableWorkingAggregationDataTypes(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = getScalaTableEnvironment
+ TranslationContext.reset()
+
+ val sqlQuery = "SELECT avg(a), avg(b), avg(c), avg(d), avg(e), avg(f), count(g)" +
+ "FROM MyTable"
+
+ val ds = env.fromElements(
+ (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
+ (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).as('a, 'b, 'c, 'd, 'e, 'f, 'g)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "1,1,1,1,1.5,1.5,2"
+ val results = result.toDataSet[Row](getConfig).collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testTableProjection(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = getScalaTableEnvironment
+ TranslationContext.reset()
+
+ val sqlQuery = "SELECT avg(a), sum(a), count(a), avg(b), sum(b) " +
+ "FROM MyTable"
+
+ val ds = env.fromElements((1: Byte, 1: Short), (2: Byte, 2: Short)).as('a, 'b)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "1,3,2,1,3"
+ val results = result.toDataSet[Row](getConfig).collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testTableAggregationWithArithmetic(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = getScalaTableEnvironment
+ TranslationContext.reset()
+
+ val sqlQuery = "SELECT avg(a + 2) + 2, count(b) + 5 " +
+ "FROM MyTable"
+
+ val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).as('a, 'b)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "5.5,7"
+ val results = result.toDataSet[Row](getConfig).collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testAggregationWithTwoCount(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = getScalaTableEnvironment
+ TranslationContext.reset()
+
+ val sqlQuery = "SELECT count(_1), count(_2) FROM MyTable"
+
+ val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "2,2"
+ val results = result.toDataSet[Row](getConfig).collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+
+ @Test
+ def testAggregationAfterProjection(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = getScalaTableEnvironment
+ TranslationContext.reset()
+
+ val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM " +
+ "(SELECT _1 as a, _2 as b, _3 as c FROM MyTable)"
+
+ val ds = env.fromElements(
+ (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
+ (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "1,3,2"
+ val results = result.toDataSet[Row](getConfig).collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/FilterITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/FilterITCase.scala
new file mode 100644
index 0000000..c89e25a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/FilterITCase.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.sql.test
+
+import org.apache.calcite.tools.ValidationException
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.plan.TranslationContext
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+/**
+ * Integration tests for SQL WHERE-clause translation on DataSet-backed tables.
+ * Each test registers a table, runs a SQL query via tEnv.sql, and compares
+ * the collected result rows as text.
+ */
+@RunWith(classOf[Parameterized])
+class FilterITCase(
+ mode: TestExecutionMode,
+ configMode: TableConfigMode)
+ extends TableProgramsTestBase(mode, configMode) {
+
+ // WHERE false: no row passes; expected text is a single newline (empty result).
+ @Test
+ def testAllRejectingFilter(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = getScalaTableEnvironment
+ // Reset the static TranslationContext so registrations from earlier tests do not leak in.
+ TranslationContext.reset()
+
+ val sqlQuery = "SELECT * FROM MyTable WHERE false"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "\n"
+ val results = result.toDataSet[Row](getConfig).collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ // WHERE true: every row of the 21-element 3-tuple data set passes through.
+ @Test
+ def testAllPassingFilter(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = getScalaTableEnvironment
+ TranslationContext.reset()
+
+ val sqlQuery = "SELECT * FROM MyTable WHERE true"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+ "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
+ "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+ "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+ "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+ "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+ val results = result.toDataSet[Row](getConfig).collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ // LIKE predicate on a String column.
+ @Test
+ def testFilterOnString(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = getScalaTableEnvironment
+ TranslationContext.reset()
+
+ val sqlQuery = "SELECT * FROM MyTable WHERE c LIKE '%world%'"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"
+ val results = result.toDataSet[Row](getConfig).collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ // Arithmetic predicate (MOD) on an integer column: keeps even values of a.
+ @Test
+ def testFilterOnInteger(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = getScalaTableEnvironment
+ TranslationContext.reset()
+
+ val sqlQuery = "SELECT * FROM MyTable WHERE MOD(a,2)=0"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" +
+ "6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" +
+ "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
+ "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"
+ val results = result.toDataSet[Row](getConfig).collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ // OR-combined range predicates.
+ @Test
+ def testDisjunctivePredicate(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = getScalaTableEnvironment
+ TranslationContext.reset()
+
+ val sqlQuery = "SELECT * FROM MyTable WHERE a < 2 OR a > 20"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "1,1,Hi\n" + "21,6,Comment#15\n"
+ val results = result.toDataSet[Row](getConfig).collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ // AND-combined predicates on two different columns.
+ @Test
+ def testFilterWithAnd(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = getScalaTableEnvironment
+ TranslationContext.reset()
+
+ val sqlQuery = "SELECT * FROM MyTable WHERE MOD(a,2)<>0 AND MOD(b,2)=0"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "3,2,Hello world\n" + "7,4,Comment#1\n" +
+ "9,4,Comment#3\n" + "17,6,Comment#11\n" +
+ "19,6,Comment#13\n" + "21,6,Comment#15\n"
+ val results = result.toDataSet[Row](getConfig).collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/JoinITCase.scala
new file mode 100644
index 0000000..74844ae
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/JoinITCase.scala
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.sql.test
+
+import org.apache.calcite.tools.ValidationException
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.table.{TableException, Row}
+import org.apache.flink.api.table.plan.TranslationContext
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+/**
+ * Integration tests for SQL joins (implicit comma-joins with WHERE equi-predicates)
+ * on DataSet-backed tables, including expected-failure cases for invalid join keys.
+ */
+@RunWith(classOf[Parameterized])
+class JoinITCase(
+ mode: TestExecutionMode,
+ configMode: TableConfigMode)
+ extends TableProgramsTestBase(mode, configMode) {
+
+ // Basic equi-join on a single key (b = e).
+ @Test
+ def testJoin(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = getScalaTableEnvironment
+ // Reset the static TranslationContext so registrations from earlier tests do not leak in.
+ TranslationContext.reset()
+
+ val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable.as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n"
+ val results = result.toDataSet[Row](getConfig).collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ // Equi-join plus an additional single-table filter (b < 2).
+ @Test
+ def testJoinWithFilter(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = getScalaTableEnvironment
+ TranslationContext.reset()
+
+ val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND b < 2"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable.as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "Hi,Hallo\n"
+ val results = result.toDataSet[Row](getConfig).collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ // Equi-join plus a cross-table join filter (h < b) and a single-table filter (a < 6).
+ @Test
+ def testJoinWithJoinFilter(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = getScalaTableEnvironment
+ TranslationContext.reset()
+
+ val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND a < 6 AND h < b"
+
+ val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "Hello world, how are you?,Hallo Welt wie\n" +
+ "I am fine.,Hallo Welt wie\n"
+ val results = result.toDataSet[Row](getConfig).collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ // Conjunction of two equi-predicates forms a composite join key (a = d AND b = h).
+ @Test
+ def testJoinWithMultipleKeys(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = getScalaTableEnvironment
+ TranslationContext.reset()
+
+ val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = d AND b = h"
+
+ val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
+ "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"
+ val results = result.toDataSet[Row](getConfig).collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ // Referencing a non-existing column 'foo' must fail SQL validation.
+ @Test(expected = classOf[ValidationException])
+ def testJoinNonExistingKey(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = getScalaTableEnvironment
+ TranslationContext.reset()
+
+ val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE foo = e"
+
+ val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ tEnv.sql(sqlQuery)
+ }
+
+ // Joining on keys of incompatible types (Int a vs. String g) must fail;
+ // the TableException is expected to surface during translation/execution.
+ @Test(expected = classOf[TableException])
+ def testJoinNonMatchingKeyTypes(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = getScalaTableEnvironment
+ TranslationContext.reset()
+
+ val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = g"
+
+ val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ tEnv.sql(sqlQuery).toDataSet[Row].collect()
+ }
+
+ // Both tables expose a column 'c'; the unqualified reference is ambiguous and must fail.
+ @Test(expected = classOf[ValidationException])
+ def testJoinWithAmbiguousFields(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = getScalaTableEnvironment
+ TranslationContext.reset()
+
+ val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = d"
+
+ val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'c)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ tEnv.sql(sqlQuery)
+ }
+
+ // Duplicate column name 'c' disambiguated with table-qualified references.
+ @Test
+ def testJoinWithAlias(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = getScalaTableEnvironment
+ TranslationContext.reset()
+
+ val sqlQuery = "SELECT Table5.c, Table3.c FROM Table3, Table5 WHERE a = d AND a < 4"
+
+ val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'c)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ val result = tEnv.sql(sqlQuery)
+ val expected = "1,Hi\n" + "2,Hello\n" + "1,Hello\n" +
+ "2,Hello world\n" + "2,Hello world\n" + "3,Hello world\n"
+ val results = result.toDataSet[Row](getConfig).collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ // d = f compares columns of the same table only, so there is no cross-table
+ // equality predicate; translation is expected to fail with a TableException.
+ @Test(expected = classOf[TableException])
+ def testJoinNoEqualityPredicate(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = getScalaTableEnvironment
+ TranslationContext.reset()
+
+ val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE d = f"
+
+ val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ tEnv.sql(sqlQuery).toDataSet[Row](getConfig).collect()
+ }
+
+ // Join followed by global aggregation; inputs registered via registerDataSet.
+ @Test
+ def testDataSetJoinWithAggregation(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = getScalaTableEnvironment
+ TranslationContext.reset()
+
+ val sqlQuery = "SELECT COUNT(g), COUNT(b) FROM Table3, Table5 WHERE a = d"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env)
+ tEnv.registerDataSet("Table3", ds1, 'a, 'b, 'c)
+ tEnv.registerDataSet("Table5", ds2, 'd, 'e, 'f, 'g, 'h)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "6,6"
+ val results = result.toDataSet[Row](getConfig).collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ // Same aggregation scenario, but inputs registered as Tables via registerTable.
+ @Test
+ def testTableJoinWithAggregation(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = getScalaTableEnvironment
+ TranslationContext.reset()
+
+ val sqlQuery = "SELECT COUNT(b), COUNT(g) FROM Table3, Table5 WHERE a = d"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "6,6"
+ val results = result.toDataSet[Row](getConfig).collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/SelectITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/SelectITCase.scala
new file mode 100644
index 0000000..f08c95c
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/SelectITCase.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.sql.test
+
+import org.apache.calcite.tools.ValidationException
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.plan.TranslationContext
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+/**
+ * Integration tests for SQL projection (SELECT) on DataSet-backed tables,
+ * covering SELECT *, explicit column lists, aliasing, and invalid references.
+ */
+@RunWith(classOf[Parameterized])
+class SelectITCase(
+ mode: TestExecutionMode,
+ configMode: TableConfigMode)
+ extends TableProgramsTestBase(mode, configMode) {
+
+ // SELECT * over a Table registered via registerTable.
+ @Test
+ def testSelectStarFromTable(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = getScalaTableEnvironment
+ // Reset the static TranslationContext so registrations from earlier tests do not leak in.
+ TranslationContext.reset()
+
+ val sqlQuery = "SELECT * FROM MyTable"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+ "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+ "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+ "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+ "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
+ "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+
+ val results = result.toDataSet[Row](getConfig).collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ // SELECT * over a DataSet registered directly via registerDataSet with field names.
+ @Test
+ def testSelectStarFromDataSet(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = getScalaTableEnvironment
+ TranslationContext.reset()
+
+ val sqlQuery = "SELECT * FROM MyTable"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+ "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+ "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+ "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+ "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
+ "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+
+ val results = result.toDataSet[Row](getConfig).collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ // Explicit full column list is equivalent to SELECT *.
+ @Test
+ def testSimpleSelectAll(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = getScalaTableEnvironment
+ TranslationContext.reset()
+
+ val sqlQuery = "SELECT a, b, c FROM MyTable"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+ "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+ "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+ "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+ "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
+ "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+
+ val results = result.toDataSet[Row](getConfig).collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ // Projection with AS aliases over the default tuple field names (_1, _2).
+ @Test
+ def testSelectWithNaming(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = getScalaTableEnvironment
+ TranslationContext.reset()
+
+ val sqlQuery = "SELECT _1 as a, _2 as b FROM MyTable"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
+ "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
+ "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
+
+ val results = result.toDataSet[Row](getConfig).collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ // Selecting an unknown column 'foo' must fail SQL validation.
+ @Test(expected = classOf[ValidationException])
+ def testInvalidFields(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = getScalaTableEnvironment
+ TranslationContext.reset()
+
+ val sqlQuery = "SELECT a, foo FROM MyTable"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", ds)
+
+ tEnv.sql(sqlQuery)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/TableWithSQLITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/TableWithSQLITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/TableWithSQLITCase.scala
new file mode 100644
index 0000000..153a9d0
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/TableWithSQLITCase.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.sql.test
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.plan.TranslationContext
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+/**
+ * Integration tests for mixing the Table API with SQL: SQL results consumed by
+ * Table API operators, Table API results queried with SQL, and chained SQL queries.
+ */
+@RunWith(classOf[Parameterized])
+class TableWithSQLITCase(
+ mode: TestExecutionMode,
+ configMode: TableConfigMode)
+ extends TableProgramsTestBase(mode, configMode) {
+
+ // SQL filter first, then Table API aggregation on the SQL result.
+ @Test
+ def testSQLTable(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = getScalaTableEnvironment
+ // Reset the static TranslationContext so registrations from earlier tests do not leak in.
+ TranslationContext.reset()
+
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c)
+
+ val sqlQuery = "SELECT * FROM MyTable WHERE a > 9"
+
+ val result = tEnv.sql(sqlQuery).select('a.avg, 'b.sum, 'c.count)
+
+ val expected = "15,65,12"
+ val results = result.toDataSet[Row](getConfig).collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ // Table API filter first, then SQL aggregation, then Table API arithmetic on aliases.
+ @Test
+ def testTableSQLTable(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = getScalaTableEnvironment
+ TranslationContext.reset()
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+ val t1 = ds.filter('a > 9)
+
+ tEnv.registerTable("MyTable", t1)
+
+ val sqlQuery = "SELECT avg(a) as a1, sum(b) as b1, count(c) as c1 FROM MyTable"
+
+ val result = tEnv.sql(sqlQuery).select('a1 + 1, 'b1 - 5, 'c1)
+
+ val expected = "16,60,12"
+ val results = result.toDataSet[Row](getConfig).collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ // The result of one SQL query is registered as a table and queried by a second SQL query.
+ @Test
+ def testMultipleSQLQueries(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = getScalaTableEnvironment
+ TranslationContext.reset()
+
+ val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", t)
+
+ val sqlQuery = "SELECT a as aa FROM MyTable WHERE b = 6"
+ val result1 = tEnv.sql(sqlQuery)
+ tEnv.registerTable("ResTable", result1)
+
+ val sqlQuery2 = "SELECT count(aa) FROM ResTable"
+ val result2 = tEnv.sql(sqlQuery2)
+
+ val expected = "6"
+ val results = result2.toDataSet[Row](getConfig).collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/UnionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/UnionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/UnionITCase.scala
new file mode 100644
index 0000000..4a031a3
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/UnionITCase.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.sql.test
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.plan.TranslationContext
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+/**
+ * Integration tests for SQL UNION ALL on DataSet-backed tables, alone and
+ * combined with downstream filters and aggregations.
+ */
+@RunWith(classOf[Parameterized])
+class UnionITCase(
+ mode: TestExecutionMode,
+ configMode: TableConfigMode)
+ extends TableProgramsTestBase(mode, configMode) {
+
+ // Plain UNION ALL of two identical small data sets (duplicates are kept).
+ @Test
+ def testUnion(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = getScalaTableEnvironment
+ // Reset the static TranslationContext so registrations from earlier tests do not leak in.
+ TranslationContext.reset()
+
+ val sqlQuery = "SELECT c FROM t1 UNION ALL (SELECT c FROM t2)"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+ val ds2 = CollectionDataSets.getSmall3TupleDataSet(env)
+ tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
+ tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + "Hello world\n"
+ val results = result.toDataSet[Row](getConfig).collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ //TODO: activate for EFFICIENT mode
+ // UNION ALL as a sub-query followed by an outer WHERE filter.
+ @Test
+ def testUnionWithFilter(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = getScalaTableEnvironment
+ TranslationContext.reset()
+
+ val sqlQuery = "SELECT c FROM (" +
+ "SELECT * FROM t1 UNION ALL (SELECT a, b, c FROM t2))" +
+ "WHERE b < 2"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env)
+ tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
+ tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "Hi\n" + "Hallo\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ //TODO: activate for EFFICIENT mode
+ // UNION ALL as a sub-query followed by an outer COUNT aggregation (3 + 15 rows = 18).
+ @Test
+ def testUnionWithAggregation(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = getScalaTableEnvironment
+ TranslationContext.reset()
+
+ val sqlQuery = "SELECT count(c) FROM (" +
+ "SELECT * FROM t1 UNION ALL (SELECT a, b, c FROM t2))"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env)
+ tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
+ tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "18"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
index 3582c33..946f584 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
@@ -22,7 +22,6 @@ import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.codegen.CodeGenException
import org.apache.flink.api.table.expressions.Literal
import org.apache.flink.api.table.test.utils.TableProgramsTestBase
import TableProgramsTestBase.TableConfigMode