You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2016/04/11 13:05:47 UTC

flink git commit: [FLINK-3640] Add support for SQL in DataSet programs

Repository: flink
Updated Branches:
  refs/heads/master b368cb2d5 -> ed1e52a10


[FLINK-3640] Add support for SQL in DataSet programs

- add EnumerableToLogicalTableScan rule
- in order to be able to mix TableAPI and SQL, we need our own copy of PlannerImpl
- create a dummy RelNode in the reset() method, in order to retrieve the RelOptPlanner

This closes #1862


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ed1e52a1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed1e52a1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed1e52a1

Branch: refs/heads/master
Commit: ed1e52a106e9ce3789fab975e34715de2d923bec
Parents: b368cb2
Author: vasia <va...@apache.org>
Authored: Wed Mar 23 19:35:06 2016 +0100
Committer: vasia <va...@apache.org>
Committed: Mon Apr 11 12:27:35 2016 +0200

----------------------------------------------------------------------
 .../api/java/table/JavaBatchTranslator.scala    |  30 ++-
 .../api/table/AbstractTableEnvironment.scala    |  15 ++
 .../api/table/FlinkCalciteSqlValidator.scala    |  47 ++++
 .../flink/api/table/FlinkPlannerImpl.scala      | 180 +++++++++++++
 .../flink/api/table/plan/PlanTranslator.scala   |   2 -
 .../api/table/plan/TranslationContext.scala     |  24 +-
 .../rules/EnumerableToLogicalTableScan.scala    |  50 ++++
 .../api/table/plan/rules/FlinkRuleSets.scala    |   1 +
 .../api/scala/sql/test/AggregationsITCase.scala | 224 ++++++++++++++++
 .../flink/api/scala/sql/test/FilterITCase.scala | 166 ++++++++++++
 .../flink/api/scala/sql/test/JoinITCase.scala   | 259 +++++++++++++++++++
 .../flink/api/scala/sql/test/SelectITCase.scala | 155 +++++++++++
 .../api/scala/sql/test/TableWithSQLITCase.scala | 103 ++++++++
 .../flink/api/scala/sql/test/UnionITCase.scala  | 109 ++++++++
 .../api/scala/table/test/FilterITCase.scala     |   1 -
 15 files changed, 1348 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
index 028711b..4688c82 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
@@ -19,14 +19,14 @@
 package org.apache.flink.api.java.table
 
 import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
-import org.apache.calcite.plan.{RelTraitSet, RelOptUtil}
-import org.apache.calcite.rel.{RelCollations, RelNode}
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rel.RelNode
 import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.Programs
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.{DataSet => JavaDataSet}
 import org.apache.flink.api.table.plan._
-import org.apache.flink.api.table.{TableConfig, Table}
+import org.apache.flink.api.table.{FlinkPlannerImpl, TableConfig, Table}
 import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetRel}
 import org.apache.flink.api.table.plan.rules.FlinkRuleSets
 import org.apache.flink.api.table.plan.schema.DataSetTable
