You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/04/18 16:56:49 UTC
flink git commit: [FLINK-3587] Bump Calcite version to 1.7.0
Repository: flink
Updated Branches:
refs/heads/master 7eb58773e -> 367687df1
[FLINK-3587] Bump Calcite version to 1.7.0
- Add DataSetValues and DataStreamValues due to changed Calcite RelNode generation.
- Pass TableEnvironment instead of TableConfig for DataSet and DataStream translation.
- Add methods to create new DataSources to BatchTableEnvironment and StreamTableEnvironment.
- Remove copied Calcite rule that got fixed.
This closes #1897
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/367687df
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/367687df
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/367687df
Branch: refs/heads/master
Commit: 367687df1e2107e07622d3ffb0fdf0466bce7ff1
Parents: 7eb5877
Author: Fabian Hueske <fh...@apache.org>
Authored: Tue Mar 22 18:51:46 2016 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Apr 18 16:55:29 2016 +0200
----------------------------------------------------------------------
flink-libraries/flink-table/pom.xml | 2 +-
.../api/java/table/BatchTableEnvironment.scala | 17 ++-
.../api/java/table/StreamTableEnvironment.scala | 17 ++-
.../api/scala/table/BatchTableEnvironment.scala | 18 ++-
.../scala/table/StreamTableEnvironment.scala | 19 ++-
.../flink/api/table/BatchTableEnvironment.scala | 15 ++-
.../api/table/StreamTableEnvironment.scala | 14 ++-
.../flink/api/table/codegen/CodeGenerator.scala | 11 +-
.../plan/nodes/dataset/DataSetAggregate.scala | 12 +-
.../table/plan/nodes/dataset/DataSetCalc.scala | 17 +--
.../table/plan/nodes/dataset/DataSetJoin.scala | 16 +--
.../table/plan/nodes/dataset/DataSetRel.scala | 11 +-
.../plan/nodes/dataset/DataSetSource.scala | 13 +-
.../table/plan/nodes/dataset/DataSetUnion.scala | 12 +-
.../plan/nodes/dataset/DataSetValues.scala | 118 ++++++++++++++++++
.../plan/nodes/datastream/DataStreamCalc.scala | 9 +-
.../plan/nodes/datastream/DataStreamRel.scala | 9 +-
.../nodes/datastream/DataStreamSource.scala | 6 +-
.../plan/nodes/datastream/DataStreamUnion.scala | 9 +-
.../nodes/datastream/DataStreamValues.scala | 75 +++++++++++
.../api/table/plan/rules/FlinkRuleSets.scala | 22 ++--
.../rules/dataSet/DataSetAggregateRule.scala | 2 +-
.../plan/rules/dataSet/DataSetCalcRule.scala | 2 +-
.../plan/rules/dataSet/DataSetJoinRule.scala | 2 +-
.../plan/rules/dataSet/DataSetScanRule.scala | 2 +-
.../plan/rules/dataSet/DataSetUnionRule.scala | 2 +-
.../plan/rules/dataSet/DataSetValuesRule.scala | 50 ++++++++
.../FlinkFilterAggregateTransposeRule.scala | 123 -------------------
.../rules/datastream/DataStreamValuesRule.scala | 50 ++++++++
29 files changed, 476 insertions(+), 199 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/pom.xml b/flink-libraries/flink-table/pom.xml
index 088df98..bb841a3 100644
--- a/flink-libraries/flink-table/pom.xml
+++ b/flink-libraries/flink-table/pom.xml
@@ -111,7 +111,7 @@ under the License.
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
- <version>1.5.0</version>
+ <version>1.7.0</version>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
index 69bff95..dd9033e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
@@ -17,11 +17,12 @@
*/
package org.apache.flink.api.java.table
+import org.apache.flink.api.common.io.InputFormat
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.{ExecutionEnvironment, DataSet}
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.api.table.expressions.ExpressionParser
-import org.apache.flink.api.table.{TableConfig, Table}
+import org.apache.flink.api.table.{Row, TableConfig, Table}
/**
* The [[org.apache.flink.api.table.TableEnvironment]] for a Java batch [[DataSet]]
@@ -162,4 +163,18 @@ class BatchTableEnvironment(
translate[T](table)(typeInfo)
}
+ /**
+ * Creates a [[Row]] [[DataSet]] from an [[InputFormat]].
+ *
+ * @param inputFormat [[InputFormat]] from which the [[DataSet]] is created.
+ * @param typeInfo [[TypeInformation]] of the type of the [[DataSet]].
+ * @return A [[Row]] [[DataSet]] created from the [[InputFormat]].
+ */
+ override private[flink] def createDataSetSource(
+ inputFormat: InputFormat[Row, _],
+ typeInfo: TypeInformation[Row]): DataSet[Row] = {
+
+ execEnv.createInput(inputFormat, typeInfo)
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
index 7479426..980e45b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
@@ -17,9 +17,10 @@
*/
package org.apache.flink.api.java.table
+import org.apache.flink.api.common.io.InputFormat
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.flink.api.table.{TableConfig, Table}
+import org.apache.flink.api.table.{Row, TableConfig, Table}
import org.apache.flink.api.table.expressions.ExpressionParser
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
@@ -120,4 +121,18 @@ class StreamTableEnvironment(
translate[T](table)(typeInfo)
}
+ /**
+ * Creates a [[Row]] [[DataStream]] from an [[InputFormat]].
+ *
+ * @param inputFormat [[InputFormat]] from which the [[DataStream]] is created.
+ * @param typeInfo [[TypeInformation]] of the type of the [[DataStream]].
+ * @return A [[Row]] [[DataStream]] created from the [[InputFormat]].
+ */
+ override private[flink] def createDataStreamSource(
+ inputFormat: InputFormat[Row, _],
+ typeInfo: TypeInformation[Row]): DataStream[Row] = {
+
+ execEnv.createInput(inputFormat, typeInfo)
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala
index a18f338..24f23f2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala
@@ -17,10 +17,12 @@
*/
package org.apache.flink.api.scala.table
+import org.apache.flink.api.common.io.InputFormat
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
+import org.apache.flink.api.java.{DataSet => JavaSet}
import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.table.{TableConfig, Table}
+import org.apache.flink.api.table.{Row, TableConfig, Table}
import scala.reflect.ClassTag
@@ -139,4 +141,18 @@ class BatchTableEnvironment(
wrap[T](translate(table))(ClassTag.AnyRef.asInstanceOf[ClassTag[T]])
}
+ /**
+ * Creates a [[Row]] [[JavaSet]] from an [[InputFormat]].
+ *
+ * @param inputFormat [[InputFormat]] from which the [[JavaSet]] is created.
+ * @param typeInfo [[TypeInformation]] of the type of the [[JavaSet]].
+ * @return A [[Row]] [[JavaSet]] created from the [[InputFormat]].
+ */
+ override private[flink] def createDataSetSource(
+ inputFormat: InputFormat[Row, _],
+ typeInfo: TypeInformation[Row]): JavaSet[Row] = {
+
+ execEnv.createInput(inputFormat).javaSet
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
index 15ef55e..48de953 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
@@ -17,9 +17,12 @@
*/
package org.apache.flink.api.scala.table
+import org.apache.flink.api.common.io.InputFormat
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.{TableConfig, Table}
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.table.{Row, TableConfig, Table}
import org.apache.flink.api.table.expressions.Expression
+import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, DataStream}
import org.apache.flink.streaming.api.scala.asScalaStream
@@ -99,4 +102,18 @@ class StreamTableEnvironment(
asScalaStream(translate(table))
}
+ /**
+ * Creates a [[Row]] [[JavaStream]] from an [[InputFormat]].
+ *
+ * @param inputFormat [[InputFormat]] from which the [[JavaStream]] is created.
+ * @param typeInfo [[TypeInformation]] of the type of the [[JavaStream]].
+ * @return A [[Row]] [[JavaStream]] created from the [[InputFormat]].
+ */
+ override private[flink] def createDataStreamSource(
+ inputFormat: InputFormat[Row, _],
+ typeInfo: TypeInformation[Row]): JavaStream[Row] = {
+
+ execEnv.createInput(inputFormat)(typeInfo).javaStream
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
index b6aa229..ade3b49 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
@@ -24,6 +24,7 @@ import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
import org.apache.calcite.plan.RelOptUtil
import org.apache.calcite.sql2rel.RelDecorrelator
import org.apache.calcite.tools.Programs
+import org.apache.flink.api.common.io.InputFormat
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.api.java.io.DiscardingOutputFormat
@@ -34,6 +35,7 @@ import org.apache.flink.api.table.plan.PlanGenException
import org.apache.flink.api.table.plan.nodes.dataset.{DataSetRel, DataSetConvention}
import org.apache.flink.api.table.plan.rules.FlinkRuleSets
import org.apache.flink.api.table.plan.schema.DataSetTable
+import org.apache.flink.streaming.api.datastream.DataStream
/**
* The abstract base class for batch TableEnvironments.
@@ -226,11 +228,22 @@ abstract class BatchTableEnvironment(config: TableConfig) extends TableEnvironme
dataSetPlan match {
case node: DataSetRel =>
node.translateToPlan(
- config,
+ this,
Some(tpe.asInstanceOf[TypeInformation[Any]])
).asInstanceOf[DataSet[A]]
case _ => ???
}
}
+ /**
+ * Creates a [[Row]] [[DataSet]] from an [[InputFormat]].
+ *
+ * @param inputFormat [[InputFormat]] from which the [[DataSet]] is created.
+ * @param typeInfo [[TypeInformation]] of the type of the [[DataSet]].
+ * @return A [[Row]] [[DataSet]] created from the [[InputFormat]].
+ */
+ private[flink] def createDataSetSource(
+ inputFormat: InputFormat[Row, _],
+ typeInfo: TypeInformation[Row]): DataSet[Row]
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
index 7644e6d..8724b5a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
@@ -24,6 +24,7 @@ import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
import org.apache.calcite.plan.RelOptUtil
import org.apache.calcite.sql2rel.RelDecorrelator
import org.apache.calcite.tools.Programs
+import org.apache.flink.api.common.io.InputFormat
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.table.expressions.Expression
import org.apache.flink.api.table.plan.PlanGenException
@@ -185,7 +186,7 @@ abstract class StreamTableEnvironment(config: TableConfig) extends TableEnvironm
dataStreamPlan match {
case node: DataStreamRel =>
node.translateToPlan(
- config,
+ this,
Some(tpe.asInstanceOf[TypeInformation[Any]])
).asInstanceOf[DataStream[A]]
case _ => ???
@@ -193,4 +194,15 @@ abstract class StreamTableEnvironment(config: TableConfig) extends TableEnvironm
}
+ /**
+ * Creates a [[Row]] [[DataStream]] from an [[InputFormat]].
+ *
+ * @param inputFormat [[InputFormat]] from which the [[DataStream]] is created.
+ * @param typeInfo [[TypeInformation]] of the type of the [[DataStream]].
+ * @return A [[Row]] [[DataStream]] created from the [[InputFormat]].
+ */
+ private[flink] def createDataStreamSource(
+ inputFormat: InputFormat[Row, _],
+ typeInfo: TypeInformation[Row]): DataStream[Row]
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
index 7b3b02a..d41674c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
@@ -580,7 +580,14 @@ class CodeGenerator(
case VARCHAR | CHAR =>
generateNonNullLiteral(resultType, "\"" + value.toString + "\"")
case SYMBOL =>
- val symbolOrdinal = value.asInstanceOf[SqlLiteral.SqlSymbol].ordinal()
+
+ val symbolOrdinal =
+ if (classOf[Enum[_]].isAssignableFrom(value.getClass) ) {
+ value.asInstanceOf[Enum[_]].ordinal()
+ } else {
+ value.asInstanceOf[SqlLiteral.SqlSymbol].ordinal()
+ }
+
generateNonNullLiteral(resultType, symbolOrdinal.toString)
case _ => ??? // TODO more types
}
@@ -747,6 +754,8 @@ class CodeGenerator(
override def visitOver(over: RexOver): GeneratedExpression = ???
+ override def visitSubQuery(subQuery: RexSubQuery): GeneratedExpression = ???
+
// ----------------------------------------------------------------------------------------------
// generator helping methods
// ----------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
index 6e42130..12095a2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
@@ -31,7 +31,7 @@ import org.apache.flink.api.table.runtime.MapRunner
import org.apache.flink.api.table.runtime.aggregate.AggregateUtil
import org.apache.flink.api.table.runtime.aggregate.AggregateUtil.CalcitePair
import org.apache.flink.api.table.typeutils.{TypeConverter, RowTypeInfo}
-import org.apache.flink.api.table.{Row, TableConfig}
+import org.apache.flink.api.table.{BatchTableEnvironment, Row, TableConfig}
import scala.collection.JavaConverters._
@@ -76,26 +76,28 @@ class DataSetAggregate(
.item("select", aggregationToString)
}
- override def computeSelfCost (planner: RelOptPlanner): RelOptCost = {
+ override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
val child = this.getInput
- val rowCnt = RelMetadataQuery.getRowCount(child)
+ val rowCnt = metadata.getRowCount(child)
val rowSize = this.estimateRowSize(child.getRowType)
val aggCnt = this.namedAggregates.size
planner.getCostFactory.makeCost(rowCnt, rowCnt * aggCnt, rowCnt * rowSize)
}
override def translateToPlan(
- config: TableConfig,
+ tableEnv: BatchTableEnvironment,
expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+ val config = tableEnv.getConfig
+
val groupingKeys = grouping.indices.toArray
// add grouping fields, position keys in the input, and input type
val aggregateResult = AggregateUtil.createOperatorFunctionsForAggregates(namedAggregates,
inputType, rowType, grouping, config)
val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(
- config,
+ tableEnv,
// tell the input operator that this operator currently only supports Rows as input
Some(TypeConverter.DEFAULT_ROW_TYPE))
http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
index 67daffc..13bb39d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
@@ -29,7 +29,7 @@ import org.apache.flink.api.table.codegen.CodeGenerator
import org.apache.flink.api.table.plan.nodes.FlinkCalc
import org.apache.flink.api.table.typeutils.TypeConverter
import TypeConverter._
-import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.BatchTableEnvironment
import org.apache.calcite.rex._
/**
@@ -69,17 +69,17 @@ class DataSetCalc(
calcProgram.getCondition != null)
}
- override def computeSelfCost (planner: RelOptPlanner): RelOptCost = {
+ override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
val child = this.getInput
- val rowCnt = RelMetadataQuery.getRowCount(child)
+ val rowCnt = metadata.getRowCount(child)
val exprCnt = calcProgram.getExprCount
planner.getCostFactory.makeCost(rowCnt, rowCnt * exprCnt, 0)
}
- override def getRows: Double = {
+ override def estimateRowCount(metadata: RelMetadataQuery): Double = {
val child = this.getInput
- val rowCnt = RelMetadataQuery.getRowCount(child)
+ val rowCnt = metadata.getRowCount(child)
if (calcProgram.getCondition != null) {
// we reduce the result card to push filters down
@@ -89,10 +89,13 @@ class DataSetCalc(
}
}
- override def translateToPlan(config: TableConfig,
+ override def translateToPlan(
+ tableEnv: BatchTableEnvironment,
expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
- val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(config)
+ val config = tableEnv.getConfig
+
+ val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
val returnType = determineReturnType(
getRowType,
http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
index de54897..61e8995 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
@@ -21,7 +21,6 @@ package org.apache.flink.api.table.plan.nodes.dataset
import org.apache.calcite.plan._
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.JoinInfo
-import org.apache.calcite.rel.logical.LogicalJoin
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelWriter, BiRel, RelNode}
import org.apache.calcite.sql.fun.SqlStdOperatorTable
@@ -31,10 +30,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.api.java.operators.join.JoinType
import org.apache.flink.api.table.codegen.CodeGenerator
-import org.apache.flink.api.table.plan.PlanGenException
import org.apache.flink.api.table.runtime.FlatJoinRunner
import org.apache.flink.api.table.typeutils.TypeConverter
-import org.apache.flink.api.table.{TableException, TableConfig}
+import org.apache.flink.api.table.{BatchTableEnvironment, TableException}
import org.apache.flink.api.common.functions.FlatJoinFunction
import TypeConverter.determineReturnType
import scala.collection.mutable.ArrayBuffer
@@ -92,7 +90,7 @@ class DataSetJoin(
.item("join", joinSelectionToString)
}
- override def computeSelfCost (planner: RelOptPlanner): RelOptCost = {
+ override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
if (!translatable) {
// join cannot be translated. Make huge costs
@@ -101,7 +99,7 @@ class DataSetJoin(
// join can be translated. Compute cost estimate
val children = this.getInputs
children.foldLeft(planner.getCostFactory.makeCost(0, 0, 0)) { (cost, child) =>
- val rowCnt = RelMetadataQuery.getRowCount(child)
+ val rowCnt = metadata.getRowCount(child)
val rowSize = this.estimateRowSize(child.getRowType)
cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize))
}
@@ -110,9 +108,11 @@ class DataSetJoin(
}
override def translateToPlan(
- config: TableConfig,
+ tableEnv: BatchTableEnvironment,
expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+ val config = tableEnv.getConfig
+
val returnType = determineReturnType(
getRowType,
expectedType,
@@ -156,8 +156,8 @@ class DataSetJoin(
})
}
- val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(config)
- val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(config)
+ val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+ val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
val generator = new CodeGenerator(config, leftDataSet.getType, Some(rightDataSet.getType))
val conversion = generator.generateConverterResultExpression(
http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
index eaf6b26..e8f81fd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
@@ -23,7 +23,7 @@ import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.sql.`type`.SqlTypeName
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.{BatchTableEnvironment, TableEnvironment, TableConfig}
import org.apache.flink.api.table.plan.nodes.FlinkRel
import scala.collection.JavaConversions._
@@ -31,9 +31,9 @@ import scala.collection.JavaConversions._
trait DataSetRel extends RelNode with FlinkRel {
/**
- * Translates the FlinkRelNode into a Flink operator.
+ * Translates the [[DataSetRel]] node into a [[DataSet]] operator.
*
- * @param config runtime configuration
+ * @param tableEnv [[org.apache.flink.api.table.BatchTableEnvironment]] of the translated Table.
* @param expectedType specifies the type the Flink operator should return. The type must
* have the same arity as the result. For instance, if the
* expected type is a RowTypeInfo this method will return a DataSet of
@@ -42,9 +42,8 @@ trait DataSetRel extends RelNode with FlinkRel {
* @return DataSet of type expectedType or RowTypeInfo
*/
def translateToPlan(
- config: TableConfig,
- expectedType: Option[TypeInformation[Any]] = None)
- : DataSet[Any]
+ tableEnv: BatchTableEnvironment,
+ expectedType: Option[TypeInformation[Any]] = None) : DataSet[Any]
private[flink] def estimateRowSize(rowType: RelDataType): Double = {
http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala
index 6f94251..01d71b7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala
@@ -27,7 +27,7 @@ import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.api.java.typeutils.PojoTypeInfo
-import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.BatchTableEnvironment
import org.apache.flink.api.table.codegen.CodeGenerator
import org.apache.flink.api.table.typeutils.TypeConverter
import TypeConverter.determineReturnType
@@ -67,16 +67,17 @@ class DataSetSource(
s"Source(from: (${rowType.getFieldNames.asScala.toList.mkString(", ")}))"
}
- override def computeSelfCost (planner: RelOptPlanner): RelOptCost = {
+ override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
- val rowCnt = RelMetadataQuery.getRowCount(this)
+ val rowCnt = metadata.getRowCount(this)
planner.getCostFactory.makeCost(rowCnt, rowCnt, 0)
}
override def translateToPlan(
- config: TableConfig,
- expectedType: Option[TypeInformation[Any]])
- : DataSet[Any] = {
+ tableEnv: BatchTableEnvironment,
+ expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+ val config = tableEnv.getConfig
val inputDataSet: DataSet[Any] = dataSetTable.dataSet
val inputType = inputDataSet.getType
http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
index e4e0a14..b6f6a19 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
@@ -24,7 +24,7 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelWriter, BiRel, RelNode}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig}
import scala.collection.JavaConverters._
import scala.collection.JavaConversions._
@@ -62,22 +62,22 @@ class DataSetUnion(
super.explainTerms(pw).item("union", unionSelectionToString)
}
- override def computeSelfCost (planner: RelOptPlanner): RelOptCost = {
+ override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
val children = this.getInputs
val rowCnt = children.foldLeft(0D) { (rows, child) =>
- rows + RelMetadataQuery.getRowCount(child)
+ rows + metadata.getRowCount(child)
}
planner.getCostFactory.makeCost(rowCnt, 0, 0)
}
override def translateToPlan(
- config: TableConfig,
+ tableEnv: BatchTableEnvironment,
expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
- val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(config)
- val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(config)
+ val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+ val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
}
http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala
new file mode 100644
index 0000000..9dfd346
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan.{RelTraitSet, RelOptCluster}
+import org.apache.calcite.rel.{RelWriter, RelNode}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.Values
+import org.apache.calcite.rex.RexLiteral
+import org.apache.flink.api.common.io.{GenericInputFormat, NonParallelInput}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.scala._
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.table.typeutils.TypeConverter._
+import org.apache.flink.api.table.{BatchTableEnvironment, Row, TableConfig}
+
+import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
+
+/**
+ * DataSet RelNode for a LogicalValues.
+ *
+ */
+class DataSetValues(
+ cluster: RelOptCluster,
+ traitSet: RelTraitSet,
+ rowType: RelDataType,
+ tuples: ImmutableList[ImmutableList[RexLiteral]])
+ extends Values(cluster, rowType, tuples, traitSet)
+ with DataSetRel {
+
+ override def deriveRowType() = rowType
+
+ override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+ new DataSetValues(
+ cluster,
+ traitSet,
+ rowType,
+ tuples
+ )
+ }
+
+ override def toString: String = {
+ "Values(values: (${rowType.getFieldNames.asScala.toList.mkString(\", \")}))"
+ }
+
+ override def explainTerms(pw: RelWriter): RelWriter = {
+ super.explainTerms(pw).item("values", valuesFieldsToString)
+ }
+
+ override def translateToPlan(
+ tableEnv: BatchTableEnvironment,
+ expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+ val config = tableEnv.getConfig
+
+ val returnType = determineReturnType(
+ getRowType,
+ expectedType,
+ config.getNullCheck,
+ config.getEfficientTypeUsage).asInstanceOf[RowTypeInfo]
+
+ val inputFormat = new ValuesInputFormat(tuples)
+
+ tableEnv.createDataSetSource(inputFormat, returnType).asInstanceOf[DataSet[Any]]
+ }
+
+ private def valuesFieldsToString: String = {
+ rowType.getFieldNames.asScala.toList.mkString(", ")
+ }
+
+}
+
+class ValuesInputFormat(val tuples: ImmutableList[ImmutableList[RexLiteral]])
+ extends GenericInputFormat[Row]
+ with NonParallelInput {
+
+ var readIdx = 0
+
+ override def reachedEnd(): Boolean = readIdx == tuples.size()
+
+ override def nextRecord(reuse: Row): Row = {
+
+ if (readIdx == tuples.size()) {
+ return null
+ }
+
+ val t = tuples.get(readIdx)
+ readIdx += 1
+
+ var i = 0
+ for(f <- t) {
+ reuse.setField(i, f.getValue)
+ i += 1
+ }
+ reuse
+ }
+}
+
+
http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala
index fb058f3..6dfcd03 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala
@@ -23,7 +23,7 @@ import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
import org.apache.calcite.rex.RexProgram
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.StreamTableEnvironment
import org.apache.flink.api.table.codegen.CodeGenerator
import org.apache.flink.api.table.plan.nodes.FlinkCalc
import org.apache.flink.api.table.typeutils.TypeConverter._
@@ -68,10 +68,13 @@ class DataStreamCalc(
calcProgram.getCondition != null)
}
- override def translateToPlan(config: TableConfig,
+ override def translateToPlan(
+ tableEnv: StreamTableEnvironment,
expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
- val inputDataStream = input.asInstanceOf[DataStreamRel].translateToPlan(config)
+ val config = tableEnv.getConfig
+
+ val inputDataStream = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
val returnType = determineReturnType(
getRowType,
http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamRel.scala
index 0673a35..6cf13a5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamRel.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.table.plan.nodes.datastream
import org.apache.calcite.rel.RelNode
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.{StreamTableEnvironment, TableConfig}
import org.apache.flink.api.table.plan.nodes.FlinkRel
import org.apache.flink.streaming.api.datastream.DataStream
@@ -29,7 +29,7 @@ trait DataStreamRel extends RelNode with FlinkRel {
/**
* Translates the FlinkRelNode into a Flink operator.
*
- * @param config runtime configuration
+ * @param tableEnv The [[StreamTableEnvironment]] of the translated Table.
* @param expectedType specifies the type the Flink operator should return. The type must
* have the same arity as the result. For instance, if the
* expected type is a RowTypeInfo this method will return a DataSet of
@@ -38,9 +38,8 @@ trait DataStreamRel extends RelNode with FlinkRel {
* @return DataStream of type expectedType or RowTypeInfo
*/
def translateToPlan(
- config: TableConfig,
- expectedType: Option[TypeInformation[Any]] = None)
- : DataStream[Any]
+ tableEnv: StreamTableEnvironment,
+ expectedType: Option[TypeInformation[Any]] = None) : DataStream[Any]
}
http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamSource.scala
index 314759c..9fc4d5e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamSource.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamSource.scala
@@ -25,7 +25,7 @@ import org.apache.calcite.rel.core.TableScan
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.PojoTypeInfo
-import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.StreamTableEnvironment
import org.apache.flink.api.table.codegen.CodeGenerator
import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
import org.apache.flink.api.table.plan.schema.DataStreamTable
@@ -60,10 +60,12 @@ class DataStreamSource(
}
override def translateToPlan(
- config: TableConfig,
+ tableEnv: StreamTableEnvironment,
expectedType: Option[TypeInformation[Any]])
: DataStream[Any] = {
+ val config = tableEnv.getConfig
+
val inputDataStream: DataStream[Any] = dataStreamTable.dataStream
val inputType = inputDataStream.getType
http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala
index 8c9cca0..e72e9a8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala
@@ -22,7 +22,7 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.{RelNode, RelWriter, BiRel}
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.StreamTableEnvironment
import org.apache.flink.streaming.api.datastream.DataStream
import scala.collection.JavaConverters._
@@ -60,11 +60,12 @@ class DataStreamUnion(
"Union(union: (${rowType.getFieldNames.asScala.toList.mkString(\", \")}))"
}
- override def translateToPlan(config: TableConfig,
+ override def translateToPlan(
+ tableEnv: StreamTableEnvironment,
expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
- val leftDataSet = left.asInstanceOf[DataStreamRel].translateToPlan(config)
- val rightDataSet = right.asInstanceOf[DataStreamRel].translateToPlan(config)
+ val leftDataSet = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+ val rightDataSet = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
leftDataSet.union(rightDataSet)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala
new file mode 100644
index 0000000..3e64e64
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.datastream
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.Values
+import org.apache.calcite.rex.RexLiteral
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.StreamTableEnvironment
+import org.apache.flink.api.table.plan.nodes.dataset.ValuesInputFormat
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.table.typeutils.TypeConverter._
+import org.apache.flink.streaming.api.datastream.DataStream
+
+
+/**
+ * DataStream RelNode for LogicalValues.
+ */
+class DataStreamValues(
+ cluster: RelOptCluster,
+ traitSet: RelTraitSet,
+ rowType: RelDataType,
+ tuples: ImmutableList[ImmutableList[RexLiteral]])
+ extends Values(cluster, rowType, tuples, traitSet)
+ with DataStreamRel {
+
+ override def deriveRowType() = rowType
+
+ override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+ new DataStreamValues(
+ cluster,
+ traitSet,
+ rowType,
+ tuples
+ )
+ }
+
+ override def translateToPlan(
+ tableEnv: StreamTableEnvironment,
+ expectedType: Option[TypeInformation[Any]]) : DataStream[Any] = {
+
+ val config = tableEnv.getConfig
+
+ val returnType = determineReturnType(
+ getRowType,
+ expectedType,
+ config.getNullCheck,
+ config.getEfficientTypeUsage).asInstanceOf[RowTypeInfo]
+
+ val inputFormat = new ValuesInputFormat(tuples)
+
+ tableEnv.createDataStreamSource(inputFormat, returnType).asInstanceOf[DataStream[Any]]
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
index 427530b..e9bcaa2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
@@ -21,9 +21,7 @@ package org.apache.flink.api.table.plan.rules
import org.apache.calcite.rel.rules._
import org.apache.calcite.tools.{RuleSets, RuleSet}
import org.apache.flink.api.table.plan.rules.dataSet._
-import org.apache.flink.api.table.plan.rules.datastream.DataStreamCalcRule
-import org.apache.flink.api.table.plan.rules.datastream.DataStreamScanRule
-import org.apache.flink.api.table.plan.rules.datastream.DataStreamUnionRule
+import org.apache.flink.api.table.plan.rules.datastream._
object FlinkRuleSets {
@@ -41,7 +39,7 @@ object FlinkRuleSets {
// push filter into the children of a join
FilterJoinRule.JOIN,
// push filter through an aggregation
- FlinkFilterAggregateTransposeRule.INSTANCE,
+ FilterAggregateTransposeRule.INSTANCE,
// aggregation and projection rules
AggregateProjectMergeRule.INSTANCE,
@@ -100,7 +98,8 @@ object FlinkRuleSets {
DataSetCalcRule.INSTANCE,
DataSetJoinRule.INSTANCE,
DataSetScanRule.INSTANCE,
- DataSetUnionRule.INSTANCE
+ DataSetUnionRule.INSTANCE,
+ DataSetValuesRule.INSTANCE
)
/**
@@ -108,11 +107,6 @@ object FlinkRuleSets {
*/
val DATASTREAM_OPT_RULES: RuleSet = RuleSets.ofList(
- // translate to DataStream nodes
- DataStreamCalcRule.INSTANCE,
- DataStreamScanRule.INSTANCE,
- DataStreamUnionRule.INSTANCE,
-
// calc rules
FilterToCalcRule.INSTANCE,
ProjectToCalcRule.INSTANCE,
@@ -134,7 +128,13 @@ object FlinkRuleSets {
ProjectRemoveRule.INSTANCE,
// merge and push unions rules
- UnionEliminatorRule.INSTANCE
+ UnionEliminatorRule.INSTANCE,
+
+ // translate to DataStream nodes
+ DataStreamCalcRule.INSTANCE,
+ DataStreamScanRule.INSTANCE,
+ DataStreamUnionRule.INSTANCE,
+ DataStreamValuesRule.INSTANCE
)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala
index e6a6993..0449fc3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala
@@ -30,7 +30,7 @@ class DataSetAggregateRule
classOf[LogicalAggregate],
Convention.NONE,
DataSetConvention.INSTANCE,
- "FlinkAggregateRule")
+ "DataSetAggregateRule")
{
def convert(rel: RelNode): RelNode = {
http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCalcRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCalcRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCalcRule.scala
index 3821024..88e74a9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCalcRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCalcRule.scala
@@ -29,7 +29,7 @@ class DataSetCalcRule
classOf[LogicalCalc],
Convention.NONE,
DataSetConvention.INSTANCE,
- "FlinkCalcRule")
+ "DataSetCalcRule")
{
def convert(rel: RelNode): RelNode = {
http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala
index 89d33c9..55100d2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala
@@ -32,7 +32,7 @@ class DataSetJoinRule
classOf[LogicalJoin],
Convention.NONE,
DataSetConvention.INSTANCE,
- "FlinkJoinRule")
+ "DataSetJoinRule")
{
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala
index 3cdaca3..f95f6ea 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala
@@ -31,7 +31,7 @@ class DataSetScanRule
classOf[LogicalTableScan],
Convention.NONE,
DataSetConvention.INSTANCE,
- "FlinkScanRule")
+ "DataSetScanRule")
{
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala
index cd1de1e..7809d6d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala
@@ -29,7 +29,7 @@ class DataSetUnionRule
classOf[LogicalUnion],
Convention.NONE,
DataSetConvention.INSTANCE,
- "FlinkUnionRule")
+ "DataSetUnionRule")
{
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetValuesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetValuesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetValuesRule.scala
new file mode 100644
index 0000000..c28b458
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetValuesRule.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{RelOptRule, RelTraitSet, Convention}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalValues
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetValues, DataSetConvention}
+
+class DataSetValuesRule
+ extends ConverterRule(
+ classOf[LogicalValues],
+ Convention.NONE,
+ DataSetConvention.INSTANCE,
+ "DataSetValuesRule")
+{
+
+ def convert(rel: RelNode): RelNode = {
+
+ val values: LogicalValues = rel.asInstanceOf[LogicalValues]
+ val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+
+ new DataSetValues(
+ rel.getCluster,
+ traitSet,
+ rel.getRowType,
+ values.getTuples)
+ }
+}
+
+object DataSetValuesRule {
+ val INSTANCE: RelOptRule = new DataSetValuesRule
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/FlinkFilterAggregateTransposeRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/FlinkFilterAggregateTransposeRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/FlinkFilterAggregateTransposeRule.scala
deleted file mode 100644
index c27fc75..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/FlinkFilterAggregateTransposeRule.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.dataSet
-
-import org.apache.calcite.plan.RelOptRule
-import org.apache.calcite.plan.RelOptRuleCall
-import org.apache.calcite.plan.RelOptUtil
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.core.Aggregate
-import org.apache.calcite.rel.core.Filter
-import org.apache.calcite.rel.core.RelFactories
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.tools.RelBuilder
-import org.apache.calcite.tools.RelBuilderFactory
-import org.apache.calcite.util.ImmutableBitSet
-import com.google.common.collect.ImmutableList
-
-import scala.collection.JavaConversions._
-
-/**
- * This rule is a (fixed) copy of the FilterAggregateTransposeRule of Apache Calcite.
- *
- * A fix for this rule is contained Calcite's master branch.
- * This custom rule can be removed once Calcite 1.7 is released and our dependency adjusted.
- *
- * Planner rule that pushes a `org.apache.calcite.rel.core.Filter`
- * past a `org.apache.calcite.rel.core.Aggregate`.
- *
- * @see org.apache.calcite.rel.rules.AggregateFilterTransposeRule
- */
-class FlinkFilterAggregateTransposeRule(
- filterClass: Class[_ <: Filter],
- builderFactory: RelBuilderFactory,
- aggregateClass: Class[_ <: Aggregate])
- extends RelOptRule(
- RelOptRule.operand(filterClass, RelOptRule.operand(aggregateClass, RelOptRule.any)),
- builderFactory,
- null)
-{
-
- def onMatch(call: RelOptRuleCall) {
- val filterRel: Filter = call.rel(0)
- val aggRel: Aggregate = call.rel(1)
- val conditions = RelOptUtil.conjunctions(filterRel.getCondition).toList
- val rexBuilder = filterRel.getCluster.getRexBuilder
- val origFields = aggRel.getRowType.getFieldList.toList
-
- // Fixed computation of adjustments
- val adjustments = aggRel.getGroupSet.asList().zipWithIndex.map {
- case (g, i) => g - i
- }.toArray
-
- var pushedConditions: List[RexNode] = Nil
- var remainingConditions: List[RexNode] = Nil
-
- for (condition <- conditions) {
- val rCols: ImmutableBitSet = RelOptUtil.InputFinder.bits(condition)
- if (canPush(aggRel, rCols)) {
- pushedConditions = condition.accept(
- new RelOptUtil.RexInputConverter(
- rexBuilder,
- origFields,
- aggRel.getInput(0).getRowType.getFieldList,
- adjustments)) :: pushedConditions
- }
- else {
- remainingConditions = condition :: remainingConditions
- }
- }
- val builder: RelBuilder = call.builder
- var rel: RelNode = builder.push(aggRel.getInput).filter(pushedConditions).build
- if (rel eq aggRel.getInput(0)) {
- return
- }
- rel = aggRel.copy(aggRel.getTraitSet, ImmutableList.of(rel))
- rel = builder.push(rel).filter(remainingConditions).build
- call.transformTo(rel)
- }
-
- private def canPush(aggregate: Aggregate, rCols: ImmutableBitSet): Boolean = {
- val groupKeys: ImmutableBitSet = ImmutableBitSet.range(0, aggregate.getGroupSet.cardinality)
- if (!groupKeys.contains(rCols)) {
- return false
- }
- if (aggregate.indicator) {
- import scala.collection.JavaConversions._
- for (groupingSet <- aggregate.getGroupSets) {
- if (!groupingSet.contains(rCols)) {
- return false
- }
- }
- }
- true
- }
-}
-
-object FlinkFilterAggregateTransposeRule {
- /** The default instance of `FilterAggregateTransposeRule`.
- *
- * It matches any kind of agg. or filter
- */
- val INSTANCE: FlinkFilterAggregateTransposeRule =
- new FlinkFilterAggregateTransposeRule(
- classOf[Filter],
- RelFactories.LOGICAL_BUILDER,
- classOf[Aggregate])
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/367687df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamValuesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamValuesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamValuesRule.scala
new file mode 100644
index 0000000..fa2b428
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamValuesRule.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalValues
+import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamValues, DataStreamConvention}
+
+class DataStreamValuesRule
+ extends ConverterRule(
+ classOf[LogicalValues],
+ Convention.NONE,
+ DataStreamConvention.INSTANCE,
+ "DataStreamValuesRule")
+{
+
+ def convert(rel: RelNode): RelNode = {
+
+ val values: LogicalValues = rel.asInstanceOf[LogicalValues]
+ val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+
+ new DataStreamValues(
+ rel.getCluster,
+ traitSet,
+ rel.getRowType,
+ values.getTuples)
+ }
+}
+
+object DataStreamValuesRule {
+ val INSTANCE: RelOptRule = new DataStreamValuesRule
+}