Posted to commits@flink.apache.org by ch...@apache.org on 2016/04/18 16:56:49 UTC

flink git commit: [FLINK-3587] Bump Calcite version to 1.7.0

Repository: flink
Updated Branches:
  refs/heads/master 7eb58773e -> 367687df1


[FLINK-3587] Bump Calcite version to 1.7.0

- Add DataSetValues and DataStreamValues nodes to handle the changed Calcite RelNode generation.
- Pass the TableEnvironment instead of the TableConfig for DataSet and DataStream translation (a toy sketch of the new contract follows this list).
- Add methods for creating new data sources to BatchTableEnvironment and StreamTableEnvironment.
- Remove the copied Calcite rule that has been fixed upstream in Calcite 1.7.
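
A toy model of the new translation contract (all names here are illustrative stand-ins, not the real Flink or Calcite classes): operators receive the whole environment, derive the config from it, and can ask the environment to create new sources, which is what the new Values nodes need.

    case class Config(nullCheck: Boolean)

    trait Env {
      def getConfig: Config
      // stand-in for createDataSetSource / createDataStreamSource
      def createSource(rows: Seq[String]): Seq[String]
    }

    trait Rel {
      // before this commit, the argument was the Config alone
      def translateToPlan(env: Env): Seq[String]
    }

    class ValuesRel(rows: Seq[String]) extends Rel {
      override def translateToPlan(env: Env): Seq[String] = {
        val config = env.getConfig  // config is now derived from the environment
        println(s"nullCheck = ${config.nullCheck}")
        env.createSource(rows)      // only the environment can create sources
      }
    }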

This closes #1897


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/367687df
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/367687df
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/367687df

Branch: refs/heads/master
Commit: 367687df1e2107e07622d3ffb0fdf0466bce7ff1
Parents: 7eb5877
Author: Fabian Hueske <fh...@apache.org>
Authored: Tue Mar 22 18:51:46 2016 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Apr 18 16:55:29 2016 +0200

----------------------------------------------------------------------
 flink-libraries/flink-table/pom.xml             |   2 +-
 .../api/java/table/BatchTableEnvironment.scala  |  17 ++-
 .../api/java/table/StreamTableEnvironment.scala |  17 ++-
 .../api/scala/table/BatchTableEnvironment.scala |  18 ++-
 .../scala/table/StreamTableEnvironment.scala    |  19 ++-
 .../flink/api/table/BatchTableEnvironment.scala |  15 ++-
 .../api/table/StreamTableEnvironment.scala      |  14 ++-
 .../flink/api/table/codegen/CodeGenerator.scala |  11 +-
 .../plan/nodes/dataset/DataSetAggregate.scala   |  12 +-
 .../table/plan/nodes/dataset/DataSetCalc.scala  |  17 +--
 .../table/plan/nodes/dataset/DataSetJoin.scala  |  16 +--
 .../table/plan/nodes/dataset/DataSetRel.scala   |  11 +-
 .../plan/nodes/dataset/DataSetSource.scala      |  13 +-
 .../table/plan/nodes/dataset/DataSetUnion.scala |  12 +-
 .../plan/nodes/dataset/DataSetValues.scala      | 118 ++++++++++++++++++
 .../plan/nodes/datastream/DataStreamCalc.scala  |   9 +-
 .../plan/nodes/datastream/DataStreamRel.scala   |   9 +-
 .../nodes/datastream/DataStreamSource.scala     |   6 +-
 .../plan/nodes/datastream/DataStreamUnion.scala |   9 +-
 .../nodes/datastream/DataStreamValues.scala     |  75 +++++++++++
 .../api/table/plan/rules/FlinkRuleSets.scala    |  22 ++--
 .../rules/dataSet/DataSetAggregateRule.scala    |   2 +-
 .../plan/rules/dataSet/DataSetCalcRule.scala    |   2 +-
 .../plan/rules/dataSet/DataSetJoinRule.scala    |   2 +-
 .../plan/rules/dataSet/DataSetScanRule.scala    |   2 +-
 .../plan/rules/dataSet/DataSetUnionRule.scala   |   2 +-
 .../plan/rules/dataSet/DataSetValuesRule.scala  |  50 ++++++++
 .../FlinkFilterAggregateTransposeRule.scala     | 123 -------------------
 .../rules/datastream/DataStreamValuesRule.scala |  50 ++++++++
 29 files changed, 476 insertions(+), 199 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/pom.xml b/flink-libraries/flink-table/pom.xml
index 088df98..bb841a3 100644
--- a/flink-libraries/flink-table/pom.xml
+++ b/flink-libraries/flink-table/pom.xml
@@ -111,7 +111,7 @@ under the License.
 		<dependency>
 			<groupId>org.apache.calcite</groupId>
 			<artifactId>calcite-core</artifactId>
-			<version>1.5.0</version>
+			<version>1.7.0</version>
 		</dependency>
 
 		<dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
index 69bff95..dd9033e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
@@ -17,11 +17,12 @@
  */
 package org.apache.flink.api.java.table
 
+import org.apache.flink.api.common.io.InputFormat
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.{ExecutionEnvironment, DataSet}
 import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.api.table.expressions.ExpressionParser
-import org.apache.flink.api.table.{TableConfig, Table}
+import org.apache.flink.api.table.{Row, TableConfig, Table}
 
 /**
   * The [[org.apache.flink.api.table.TableEnvironment]] for a Java batch [[DataSet]]
@@ -162,4 +163,18 @@ class BatchTableEnvironment(
     translate[T](table)(typeInfo)
   }
 
+  /**
+    * Creates a [[Row]] [[DataSet]] from an [[InputFormat]].
+    *
+    * @param inputFormat [[InputFormat]] from which the [[DataSet]] is created.
+    * @param typeInfo [[TypeInformation]] of the type of the [[DataSet]].
+    * @return A [[Row]] [[DataSet]] created from the [[InputFormat]].
+    */
+  override private[flink] def createDataSetSource(
+      inputFormat: InputFormat[Row, _],
+      typeInfo: TypeInformation[Row]): DataSet[Row] = {
+
+    execEnv.createInput(inputFormat, typeInfo)
+  }
+
 }
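
The override above simply delegates to the ExecutionEnvironment. For context, a hypothetical standalone use of that same createInput call outside the Table API (the format class and all names are invented for the demo):

    import org.apache.flink.api.common.io.{GenericInputFormat, NonParallelInput}
    import org.apache.flink.api.java.ExecutionEnvironment
    import org.apache.flink.api.java.typeutils.TypeExtractor

    // A trivial non-parallel format that produces three strings.
    class ThreeStrings extends GenericInputFormat[String] with NonParallelInput {
      private var i = 0
      override def reachedEnd(): Boolean = i == 3
      override def nextRecord(reuse: String): String = { i += 1; s"record-$i" }
    }

    object CreateInputDemo {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val ds = env.createInput(new ThreeStrings, TypeExtractor.getForClass(classOf[String]))
        ds.print()
      }
    }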

http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
index 7479426..980e45b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
@@ -17,9 +17,10 @@
  */
 package org.apache.flink.api.java.table
 
+import org.apache.flink.api.common.io.InputFormat
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.flink.api.table.{TableConfig, Table}
+import org.apache.flink.api.table.{Row, TableConfig, Table}
 import org.apache.flink.api.table.expressions.ExpressionParser
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
@@ -120,4 +121,18 @@ class StreamTableEnvironment(
     translate[T](table)(typeInfo)
   }
 
+  /**
+    * Creates a [[Row]] [[DataStream]] from an [[InputFormat]].
+    *
+    * @param inputFormat [[InputFormat]] from which the [[DataStream]] is created.
+    * @param typeInfo [[TypeInformation]] of the type of the [[DataStream]].
+    * @return A [[Row]] [[DataStream]] created from the [[InputFormat]].
+    */
+  override private[flink] def createDataStreamSource(
+      inputFormat: InputFormat[Row, _],
+      typeInfo: TypeInformation[Row]): DataStream[Row] = {
+
+    execEnv.createInput(inputFormat, typeInfo)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala
index a18f338..24f23f2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala
@@ -17,10 +17,12 @@
  */
 package org.apache.flink.api.scala.table
 
+import org.apache.flink.api.common.io.InputFormat
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
+import org.apache.flink.api.java.{DataSet => JavaSet}
 import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.table.{TableConfig, Table}
+import org.apache.flink.api.table.{Row, TableConfig, Table}
 
 import scala.reflect.ClassTag
 
@@ -139,4 +141,18 @@ class BatchTableEnvironment(
     wrap[T](translate(table))(ClassTag.AnyRef.asInstanceOf[ClassTag[T]])
   }
 
+  /**
+    * Creates a [[Row]] [[JavaSet]] from an [[InputFormat]].
+    *
+    * @param inputFormat [[InputFormat]] from which the [[JavaSet]] is created.
+    * @param typeInfo [[TypeInformation]] of the type of the [[JavaSet]].
+    * @return A [[Row]] [[JavaSet]] created from the [[InputFormat]].
+    */
+  override private[flink] def createDataSetSource(
+      inputFormat: InputFormat[Row, _],
+      typeInfo: TypeInformation[Row]): JavaSet[Row] = {
+
+    execEnv.createInput(inputFormat).javaSet
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
index 15ef55e..48de953 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
@@ -17,9 +17,12 @@
  */
 package org.apache.flink.api.scala.table
 
+import org.apache.flink.api.common.io.InputFormat
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.{TableConfig, Table}
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.table.{Row, TableConfig, Table}
 import org.apache.flink.api.table.expressions.Expression
+import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream}
 import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, DataStream}
 
 import org.apache.flink.streaming.api.scala.asScalaStream
@@ -99,4 +102,18 @@ class StreamTableEnvironment(
     asScalaStream(translate(table))
   }
 
+  /**
+    * Creates a [[Row]] [[JavaStream]] from an [[InputFormat]].
+    *
+    * @param inputFormat [[InputFormat]] from which the [[JavaStream]] is created.
+    * @param typeInfo [[TypeInformation]] of the type of the [[JavaStream]].
+    * @return A [[Row]] [[JavaStream]] created from the [[InputFormat]].
+    */
+  override private[flink] def createDataStreamSource(
+      inputFormat: InputFormat[Row, _],
+      typeInfo: TypeInformation[Row]): JavaStream[Row] = {
+
+    execEnv.createInput(inputFormat)(typeInfo).javaStream
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
index b6aa229..ade3b49 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
@@ -24,6 +24,7 @@ import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.Programs
+import org.apache.flink.api.common.io.InputFormat
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.java.io.DiscardingOutputFormat
@@ -34,6 +35,7 @@ import org.apache.flink.api.table.plan.PlanGenException
 import org.apache.flink.api.table.plan.nodes.dataset.{DataSetRel, DataSetConvention}
 import org.apache.flink.api.table.plan.rules.FlinkRuleSets
 import org.apache.flink.api.table.plan.schema.DataSetTable
+import org.apache.flink.streaming.api.datastream.DataStream
 
 /**
   * The abstract base class for batch TableEnvironments.
@@ -226,11 +228,22 @@ abstract class BatchTableEnvironment(config: TableConfig) extends TableEnvironme
     dataSetPlan match {
       case node: DataSetRel =>
         node.translateToPlan(
-          config,
+          this,
           Some(tpe.asInstanceOf[TypeInformation[Any]])
         ).asInstanceOf[DataSet[A]]
       case _ => ???
     }
   }
 
+  /**
+    * Creates a [[Row]] [[DataSet]] from an [[InputFormat]].
+    *
+    * @param inputFormat [[InputFormat]] from which the [[DataSet]] is created.
+    * @param typeInfo [[TypeInformation]] of the type of the [[DataSet]].
+    * @return A [[Row]] [[DataSet]] created from the [[InputFormat]].
+    */
+  private[flink] def createDataSetSource(
+      inputFormat: InputFormat[Row, _],
+      typeInfo: TypeInformation[Row]): DataSet[Row]
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
index 7644e6d..8724b5a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
@@ -24,6 +24,7 @@ import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.Programs
+import org.apache.flink.api.common.io.InputFormat
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.table.expressions.Expression
 import org.apache.flink.api.table.plan.PlanGenException
@@ -185,7 +186,7 @@ abstract class StreamTableEnvironment(config: TableConfig) extends TableEnvironm
     dataStreamPlan match {
       case node: DataStreamRel =>
         node.translateToPlan(
-          config,
+          this,
           Some(tpe.asInstanceOf[TypeInformation[Any]])
         ).asInstanceOf[DataStream[A]]
       case _ => ???
@@ -193,4 +194,15 @@ abstract class StreamTableEnvironment(config: TableConfig) extends TableEnvironm
 
   }
 
+  /**
+    * Creates a [[Row]] [[DataStream]] from an [[InputFormat]].
+    *
+    * @param inputFormat [[InputFormat]] from which the [[DataStream]] is created.
+    * @param typeInfo [[TypeInformation]] of the type of the [[DataStream]].
+    * @return A [[Row]] [[DataStream]] created from the [[InputFormat]].
+    */
+  private[flink] def createDataStreamSource(
+      inputFormat: InputFormat[Row, _],
+      typeInfo: TypeInformation[Row]): DataStream[Row]
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
index 7b3b02a..d41674c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
@@ -580,7 +580,14 @@ class CodeGenerator(
       case VARCHAR | CHAR =>
         generateNonNullLiteral(resultType, "\"" + value.toString + "\"")
       case SYMBOL =>
-        val symbolOrdinal = value.asInstanceOf[SqlLiteral.SqlSymbol].ordinal()
+
+        val symbolOrdinal =
+          if (classOf[Enum[_]].isAssignableFrom(value.getClass)) {
+            value.asInstanceOf[Enum[_]].ordinal()
+          } else {
+            value.asInstanceOf[SqlLiteral.SqlSymbol].ordinal()
+          }
+
         generateNonNullLiteral(resultType, symbolOrdinal.toString)
       case _ => ??? // TODO more types
     }
@@ -747,6 +754,8 @@ class CodeGenerator(
 
   override def visitOver(over: RexOver): GeneratedExpression = ???
 
+  override def visitSubQuery(subQuery: RexSubQuery): GeneratedExpression = ???
+
   // ----------------------------------------------------------------------------------------------
   // generator helping methods
   // ----------------------------------------------------------------------------------------------
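
The SYMBOL branch above now checks for plain Java enums before falling back to SqlLiteral.SqlSymbol, evidently because Calcite 1.7 can hand the code generator enum literals directly. A minimal standalone illustration of that dispatch (TimeUnit is just a convenient Java enum for the demo, not from the patch):

    import java.util.concurrent.TimeUnit

    object SymbolOrdinalDemo {
      // Any Java enum exposes ordinal(), mirroring the check in CodeGenerator.
      def symbolOrdinal(value: Any): Int = value match {
        case e: Enum[_] => e.ordinal()
        case other => throw new IllegalArgumentException(s"not a symbol literal: $other")
      }

      def main(args: Array[String]): Unit = {
        println(symbolOrdinal(TimeUnit.SECONDS)) // 3 for java.util.concurrent.TimeUnit
      }
    }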

http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
index 6e42130..12095a2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
@@ -31,7 +31,7 @@ import org.apache.flink.api.table.runtime.MapRunner
 import org.apache.flink.api.table.runtime.aggregate.AggregateUtil
 import org.apache.flink.api.table.runtime.aggregate.AggregateUtil.CalcitePair
 import org.apache.flink.api.table.typeutils.{TypeConverter, RowTypeInfo}
-import org.apache.flink.api.table.{Row, TableConfig}
+import org.apache.flink.api.table.{BatchTableEnvironment, Row, TableConfig}
 
 import scala.collection.JavaConverters._
 
@@ -76,26 +76,28 @@ class DataSetAggregate(
       .item("select", aggregationToString)
   }
 
-  override def computeSelfCost (planner: RelOptPlanner): RelOptCost = {
+  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
 
     val child = this.getInput
-    val rowCnt = RelMetadataQuery.getRowCount(child)
+    val rowCnt = metadata.getRowCount(child)
     val rowSize = this.estimateRowSize(child.getRowType)
     val aggCnt = this.namedAggregates.size
     planner.getCostFactory.makeCost(rowCnt, rowCnt * aggCnt, rowCnt * rowSize)
   }
 
   override def translateToPlan(
-      config: TableConfig,
+      tableEnv: BatchTableEnvironment,
       expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
 
+    val config = tableEnv.getConfig
+
     val groupingKeys = grouping.indices.toArray
     // add grouping fields, position keys in the input, and input type
     val aggregateResult = AggregateUtil.createOperatorFunctionsForAggregates(namedAggregates,
       inputType, rowType, grouping, config)
 
     val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(
-      config,
+      tableEnv,
       // tell the input operator that this operator currently only supports Rows as input
       Some(TypeConverter.DEFAULT_ROW_TYPE))
 

http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
index 67daffc..13bb39d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
@@ -29,7 +29,7 @@ import org.apache.flink.api.table.codegen.CodeGenerator
 import org.apache.flink.api.table.plan.nodes.FlinkCalc
 import org.apache.flink.api.table.typeutils.TypeConverter
 import TypeConverter._
-import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.BatchTableEnvironment
 import org.apache.calcite.rex._
 
 /**
@@ -69,17 +69,17 @@ class DataSetCalc(
         calcProgram.getCondition != null)
   }
 
-  override def computeSelfCost (planner: RelOptPlanner): RelOptCost = {
+  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
 
     val child = this.getInput
-    val rowCnt = RelMetadataQuery.getRowCount(child)
+    val rowCnt = metadata.getRowCount(child)
     val exprCnt = calcProgram.getExprCount
     planner.getCostFactory.makeCost(rowCnt, rowCnt * exprCnt, 0)
   }
 
-  override def getRows: Double = {
+  override def estimateRowCount(metadata: RelMetadataQuery): Double = {
     val child = this.getInput
-    val rowCnt = RelMetadataQuery.getRowCount(child)
+    val rowCnt = metadata.getRowCount(child)
 
     if (calcProgram.getCondition != null) {
       // we reduce the result card to push filters down
@@ -89,10 +89,13 @@ class DataSetCalc(
     }
   }
 
-  override def translateToPlan(config: TableConfig,
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
       expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
 
-    val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(config)
+    val config = tableEnv.getConfig
+
+    val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
 
     val returnType = determineReturnType(
       getRowType,

http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
index de54897..61e8995 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
@@ -21,7 +21,6 @@ package org.apache.flink.api.table.plan.nodes.dataset
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.JoinInfo
-import org.apache.calcite.rel.logical.LogicalJoin
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelWriter, BiRel, RelNode}
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
@@ -31,10 +30,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.java.operators.join.JoinType
 import org.apache.flink.api.table.codegen.CodeGenerator
-import org.apache.flink.api.table.plan.PlanGenException
 import org.apache.flink.api.table.runtime.FlatJoinRunner
 import org.apache.flink.api.table.typeutils.TypeConverter
-import org.apache.flink.api.table.{TableException, TableConfig}
+import org.apache.flink.api.table.{BatchTableEnvironment, TableException}
 import org.apache.flink.api.common.functions.FlatJoinFunction
 import TypeConverter.determineReturnType
 import scala.collection.mutable.ArrayBuffer
@@ -92,7 +90,7 @@ class DataSetJoin(
       .item("join", joinSelectionToString)
   }
 
-  override def computeSelfCost (planner: RelOptPlanner): RelOptCost = {
+  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
 
     if (!translatable) {
       // join cannot be translated. Make huge costs
@@ -101,7 +99,7 @@ class DataSetJoin(
       // join can be translated. Compute cost estimate
       val children = this.getInputs
       children.foldLeft(planner.getCostFactory.makeCost(0, 0, 0)) { (cost, child) =>
-        val rowCnt = RelMetadataQuery.getRowCount(child)
+        val rowCnt = metadata.getRowCount(child)
         val rowSize = this.estimateRowSize(child.getRowType)
         cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize))
       }
@@ -110,9 +108,11 @@ class DataSetJoin(
   }
 
   override def translateToPlan(
-      config: TableConfig,
+      tableEnv: BatchTableEnvironment,
       expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
 
+    val config = tableEnv.getConfig
+
     val returnType = determineReturnType(
       getRowType,
       expectedType,
@@ -156,8 +156,8 @@ class DataSetJoin(
       })
     }
 
-    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(config)
-    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(config)
+    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
 
     val generator = new CodeGenerator(config, leftDataSet.getType, Some(rightDataSet.getType))
     val conversion = generator.generateConverterResultExpression(

http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
index eaf6b26..e8f81fd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
@@ -23,7 +23,7 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.{BatchTableEnvironment, TableEnvironment, TableConfig}
 import org.apache.flink.api.table.plan.nodes.FlinkRel
 
 import scala.collection.JavaConversions._
@@ -31,9 +31,9 @@ import scala.collection.JavaConversions._
 trait DataSetRel extends RelNode with FlinkRel {
 
   /**
-    * Translates the FlinkRelNode into a Flink operator.
+    * Translates the [[DataSetRel]] node into a [[DataSet]] operator.
     *
-    * @param config runtime configuration
+    * @param tableEnv [[org.apache.flink.api.table.BatchTableEnvironment]] of the translated Table.
     * @param expectedType specifies the type the Flink operator should return. The type must
     *                     have the same arity as the result. For instance, if the
     *                     expected type is a RowTypeInfo this method will return a DataSet of
@@ -42,9 +42,8 @@ trait DataSetRel extends RelNode with FlinkRel {
     * @return DataSet of type expectedType or RowTypeInfo
     */
   def translateToPlan(
-      config: TableConfig,
-      expectedType: Option[TypeInformation[Any]] = None)
-    : DataSet[Any]
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]] = None): DataSet[Any]
 
   private[flink] def estimateRowSize(rowType: RelDataType): Double = {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala
index 6f94251..01d71b7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala
@@ -27,7 +27,7 @@ import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.java.typeutils.PojoTypeInfo
-import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.BatchTableEnvironment
 import org.apache.flink.api.table.codegen.CodeGenerator
 import org.apache.flink.api.table.typeutils.TypeConverter
 import TypeConverter.determineReturnType
@@ -67,16 +67,17 @@ class DataSetSource(
     s"Source(from: (${rowType.getFieldNames.asScala.toList.mkString(", ")}))"
   }
 
-  override def computeSelfCost (planner: RelOptPlanner): RelOptCost = {
+  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
 
-    val rowCnt = RelMetadataQuery.getRowCount(this)
+    val rowCnt = metadata.getRowCount(this)
     planner.getCostFactory.makeCost(rowCnt, rowCnt, 0)
   }
 
   override def translateToPlan(
-      config: TableConfig,
-      expectedType: Option[TypeInformation[Any]])
-    : DataSet[Any] = {
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+    val config = tableEnv.getConfig
 
     val inputDataSet: DataSet[Any] = dataSetTable.dataSet
     val inputType = inputDataSet.getType

http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
index e4e0a14..b6f6a19 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
@@ -24,7 +24,7 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelWriter, BiRel, RelNode}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig}
 
 import scala.collection.JavaConverters._
 import scala.collection.JavaConversions._
@@ -62,22 +62,22 @@ class DataSetUnion(
     super.explainTerms(pw).item("union", unionSelectionToString)
   }
 
-  override def computeSelfCost (planner: RelOptPlanner): RelOptCost = {
+  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
 
     val children = this.getInputs
     val rowCnt = children.foldLeft(0D) { (rows, child) =>
-      rows + RelMetadataQuery.getRowCount(child)
+      rows + metadata.getRowCount(child)
     }
 
     planner.getCostFactory.makeCost(rowCnt, 0, 0)
   }
 
   override def translateToPlan(
-      config: TableConfig,
+      tableEnv: BatchTableEnvironment,
       expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
 
-    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(config)
-    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(config)
+    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
     leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala
new file mode 100644
index 0000000..9dfd346
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan.{RelTraitSet, RelOptCluster}
+import org.apache.calcite.rel.{RelWriter, RelNode}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.Values
+import org.apache.calcite.rex.RexLiteral
+import org.apache.flink.api.common.io.{GenericInputFormat, NonParallelInput}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.scala._
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.table.typeutils.TypeConverter._
+import org.apache.flink.api.table.{BatchTableEnvironment, Row, TableConfig}
+
+import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
+
+/**
+  * DataSet RelNode for a LogicalValues.
+  */
+class DataSetValues(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    rowType: RelDataType,
+    tuples: ImmutableList[ImmutableList[RexLiteral]])
+  extends Values(cluster, rowType, tuples, traitSet)
+  with DataSetRel {
+
+  override def deriveRowType() = rowType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataSetValues(
+      cluster,
+      traitSet,
+      rowType,
+      tuples
+    )
+  }
+
+  override def toString: String = {
+    "Values(values: (${rowType.getFieldNames.asScala.toList.mkString(\", \")}))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw).item("values", valuesFieldsToString)
+  }
+
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+    val config = tableEnv.getConfig
+
+    val returnType = determineReturnType(
+      getRowType,
+      expectedType,
+      config.getNullCheck,
+      config.getEfficientTypeUsage).asInstanceOf[RowTypeInfo]
+
+    val inputFormat = new ValuesInputFormat(tuples)
+
+    tableEnv.createDataSetSource(inputFormat, returnType).asInstanceOf[DataSet[Any]]
+  }
+
+  private def valuesFieldsToString: String = {
+    rowType.getFieldNames.asScala.toList.mkString(", ")
+  }
+
+}
+
+class ValuesInputFormat(val tuples: ImmutableList[ImmutableList[RexLiteral]])
+  extends GenericInputFormat[Row]
+  with NonParallelInput {
+
+  var readIdx = 0
+
+  override def reachedEnd(): Boolean = readIdx == tuples.size()
+
+  override def nextRecord(reuse: Row): Row = {
+
+    if (readIdx == tuples.size()) {
+      return null
+    }
+
+    val t = tuples.get(readIdx)
+    readIdx += 1
+
+    var i = 0
+    for(f <- t) {
+      reuse.setField(i, f.getValue)
+      i += 1
+    }
+    reuse
+  }
+}
+
+
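
Note that ValuesInputFormat follows Flink's object-reuse convention: nextRecord mutates the passed-in reuse Row via setField instead of allocating a new row per record. A small sketch of that pattern (this assumes Row's arity constructor; the values are illustrative):

    import org.apache.flink.api.table.Row

    object RowReuseDemo {
      def main(args: Array[String]): Unit = {
        val reuse = new Row(2) // one instance, mutated for every record
        reuse.setField(0, 1)
        reuse.setField(1, "a")
        println(reuse)
      }
    }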

http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala
index fb058f3..6dfcd03 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala
@@ -23,7 +23,7 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.calcite.rex.RexProgram
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.StreamTableEnvironment
 import org.apache.flink.api.table.codegen.CodeGenerator
 import org.apache.flink.api.table.plan.nodes.FlinkCalc
 import org.apache.flink.api.table.typeutils.TypeConverter._
@@ -68,10 +68,13 @@ class DataStreamCalc(
         calcProgram.getCondition != null)
   }
 
-  override def translateToPlan(config: TableConfig,
+  override def translateToPlan(
+      tableEnv: StreamTableEnvironment,
       expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
 
-    val inputDataStream = input.asInstanceOf[DataStreamRel].translateToPlan(config)
+    val config = tableEnv.getConfig
+
+    val inputDataStream = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
 
     val returnType = determineReturnType(
       getRowType,

http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamRel.scala
index 0673a35..6cf13a5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamRel.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.table.plan.nodes.datastream
 
 import org.apache.calcite.rel.RelNode
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.{StreamTableEnvironment, TableConfig}
 import org.apache.flink.api.table.plan.nodes.FlinkRel
 import org.apache.flink.streaming.api.datastream.DataStream
 
@@ -29,7 +29,7 @@ trait DataStreamRel extends RelNode with FlinkRel {
   /**
     * Translates the FlinkRelNode into a Flink operator.
     *
-    * @param config runtime configuration
+    * @param tableEnv The [[StreamTableEnvironment]] of the translated Table.
     * @param expectedType specifies the type the Flink operator should return. The type must
     *                     have the same arity as the result. For instance, if the
     *                     expected type is a RowTypeInfo this method will return a DataSet of
@@ -38,9 +38,8 @@ trait DataStreamRel extends RelNode with FlinkRel {
     * @return DataStream of type expectedType or RowTypeInfo
     */
   def translateToPlan(
-      config: TableConfig,
-      expectedType: Option[TypeInformation[Any]] = None)
-    : DataStream[Any]
+      tableEnv: StreamTableEnvironment,
+      expectedType: Option[TypeInformation[Any]] = None): DataStream[Any]
 
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamSource.scala
index 314759c..9fc4d5e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamSource.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamSource.scala
@@ -25,7 +25,7 @@ import org.apache.calcite.rel.core.TableScan
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.PojoTypeInfo
-import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.StreamTableEnvironment
 import org.apache.flink.api.table.codegen.CodeGenerator
 import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
 import org.apache.flink.api.table.plan.schema.DataStreamTable
@@ -60,10 +60,12 @@ class DataStreamSource(
   }
 
   override def translateToPlan(
-      config: TableConfig,
+      tableEnv: StreamTableEnvironment,
       expectedType: Option[TypeInformation[Any]])
     : DataStream[Any] = {
 
+    val config = tableEnv.getConfig
+
     val inputDataStream: DataStream[Any] = dataStreamTable.dataStream
     val inputType = inputDataStream.getType
 

http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala
index 8c9cca0..e72e9a8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala
@@ -22,7 +22,7 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.{RelNode, RelWriter, BiRel}
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.StreamTableEnvironment
 import org.apache.flink.streaming.api.datastream.DataStream
 
 import scala.collection.JavaConverters._
@@ -60,11 +60,12 @@ class DataStreamUnion(
     "Union(union: (${rowType.getFieldNames.asScala.toList.mkString(\", \")}))"
   }
 
-  override def translateToPlan(config: TableConfig,
+  override def translateToPlan(
+      tableEnv: StreamTableEnvironment,
       expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
 
-    val leftDataSet = left.asInstanceOf[DataStreamRel].translateToPlan(config)
-    val rightDataSet = right.asInstanceOf[DataStreamRel].translateToPlan(config)
+    val leftDataSet = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+    val rightDataSet = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
     leftDataSet.union(rightDataSet)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala
new file mode 100644
index 0000000..3e64e64
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.datastream
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.Values
+import org.apache.calcite.rex.RexLiteral
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.StreamTableEnvironment
+import org.apache.flink.api.table.plan.nodes.dataset.ValuesInputFormat
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.table.typeutils.TypeConverter._
+import org.apache.flink.streaming.api.datastream.DataStream
+
+
+/**
+  * DataStream RelNode for LogicalValues.
+  */
+class DataStreamValues(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    rowType: RelDataType,
+    tuples: ImmutableList[ImmutableList[RexLiteral]])
+  extends Values(cluster, rowType, tuples, traitSet)
+  with DataStreamRel {
+
+  override def deriveRowType() = rowType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataStreamValues(
+      cluster,
+      traitSet,
+      rowType,
+      tuples
+    )
+  }
+
+  override def translateToPlan(
+      tableEnv: StreamTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+
+    val config = tableEnv.getConfig
+
+    val returnType = determineReturnType(
+      getRowType,
+      expectedType,
+      config.getNullCheck,
+      config.getEfficientTypeUsage).asInstanceOf[RowTypeInfo]
+
+    val inputFormat = new ValuesInputFormat(tuples)
+
+    tableEnv.createDataStreamSource(inputFormat, returnType).asInstanceOf[DataStream[Any]]
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
index 427530b..e9bcaa2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
@@ -21,9 +21,7 @@ package org.apache.flink.api.table.plan.rules
 import org.apache.calcite.rel.rules._
 import org.apache.calcite.tools.{RuleSets, RuleSet}
 import org.apache.flink.api.table.plan.rules.dataSet._
-import org.apache.flink.api.table.plan.rules.datastream.DataStreamCalcRule
-import org.apache.flink.api.table.plan.rules.datastream.DataStreamScanRule
-import org.apache.flink.api.table.plan.rules.datastream.DataStreamUnionRule
+import org.apache.flink.api.table.plan.rules.datastream._
 
 object FlinkRuleSets {
 
@@ -41,7 +39,7 @@ object FlinkRuleSets {
     // push filter into the children of a join
     FilterJoinRule.JOIN,
     // push filter through an aggregation
-    FlinkFilterAggregateTransposeRule.INSTANCE,
+    FilterAggregateTransposeRule.INSTANCE,
 
     // aggregation and projection rules
     AggregateProjectMergeRule.INSTANCE,
@@ -100,7 +98,8 @@ object FlinkRuleSets {
     DataSetCalcRule.INSTANCE,
     DataSetJoinRule.INSTANCE,
     DataSetScanRule.INSTANCE,
-    DataSetUnionRule.INSTANCE
+    DataSetUnionRule.INSTANCE,
+    DataSetValuesRule.INSTANCE
   )
 
   /**
@@ -108,11 +107,6 @@ object FlinkRuleSets {
   */
   val DATASTREAM_OPT_RULES: RuleSet = RuleSets.ofList(
 
-    // translate to DataStream nodes
-    DataStreamCalcRule.INSTANCE,
-    DataStreamScanRule.INSTANCE,
-    DataStreamUnionRule.INSTANCE,
-
     // calc rules
     FilterToCalcRule.INSTANCE,
     ProjectToCalcRule.INSTANCE,
@@ -134,7 +128,13 @@ object FlinkRuleSets {
     ProjectRemoveRule.INSTANCE,
 
     // merge and push unions rules
-    UnionEliminatorRule.INSTANCE
+    UnionEliminatorRule.INSTANCE,
+
+    // translate to DataStream nodes
+    DataStreamCalcRule.INSTANCE,
+    DataStreamScanRule.INSTANCE,
+    DataStreamUnionRule.INSTANCE,
+    DataStreamValuesRule.INSTANCE
   )
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala
index e6a6993..0449fc3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala
@@ -30,7 +30,7 @@ class DataSetAggregateRule
       classOf[LogicalAggregate],
       Convention.NONE,
       DataSetConvention.INSTANCE,
-      "FlinkAggregateRule")
+      "DataSetAggregateRule")
   {
 
     def convert(rel: RelNode): RelNode = {

http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCalcRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCalcRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCalcRule.scala
index 3821024..88e74a9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCalcRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCalcRule.scala
@@ -29,7 +29,7 @@ class DataSetCalcRule
       classOf[LogicalCalc],
       Convention.NONE,
       DataSetConvention.INSTANCE,
-      "FlinkCalcRule")
+      "DataSetCalcRule")
   {
 
     def convert(rel: RelNode): RelNode = {

http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala
index 89d33c9..55100d2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala
@@ -32,7 +32,7 @@ class DataSetJoinRule
       classOf[LogicalJoin],
       Convention.NONE,
       DataSetConvention.INSTANCE,
-      "FlinkJoinRule")
+      "DataSetJoinRule")
   {
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala
index 3cdaca3..f95f6ea 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala
@@ -31,7 +31,7 @@ class DataSetScanRule
       classOf[LogicalTableScan],
       Convention.NONE,
       DataSetConvention.INSTANCE,
-      "FlinkScanRule")
+      "DataSetScanRule")
   {
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala
index cd1de1e..7809d6d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala
@@ -29,7 +29,7 @@ class DataSetUnionRule
       classOf[LogicalUnion],
       Convention.NONE,
       DataSetConvention.INSTANCE,
-      "FlinkUnionRule")
+      "DataSetUnionRule")
   {
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetValuesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetValuesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetValuesRule.scala
new file mode 100644
index 0000000..c28b458
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetValuesRule.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{RelOptRule, RelTraitSet, Convention}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalValues
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetValues, DataSetConvention}
+
+class DataSetValuesRule
+  extends ConverterRule(
+    classOf[LogicalValues],
+    Convention.NONE,
+    DataSetConvention.INSTANCE,
+    "DataSetValuesRule")
+{
+
+  def convert(rel: RelNode): RelNode = {
+
+    val values: LogicalValues = rel.asInstanceOf[LogicalValues]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+
+    new DataSetValues(
+      rel.getCluster,
+      traitSet,
+      rel.getRowType,
+      values.getTuples)
+  }
+}
+
+object DataSetValuesRule {
+  val INSTANCE: RelOptRule = new DataSetValuesRule
+}
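
The new rule follows Calcite's standard ConverterRule contract: it matches a
LogicalValues node in Convention.NONE and re-creates it as a DataSetValues
node carrying DataSetConvention, keeping the trait set and row type intact.
As a minimal sketch of how such a converter rule is handed to the planner
(the RuleSets wiring below is illustrative and not part of this commit):

    import org.apache.calcite.tools.{RuleSet, RuleSets}
    import org.apache.flink.api.table.plan.rules.dataSet.DataSetValuesRule

    // Once this rule set is given to the planner, any LogicalValues in the
    // plan is swapped for a DataSetValues node during optimization.
    val dataSetRules: RuleSet = RuleSets.ofList(DataSetValuesRule.INSTANCE)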

http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/FlinkFilterAggregateTransposeRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/FlinkFilterAggregateTransposeRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/FlinkFilterAggregateTransposeRule.scala
deleted file mode 100644
index c27fc75..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/FlinkFilterAggregateTransposeRule.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.dataSet
-
-import org.apache.calcite.plan.RelOptRule
-import org.apache.calcite.plan.RelOptRuleCall
-import org.apache.calcite.plan.RelOptUtil
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.core.Aggregate
-import org.apache.calcite.rel.core.Filter
-import org.apache.calcite.rel.core.RelFactories
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.tools.RelBuilder
-import org.apache.calcite.tools.RelBuilderFactory
-import org.apache.calcite.util.ImmutableBitSet
-import com.google.common.collect.ImmutableList
-
-import scala.collection.JavaConversions._
-
-/**
-  * This rule is a (fixed) copy of the FilterAggregateTransposeRule of Apache Calcite.
-  *
- * A fix for this rule is contained in Calcite's master branch.
- * This custom rule can be removed once Calcite 1.7 is released and our dependency is adjusted.
-  *
-  * Planner rule that pushes a `org.apache.calcite.rel.core.Filter`
-  * past a `org.apache.calcite.rel.core.Aggregate`.
-  *
-  * @see org.apache.calcite.rel.rules.AggregateFilterTransposeRule
-  */
-class FlinkFilterAggregateTransposeRule(
-    filterClass: Class[_ <: Filter],
-    builderFactory: RelBuilderFactory,
-    aggregateClass: Class[_ <: Aggregate])
-  extends RelOptRule(
-    RelOptRule.operand(filterClass, RelOptRule.operand(aggregateClass, RelOptRule.any)),
-    builderFactory,
-    null)
-{
-
-  def onMatch(call: RelOptRuleCall) {
-    val filterRel: Filter = call.rel(0)
-    val aggRel: Aggregate = call.rel(1)
-    val conditions = RelOptUtil.conjunctions(filterRel.getCondition).toList
-    val rexBuilder = filterRel.getCluster.getRexBuilder
-    val origFields = aggRel.getRowType.getFieldList.toList
-
-    // Fixed computation of adjustments
-    val adjustments = aggRel.getGroupSet.asList().zipWithIndex.map {
-      case (g, i) => g - i
-    }.toArray
-
-    var pushedConditions: List[RexNode] = Nil
-    var remainingConditions: List[RexNode] = Nil
-
-    for (condition <- conditions) {
-      val rCols: ImmutableBitSet = RelOptUtil.InputFinder.bits(condition)
-      if (canPush(aggRel, rCols)) {
-        pushedConditions = condition.accept(
-            new RelOptUtil.RexInputConverter(
-              rexBuilder,
-              origFields,
-              aggRel.getInput(0).getRowType.getFieldList,
-              adjustments)) :: pushedConditions
-      }
-      else {
-        remainingConditions = condition :: remainingConditions
-      }
-    }
-    val builder: RelBuilder = call.builder
-    var rel: RelNode = builder.push(aggRel.getInput).filter(pushedConditions).build
-    if (rel eq aggRel.getInput(0)) {
-      return
-    }
-    rel = aggRel.copy(aggRel.getTraitSet, ImmutableList.of(rel))
-    rel = builder.push(rel).filter(remainingConditions).build
-    call.transformTo(rel)
-  }
-
-  private def canPush(aggregate: Aggregate, rCols: ImmutableBitSet): Boolean = {
-    val groupKeys: ImmutableBitSet = ImmutableBitSet.range(0, aggregate.getGroupSet.cardinality)
-    if (!groupKeys.contains(rCols)) {
-      return false
-    }
-    if (aggregate.indicator) {
-      import scala.collection.JavaConversions._
-      for (groupingSet <- aggregate.getGroupSets) {
-        if (!groupingSet.contains(rCols)) {
-          return false
-        }
-      }
-    }
-    true
-  }
-}
-
-object FlinkFilterAggregateTransposeRule {
-  /** The default instance of `FlinkFilterAggregateTransposeRule`.
-    *
-    * It matches any kind of Aggregate or Filter.
-    */
-  val INSTANCE: FlinkFilterAggregateTransposeRule =
-    new FlinkFilterAggregateTransposeRule(
-      classOf[Filter],
-      RelFactories.LOGICAL_BUILDER,
-      classOf[Aggregate])
-}
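
For intuition about the removed rule (its fixed version ships with Calcite
1.7): it pushes a Filter below an Aggregate whenever the filter references
only grouping keys, so fewer groups reach the aggregation. The Flink copy
fixed how aggregate output fields are mapped back to input fields (the
`g - i` adjustments above) for group sets that are not a prefix of the input
columns. A hedged Table API sketch, with the table and field names assumed
purely for illustration:

    // Logically the filter is written above the aggregate ...
    val result = table
      .groupBy('a)
      .select('a, 'b.sum)
      .filter('a > 5)
    // ... but since 'a is a grouping key, rows with a <= 5 can only produce
    // groups with a <= 5, so the rule evaluates the filter before groupBy('a).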

http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamValuesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamValuesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamValuesRule.scala
new file mode 100644
index 0000000..fa2b428
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamValuesRule.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalValues
+import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamValues, DataStreamConvention}
+
+class DataStreamValuesRule
+  extends ConverterRule(
+    classOf[LogicalValues],
+    Convention.NONE,
+    DataStreamConvention.INSTANCE,
+    "DataStreamValuesRule")
+{
+
+  def convert(rel: RelNode): RelNode = {
+
+    val values: LogicalValues = rel.asInstanceOf[LogicalValues]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+
+    new DataStreamValues(
+      rel.getCluster,
+      traitSet,
+      rel.getRowType,
+      values.getTuples)
+  }
+}
+
+object DataStreamValuesRule {
+  val INSTANCE: RelOptRule = new DataStreamValuesRule
+}
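
DataStreamValuesRule is the streaming counterpart of DataSetValuesRule and
differs only in its target convention and RelNode. A minimal sketch of the
kind of plan both rules match, built directly with Calcite's RelBuilder
(framework setup omitted; this snippet is illustrative, not part of the
commit):

    import org.apache.calcite.plan.RelOptUtil
    import org.apache.calcite.tools.RelBuilder

    def showValuesPlan(builder: RelBuilder): Unit = {
      val rel = builder
        .values(Array("id", "word"), Integer.valueOf(1), "Hello",
          Integer.valueOf(2), "World")
        .build()
      // Prints a LogicalValues node holding the literal tuples; during
      // optimization DataSetValuesRule or DataStreamValuesRule replaces it
      // with the matching Flink convention node.
      println(RelOptUtil.toString(rel))
    }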