@@ -56,7 +56,8 @@ class JavaBatchTranslator(config: TableConfig) extends PlanTranslator {
 
     // create table scan operator
     relBuilder.scan(tabName)
-    new Table(relBuilder.build(), relBuilder)
+    val relNode = relBuilder.build()
+    new Table(relNode, relBuilder)
   }
 
   override def translate[A](lPlan: RelNode)(implicit tpe: TypeInformation[A]): JavaDataSet[A] = {
@@ -69,9 +70,7 @@ class JavaBatchTranslator(config: TableConfig) extends PlanTranslator {
 
     // optimize the logical Flink plan
     val optProgram = Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES)
-    val flinkOutputProps = RelTraitSet.createEmpty()
-      .plus(DataSetConvention.INSTANCE)
-      .plus(RelCollations.of()).simplify()
+    val flinkOutputProps = lPlan.getTraitSet.replace(DataSetConvention.INSTANCE).simplify()
 
     val dataSetPlan = try {
       optProgram.run(planner, decorPlan, flinkOutputProps)
@@ -97,4 +96,21 @@ class JavaBatchTranslator(config: TableConfig) extends PlanTranslator {
 
   }
 
+  /**
+   * Runs a SQL query through the planner pipeline (parse, validate, convert to
+   * a relational tree) and wraps the resulting plan in a [[Table]].
+   */
+  def translateSQL(query: String): Table = {
+    // the planner instance comes from the TranslationContext, so it is the one
+    // that Table API plans are built against as well
+    val sqlPlanner = new FlinkPlannerImpl(
+      TranslationContext.getFrameworkConfig,
+      TranslationContext.getPlanner)
+
+    // each stage consumes the result of the previous one
+    val validatedNode = sqlPlanner.validate(sqlPlanner.parse(query))
+    val relRoot = sqlPlanner.rel(validatedNode)
+    new Table(relRoot.rel, TranslationContext.getRelBuilder)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/AbstractTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/AbstractTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/AbstractTableEnvironment.scala
index 4dedc47..a0f162e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/AbstractTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/AbstractTableEnvironment.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.api.table
 
 import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.table.JavaBatchTranslator
 import org.apache.flink.api.table.expressions.Expression
 import org.apache.flink.api.table.plan.TranslationContext
 import org.apache.flink.api.table.plan.schema.{DataSetTable, TableTable}
@@ -83,4 +84,18 @@ class AbstractTableEnvironment {
     )
     TranslationContext.registerTable(dataSetTable, name)
   }
+
+  /**
+   * Translates a SQL query into a [[Table]] by parsing, validating and
+   * converting it into a relational plan. All input [[Table]]s have to be
+   * registered beforehand under unique names, using [[registerTable()]] or
+   * [[org.apache.flink.api.java.table.TableEnvironment.registerDataSet()]].
+   *
+   * @param query the SQL query
+   * @return the result of the SQL query as a [[Table]]
+   */
+  def sql(query: String): Table = {
+    val translator = new JavaBatchTranslator(config)
+    translator.translateSQL(query)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkCalciteSqlValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkCalciteSqlValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkCalciteSqlValidator.scala
new file mode 100644
index 0000000..b1ccc09
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkCalciteSqlValidator.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table
+
+import org.apache.calcite.adapter.java.JavaTypeFactory
+import org.apache.calcite.prepare.CalciteCatalogReader
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.sql.{SqlInsert, SqlOperatorTable}
+import org.apache.calcite.sql.validate.{SqlValidatorImpl, SqlConformance}
+
+/**
+ * Copy of Calcite's CalciteSqlValidator for use with [[FlinkPlannerImpl]].
+ * Both row-type hooks pass the given row type through JavaTypeFactory.toSql.
+ */
+class FlinkCalciteSqlValidator(
+    opTab: SqlOperatorTable,
+    catalogReader: CalciteCatalogReader,
+    typeFactory: JavaTypeFactory)
+  extends SqlValidatorImpl(opTab, catalogReader, typeFactory, SqlConformance.DEFAULT) {
+
+  private def toSqlRowType(rowType: RelDataType): RelDataType =
+    typeFactory.asInstanceOf[JavaTypeFactory].toSql(rowType)
+
+  override def getLogicalSourceRowType(
+      sourceRowType: RelDataType,
+      insert: SqlInsert): RelDataType = toSqlRowType(sourceRowType)
+
+  override def getLogicalTargetRowType(
+      targetRowType: RelDataType,
+      insert: SqlInsert): RelDataType = toSqlRowType(targetRowType)
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
new file mode 100644
index 0000000..5a1b3fe
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table
+
+import java.util
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.adapter.java.JavaTypeFactory
+import org.apache.calcite.jdbc.CalciteSchema
+import org.apache.calcite.plan.RelOptTable.ViewExpander
+import org.apache.calcite.plan._
+import org.apache.calcite.prepare.CalciteCatalogReader
+import org.apache.calcite.rel.RelRoot
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex.RexBuilder
+import org.apache.calcite.schema.SchemaPlus
+import org.apache.calcite.sql.parser.{SqlParser, SqlParseException}
+import org.apache.calcite.sql.validate.SqlValidator
+import org.apache.calcite.sql.{SqlNode, SqlOperatorTable}
+import org.apache.calcite.sql2rel.{RelDecorrelator, SqlToRelConverter, SqlRexConvertletTable}
+import org.apache.calcite.tools.{RelConversionException, ValidationException, Frameworks, FrameworkConfig}
+import org.apache.calcite.util.Util
+import scala.collection.JavaConversions._
+
+/** NOTE: this is heavily inspired by Calcite's PlannerImpl.
+  We need it in order to share the planner between the Table API relational plans
+  and the SQL relational plans that are created by the Calcite parser.
+  The only difference is that we initialize the RelOptPlanner planner
+  when instantiating, instead of creating a new one in the ready() method. **/
+class FlinkPlannerImpl(config: FrameworkConfig, var planner: RelOptPlanner) {
+
+  val operatorTable: SqlOperatorTable = config.getOperatorTable
+  /** Holds the trait definitions to be registered with planner. May be null. */
+  val traitDefs: ImmutableList[RelTraitDef[_ <: RelTrait]] = config.getTraitDefs
+  val parserConfig: SqlParser.Config = config.getParserConfig
+  val convertletTable: SqlRexConvertletTable = config.getConvertletTable
+  var defaultSchema: SchemaPlus = config.getDefaultSchema
+  // State below is filled in step by step: parse() -> validate() -> rel().
+  var typeFactory: JavaTypeFactory = null
+  var validator: FlinkCalciteSqlValidator = null
+  var validatedSqlNode: SqlNode = null
+  var root: RelRoot = null
+
+  /** Initializes the type factory and, if none was passed in, the planner. */
+  private def ready(): Unit = {
+    Frameworks.withPlanner(new Frameworks.PlannerAction[Unit] {
+      def apply(
+          cluster: RelOptCluster,
+          relOptSchema: RelOptSchema,
+          rootSchema: SchemaPlus): Unit = {
+        Util.discard(rootSchema)
+        typeFactory = cluster.getTypeFactory.asInstanceOf[JavaTypeFactory]
+        if (planner == null) {
+          planner = cluster.getPlanner
+        }
+      }
+    }, config)
+    if (traitDefs != null) {
+      planner.clearRelTraitDefs()
+      for (traitDef <- traitDefs) {
+        planner.addRelTraitDef(traitDef)
+      }
+    }
+  }
+
+  /** Parses a SQL statement. Call this first: it is the only step that runs ready(). */
+  @throws(classOf[SqlParseException])
+  def parse(sql: String): SqlNode = {
+    // ready() sets the type factory that validate() and rel() rely on
+    ready()
+    SqlParser.create(sql, parserConfig).parseStmt
+  }
+
+  /** Validates a parsed statement and stores the result for rel().
+    * RuntimeExceptions from the validator are rethrown as ValidationException. */
+  @throws(classOf[ValidationException])
+  def validate(sqlNode: SqlNode): SqlNode = {
+    // typeFactory is still null here unless parse() has been called before
+    validator = new FlinkCalciteSqlValidator(operatorTable, createCatalogReader, typeFactory)
+    validator.setIdentifierExpansion(true)
+    validatedSqlNode = try {
+      validator.validate(sqlNode)
+    } catch {
+      case e: RuntimeException => throw new ValidationException(e)
+    }
+    validatedSqlNode
+  }
+
+  /** Converts the node stored by validate() to a relational tree; `sql` is not read. */
+  @throws(classOf[RelConversionException])
+  def rel(sql: SqlNode): RelRoot = {
+    assert(validatedSqlNode != null)
+    val cluster: RelOptCluster = RelOptCluster.create(planner, createRexBuilder)
+    val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter(
+      new ViewExpanderImpl, validator, createCatalogReader, cluster, convertletTable)
+    sqlToRelConverter.setTrimUnusedFields(false)
+    sqlToRelConverter.enableTableAccessConversion(false)
+    val converted = sqlToRelConverter.convertQuery(validatedSqlNode, false, true)
+    val flattened = converted.withRel(sqlToRelConverter.flattenTypes(converted.rel, true))
+    root = flattened.withRel(RelDecorrelator.decorrelateQuery(flattened.rel))
+    root
+  }
+
+  /** Implements [[org.apache.calcite.plan.RelOptTable.ViewExpander]]
+    * interface for [[org.apache.calcite.tools.Planner]]. */
+  class ViewExpanderImpl extends ViewExpander {
+
+    /** Parses, validates and converts the view definition in `queryString`,
+      * resolving names relative to `schemaPath`; `rowType` is not read.
+      * Everything is kept in local values, so expanding a view does not
+      * overwrite the planner's own `root`, `validator` or `validatedSqlNode`. */
+    override def expandView(
+        rowType: RelDataType,
+        queryString: String,
+        schemaPath: util.List[String]): RelRoot = {
+      val parser: SqlParser = SqlParser.create(queryString, parserConfig)
+      val sqlNode: SqlNode = try {
+        parser.parseQuery
+      } catch {
+        case e: SqlParseException =>
+          throw new RuntimeException("parse failed", e)
+      }
+      val catalogReader: CalciteCatalogReader = createCatalogReader.withSchemaPath(schemaPath)
+      val viewValidator: SqlValidator =
+        new FlinkCalciteSqlValidator(operatorTable, catalogReader, typeFactory)
+      viewValidator.setIdentifierExpansion(true)
+      val validatedView: SqlNode = viewValidator.validate(sqlNode)
+      val cluster: RelOptCluster = RelOptCluster.create(planner, createRexBuilder)
+      val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter(
+        new ViewExpanderImpl, viewValidator, catalogReader, cluster, convertletTable)
+      sqlToRelConverter.setTrimUnusedFields(false)
+      sqlToRelConverter.enableTableAccessConversion(false)
+      // keep the expanded plan local instead of overwriting FlinkPlannerImpl.this.root
+      val converted = sqlToRelConverter.convertQuery(validatedView, true, false)
+      val flattened = converted.withRel(sqlToRelConverter.flattenTypes(converted.rel, true))
+      flattened.withRel(RelDecorrelator.decorrelateQuery(flattened.rel))
+    }
+  }
+
+  /** Creates a catalog reader rooted at the top-most parent of defaultSchema. */
+  private def createCatalogReader: CalciteCatalogReader = {
+    new CalciteCatalogReader(
+      CalciteSchema.from(FlinkPlannerImpl.rootSchema(defaultSchema)),
+      parserConfig.caseSensitive,
+      CalciteSchema.from(defaultSchema).path(null),
+      typeFactory)
+  }
+
+  /** Creates a RexBuilder on the current type factory. */
+  private def createRexBuilder: RexBuilder =
+    new RexBuilder(typeFactory)
+
+}
+
+object FlinkPlannerImpl {
+  /** Walks up the parent chain of `schema` and returns the topmost schema. */
+  private def rootSchema(schema: SchemaPlus): SchemaPlus = {
+    var current = schema
+    while (current.getParentSchema != null) {
+      current = current.getParentSchema
+    }
+    current
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
index 410c570..2a82dc3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
@@ -73,8 +73,6 @@ abstract class PlanTranslator {
     */
   def createTable[A](repr: Representation[A], exprs: Array[Expression]): Table = {
 
-    val inputType = repr.getType()
-
     val (fieldNames, fieldIndexes) = TranslationContext.getFieldInfo(repr.getType(), exprs)
     createTable(repr, fieldIndexes.toArray, fieldNames.toArray)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
index cd5e2b0..c7d9f18 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
@@ -20,19 +20,20 @@ package org.apache.flink.api.table.plan
 
 import java.util.concurrent.atomic.AtomicInteger
 import org.apache.calcite.config.Lex
-import org.apache.calcite.plan.ConventionTraitDef
+import org.apache.calcite.plan.RelOptPlanner
 import org.apache.calcite.schema.impl.AbstractTable
 import org.apache.calcite.schema.SchemaPlus
 import org.apache.calcite.sql.parser.SqlParser
-import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RelBuilder}
+import org.apache.calcite.tools.{Programs, FrameworkConfig, Frameworks, RelBuilder}
 import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.api.table.TableException
 import org.apache.flink.api.table.expressions.{Naming, UnresolvedFieldReference, Expression}
 import org.apache.flink.api.table.plan.cost.DataSetCostFactory
-import org.apache.flink.api.table.plan.schema.DataSetTable
 import org.apache.flink.api.table.plan.schema.DataStreamTable
+import org.apache.flink.api.table.plan.rules.FlinkRuleSets
+import org.apache.flink.api.table.plan.schema.DataSetTable
 
 object TranslationContext {
 
@@ -41,6 +42,7 @@ object TranslationContext {
   private var tables: SchemaPlus = null
   private var tablesRegistry: Map[String, AbstractTable] = null
   private val nameCntr: AtomicInteger = new AtomicInteger(0)
+  private var relOptPlanner: RelOptPlanner = null
 
   reset()
 
@@ -53,7 +55,10 @@ object TranslationContext {
     // configure sql parser
     // we use Java lex because back ticks are easier than double quotes in programming
     // and cases are preserved
-    val parserConfig = SqlParser.configBuilder().setLex(Lex.JAVA).build()
+    val parserConfig = SqlParser
+      .configBuilder()
+      .setLex(Lex.JAVA)
+      .build()
 
     // initialize RelBuilder
     frameworkConfig = Frameworks
@@ -61,13 +66,14 @@ object TranslationContext {
       .defaultSchema(tables)
       .parserConfig(parserConfig)
       .costFactory(new DataSetCostFactory)
-      .traitDefs(ConventionTraitDef.INSTANCE)
       .build
 
     tablesRegistry = Map[String, AbstractTable]()
     relBuilder = RelBuilder.create(frameworkConfig)
+    // create a dummy RelNode, in order to retrieve the planner
+    val dummyRelNode = relBuilder.values(Array("dummy"), new Integer(1)).build()
+    relOptPlanner = dummyRelNode.getCluster.getPlanner
     nameCntr.set(0)
-
   }
 
   /**
@@ -133,6 +139,10 @@ object TranslationContext {
     relBuilder
   }
 
+  /** Returns the planner captured in reset(), shared by Table API and SQL plans. */
+  def getPlanner: RelOptPlanner =
+    relOptPlanner
+
   def getFrameworkConfig: FrameworkConfig = {
     frameworkConfig
   }
@@ -207,5 +217,3 @@ object TranslationContext {
     (fieldNames.toArray, fieldIndexes.toArray)
   }
 }
-
-

http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/EnumerableToLogicalTableScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/EnumerableToLogicalTableScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/EnumerableToLogicalTableScan.scala
new file mode 100644
index 0000000..02d2159
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/EnumerableToLogicalTableScan.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules
+
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan
+import org.apache.calcite.plan.{RelOptRuleCall, RelOptRule, RelOptRuleOperand}
+import org.apache.calcite.rel.logical.LogicalTableScan
+
+/**
+ * Rule that replaces an EnumerableTableScan with a LogicalTableScan over the
+ * same table. Calcite creates an EnumerableTableScan when it converts a parsed
+ * SQL query; rewriting it into a LogicalTableScan lets SQL plans go through
+ * the same optimization process as plans that were created by the Table API.
+ */
+class EnumerableToLogicalTableScan(
+    operand: RelOptRuleOperand,
+    description: String) extends RelOptRule(operand, description) {
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    // rebuild the scan on the same cluster and table, as a logical node
+    val enumerableScan = call.rel(0).asInstanceOf[EnumerableTableScan]
+    val logicalScan =
+      LogicalTableScan.create(enumerableScan.getCluster, enumerableScan.getTable)
+    call.transformTo(logicalScan)
+  }
+}
+
+object EnumerableToLogicalTableScan {
+  /** Shared rule instance; its operand matches any EnumerableTableScan. */
+  val INSTANCE: EnumerableToLogicalTableScan = new EnumerableToLogicalTableScan(
+    operand(classOf[EnumerableTableScan], any), "EnumerableToLogicalTableScan")
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
index 0324a0e..427530b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
@@ -34,6 +34,7 @@ object FlinkRuleSets {
 
     // convert a logical table scan to a relational expression
     TableScanRule.INSTANCE,
+    EnumerableToLogicalTableScan.INSTANCE,
 
     // push a filter into a join
     FilterJoinRule.FILTER_ON_JOIN,

http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/AggregationsITCase.scala
new file mode 100644
index 0000000..d577564
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/AggregationsITCase.scala
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.sql.test
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.plan.TranslationContext
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+/** Integration tests that run aggregation queries through the SQL entry point
+  * of the table environment, on registered DataSets and Tables. */
+@RunWith(classOf[Parameterized])
+class AggregationsITCase(mode: TestExecutionMode, configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
+
+  @Test
+  def testAggregationTypes(): Unit = {
+    // sum, min, max, count and avg over the first field of a registered DataSet
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val sqlQuery = "SELECT sum(_1), min(_1), max(_1), count(_1), avg(_1) FROM MyTable"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    tEnv.registerDataSet("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "231,1,21,21,11"
+    val results = result.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testTableAggregation(): Unit = {
+    // sum over a DataSet that is converted to a Table before it is registered
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val sqlQuery = "SELECT sum(_1) FROM MyTable"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable
+    tEnv.registerTable("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "231"
+    val results = result.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testDataSetAggregation(): Unit = {
+    // sum over a DataSet that is registered directly
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val sqlQuery = "SELECT sum(_1) FROM MyTable"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    tEnv.registerDataSet("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "231"
+    val results = result.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testWorkingAggregationDataTypes(): Unit = {
+    // avg over byte, short, int, long, float and double fields, count over a String
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val sqlQuery = "SELECT avg(_1), avg(_2), avg(_3), avg(_4), avg(_5), avg(_6), count(_7) " +
+      "FROM MyTable"
+
+    val ds = env.fromElements(
+      (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
+      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao"))
+    tEnv.registerDataSet("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "1,1,1,1,1.5,1.5,2"
+    val results = result.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testTableWorkingAggregationDataTypes(): Unit = {
+    // same data types as the DataSet variant, on a Table with fields renamed to a..g
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val sqlQuery = "SELECT avg(a), avg(b), avg(c), avg(d), avg(e), avg(f), count(g) " +
+      "FROM MyTable"
+
+    val ds = env.fromElements(
+      (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
+      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).as('a, 'b, 'c, 'd, 'e, 'f, 'g)
+    tEnv.registerTable("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "1,1,1,1,1.5,1.5,2"
+    val results = result.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testTableProjection(): Unit = {
+    // several aggregates over the same fields of a two-column Table
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val sqlQuery = "SELECT avg(a), sum(a), count(a), avg(b), sum(b) " +
+      "FROM MyTable"
+
+    val ds = env.fromElements((1: Byte, 1: Short), (2: Byte, 2: Short)).as('a, 'b)
+    tEnv.registerTable("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "1,3,2,1,3"
+    val results = result.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testTableAggregationWithArithmetic(): Unit = {
+    // arithmetic both inside the aggregate arguments and on the aggregate results
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val sqlQuery = "SELECT avg(a + 2) + 2, count(b) + 5 " +
+      "FROM MyTable"
+
+    val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).as('a, 'b)
+    tEnv.registerTable("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "5.5,7"
+    val results = result.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testAggregationWithTwoCount(): Unit = {
+    // two count aggregates over different fields in one query
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val sqlQuery = "SELECT count(_1), count(_2) FROM MyTable"
+
+    val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable
+    tEnv.registerTable("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "2,2"
+    val results = result.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+
+  @Test
+  def testAggregationAfterProjection(): Unit = {
+    // aggregates over the aliased output of a sub-query
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM " +
+      "(SELECT _1 as a, _2 as b, _3 as c FROM MyTable)"
+
+    val ds = env.fromElements(
+      (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
+      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable
+    tEnv.registerTable("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "1,3,2"
+    val results = result.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/FilterITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/FilterITCase.scala
new file mode 100644
index 0000000..c89e25a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/FilterITCase.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.sql.test
+
+import org.apache.calcite.tools.ValidationException
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.plan.TranslationContext
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class FilterITCase(
+    mode: TestExecutionMode,
+    configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
+
+  @Test
+  def testAllRejectingFilter(): Unit = {
+    // WHERE false must reject every row; the expected text is just a newline.
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val sqlQuery = "SELECT * FROM MyTable WHERE false"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable
+    tEnv.registerTable("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "\n"
+    val results = result.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testAllPassingFilter(): Unit = {
+    // WHERE true must keep every row; all 21 rows are listed in the expected output.
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val sqlQuery = "SELECT * FROM MyTable WHERE true"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable
+    tEnv.registerTable("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
+      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+    val results = result.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testFilterOnString(): Unit = {
+    // LIKE '%world%' on the string field c; two matching rows are expected.
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val sqlQuery = "SELECT * FROM MyTable WHERE c LIKE '%world%'"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+    tEnv.registerTable("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"
+    val results = result.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testFilterOnInteger(): Unit = {
+    // MOD(a,2)=0 keeps only the rows with an even a (2, 4, ..., 20).
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val sqlQuery = "SELECT * FROM MyTable WHERE MOD(a,2)=0"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+    tEnv.registerTable("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" +
+      "6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" +
+      "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
+      "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"
+    val results = result.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testDisjunctivePredicate(): Unit = {
+    // OR of two range predicates on a; only the rows with a = 1 and a = 21 are expected.
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val sqlQuery = "SELECT * FROM MyTable WHERE a < 2 OR a > 20"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+    tEnv.registerTable("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "1,1,Hi\n" + "21,6,Comment#15\n"
+    val results = result.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testFilterWithAnd(): Unit = {
+    // Conjunction of two MOD predicates: a must be odd and b even; six rows are expected.
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val sqlQuery = "SELECT * FROM MyTable WHERE MOD(a,2)<>0 AND MOD(b,2)=0"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+    tEnv.registerTable("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "3,2,Hello world\n" + "7,4,Comment#1\n" +
+      "9,4,Comment#3\n" + "17,6,Comment#11\n" +
+      "19,6,Comment#13\n" + "21,6,Comment#15\n"
+    val results = result.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/JoinITCase.scala
new file mode 100644
index 0000000..74844ae
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/JoinITCase.scala
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.sql.test
+
+import org.apache.calcite.tools.ValidationException
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.table.{TableException, Row}
+import org.apache.flink.api.table.plan.TranslationContext
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class JoinITCase(
+    mode: TestExecutionMode,
+    configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
+
+  @Test
+  def testJoin(): Unit = {
+    // Comma-separated FROM with the equality predicate b = e in the WHERE clause.
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e"
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable.as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
+    tEnv.registerTable("Table3", ds1)
+    tEnv.registerTable("Table5", ds2)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n"
+    val results = result.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testJoinWithFilter(): Unit = {
+    // Join on b = e combined with the single-table filter b < 2; one row is expected.
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND b < 2"
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable.as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
+    tEnv.registerTable("Table3", ds1)
+    tEnv.registerTable("Table5", ds2)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "Hi,Hallo\n"
+    val results = result.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testJoinWithJoinFilter(): Unit = {
+    // Join on b = e plus a filter on a and the cross-table, non-equality condition h < b.
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND a < 6 AND h < b"
+
+    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
+    tEnv.registerTable("Table3", ds1)
+    tEnv.registerTable("Table5", ds2)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "Hello world, how are you?,Hallo Welt wie\n" +
+      "I am fine.,Hallo Welt wie\n"
+    val results = result.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testJoinWithMultipleKeys(): Unit = {
+    // Join on two key pairs at once: a = d AND b = h.
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = d AND b = h"
+
+    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
+    tEnv.registerTable("Table3", ds1)
+    tEnv.registerTable("Table5", ds2)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
+      "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"
+    val results = result.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testJoinNonExistingKey(): Unit = {
+    // The query references foo, which neither table defines; expects ValidationException.
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE foo = e"
+
+    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
+    tEnv.registerTable("Table3", ds1)
+    tEnv.registerTable("Table5", ds2)
+
+    tEnv.sql(sqlQuery)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testJoinNonMatchingKeyTypes(): Unit = {
+    // Join on a = g; the test name says the key types differ. Expects TableException.
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = g"
+
+    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
+    tEnv.registerTable("Table3", ds1)
+    tEnv.registerTable("Table5", ds2)
+    // Pass getConfig like the other tests so the parameterized TableConfigMode is used.
+    tEnv.sql(sqlQuery).toDataSet[Row](getConfig).collect()
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testJoinWithAmbiguousFields(): Unit = {
+    // Both tables expose a field named c, and the query selects c without a table qualifier.
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = d"
+
+    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'c)
+    tEnv.registerTable("Table3", ds1)
+    tEnv.registerTable("Table5", ds2)
+
+    tEnv.sql(sqlQuery)
+  }
+
+  @Test
+  def testJoinWithAlias(): Unit = {
+    // Both tables have a field c; the query qualifies it as Table5.c and Table3.c.
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val sqlQuery = "SELECT Table5.c, Table3.c FROM Table3, Table5 WHERE a = d AND a < 4"
+
+    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'c)
+    tEnv.registerTable("Table3", ds1)
+    tEnv.registerTable("Table5", ds2)
+
+    val result = tEnv.sql(sqlQuery)
+    val expected = "1,Hi\n" + "2,Hello\n" + "1,Hello\n" +
+      "2,Hello world\n" + "2,Hello world\n" + "3,Hello world\n"
+    val results = result.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testJoinNoEqualityPredicate(): Unit = {
+    // d = f compares two Table5 fields, so no equality predicate links Table3 and Table5.
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE d = f"
+
+    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
+    tEnv.registerTable("Table3", ds1)
+    tEnv.registerTable("Table5", ds2)
+
+    tEnv.sql(sqlQuery).toDataSet[Row](getConfig).collect()
+  }
+
+  @Test
+  def testDataSetJoinWithAggregation(): Unit = {
+    // Registers the DataSets directly via registerDataSet and aggregates over the join result.
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val sqlQuery = "SELECT COUNT(g), COUNT(b) FROM Table3, Table5 WHERE a = d"
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env)
+    tEnv.registerDataSet("Table3", ds1, 'a, 'b, 'c)
+    tEnv.registerDataSet("Table5", ds2, 'd, 'e, 'f, 'g, 'h)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "6,6"
+    val results = result.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testTableJoinWithAggregation(): Unit = {
+    // Registers Tables (created with as(...)) and aggregates over the join result.
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val sqlQuery = "SELECT COUNT(b), COUNT(g) FROM Table3, Table5 WHERE a = d"
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+    tEnv.registerTable("Table3", ds1)
+    tEnv.registerTable("Table5", ds2)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "6,6"
+    val results = result.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/SelectITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/SelectITCase.scala
new file mode 100644
index 0000000..f08c95c
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/SelectITCase.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.sql.test
+
+import org.apache.calcite.tools.ValidationException
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.plan.TranslationContext
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class SelectITCase(
+    mode: TestExecutionMode,
+    configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
+
+  @Test
+  def testSelectStarFromTable(): Unit = {
+    // SELECT * over a registered Table; all 21 rows with all three fields are expected.
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val sqlQuery = "SELECT * FROM MyTable"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+    tEnv.registerTable("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
+      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+
+    val results = result.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testSelectStarFromDataSet(): Unit = {
+    // SELECT * over a DataSet registered via registerDataSet with field names a, b, c.
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val sqlQuery = "SELECT * FROM MyTable"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
+      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+
+    val results = result.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testSimpleSelectAll(): Unit = {
+    // Selects a, b, c by name; the expected output is the full 21-row input.
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val sqlQuery = "SELECT a, b, c FROM MyTable"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+    tEnv.registerTable("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
+      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+
+    val results = result.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testSelectWithNaming(): Unit = {
+    // No as(...) renaming here: the query reads the fields as _1, _2 and aliases them to a, b.
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val sqlQuery = "SELECT _1 as a, _2 as b FROM MyTable"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable
+    tEnv.registerTable("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
+      "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
+      "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
+
+    val results = result.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidFields(): Unit = {
+    // foo is not one of MyTable's fields (a, b, c); a ValidationException is expected.
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val sqlQuery = "SELECT a, foo FROM MyTable"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
+    tEnv.registerTable("MyTable", ds)
+
+    tEnv.sql(sqlQuery)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/TableWithSQLITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/TableWithSQLITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/TableWithSQLITCase.scala
new file mode 100644
index 0000000..153a9d0
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/TableWithSQLITCase.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.sql.test
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.plan.TranslationContext
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class TableWithSQLITCase(
+    mode: TestExecutionMode,
+    configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
+
+  @Test
+  def testSQLTable(): Unit = {
+    // SQL first (filter a > 9), then a Table API aggregation on the SQL result.
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c)
+
+    val sqlQuery = "SELECT * FROM MyTable WHERE a > 9"
+
+    val result = tEnv.sql(sqlQuery).select('a.avg, 'b.sum, 'c.count)
+
+    val expected = "15,65,12"
+    val results = result.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testTableSQLTable(): Unit = {
+    // Table API filter, then SQL aggregation on the registered result, then Table API select.
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+    val t1 = ds.filter('a > 9)
+
+    tEnv.registerTable("MyTable", t1)
+
+    val sqlQuery = "SELECT avg(a) as a1, sum(b) as b1, count(c) as c1 FROM MyTable"
+
+    val result = tEnv.sql(sqlQuery).select('a1 + 1, 'b1 - 5, 'c1)
+
+    val expected = "16,60,12"
+    val results = result.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testMultipleSQLQueries(): Unit = {
+    // The first SQL result is registered as ResTable and queried by a second SQL statement.
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+    tEnv.registerTable("MyTable", t)
+
+    val sqlQuery = "SELECT a as aa FROM MyTable WHERE b = 6"
+    val result1 = tEnv.sql(sqlQuery)
+    tEnv.registerTable("ResTable", result1)
+
+    val sqlQuery2 = "SELECT count(aa) FROM ResTable"
+    val result2 = tEnv.sql(sqlQuery2)
+
+    val expected = "6"
+    val results = result2.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/UnionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/UnionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/UnionITCase.scala
new file mode 100644
index 0000000..4a031a3
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/UnionITCase.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.sql.test
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.plan.TranslationContext
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class UnionITCase(
+    mode: TestExecutionMode,
+    configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
+
+  @Test
+  def testUnion(): Unit = {
+    // UNION ALL of the same small data set registered twice; each c value is expected twice.
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val sqlQuery = "SELECT c FROM t1 UNION ALL (SELECT c FROM t2)"
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env)
+    tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
+    tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + "Hello world\n"
+    val results = result.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  //TODO: activate for EFFICIENT mode
+  @Test
+  def testUnionWithFilter(): Unit = {
+    // Filter b < 2 on top of a UNION ALL sub-query; t2 names its 4th field c to line up.
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+    // The concatenated query has no space between "))" and "WHERE".
+    val sqlQuery = "SELECT c FROM (" +
+      "SELECT * FROM t1 UNION ALL (SELECT a, b, c FROM t2))" +
+      "WHERE b < 2"
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env)
+    tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
+    tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e)
+
+    val result = tEnv.sql(sqlQuery)
+    // NOTE: unlike testUnion, toDataSet is called without getConfig here.
+    val expected = "Hi\n" + "Hallo\n"
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  //TODO: activate for EFFICIENT mode
+  @Test
+  def testUnionWithAggregation(): Unit = {
+    // count(c) over the UNION ALL of t1 and a projection of t2; the expected count is 18.
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val sqlQuery = "SELECT count(c) FROM (" +
+      "SELECT * FROM t1 UNION ALL (SELECT a, b, c FROM t2))"
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env)
+    tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
+    tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e)
+
+    val result = tEnv.sql(sqlQuery)
+    // NOTE: unlike testUnion, toDataSet is called without getConfig here.
+    val expected = "18"
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
index 3582c33..946f584 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
@@ -22,7 +22,6 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.codegen.CodeGenException
 import org.apache.flink.api.table.expressions.Literal
 import org.apache.flink.api.table.test.utils.TableProgramsTestBase
 import TableProgramsTestBase.TableConfigMode