Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:46:48 UTC
[19/47] flink git commit: [FLINK-4704] [table] Refactor package structure of flink-table.
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
new file mode 100644
index 0000000..9902486
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.types.Row
+import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
+import FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.plan.nodes.FlinkAggregate
+import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo, TypeConverter}
+import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
+import org.apache.flink.table.api.StreamTableEnvironment
+
+import scala.collection.JavaConverters._
+
+class DataStreamAggregate(
+ window: LogicalWindow,
+ namedProperties: Seq[NamedWindowProperty],
+ cluster: RelOptCluster,
+ traitSet: RelTraitSet,
+ inputNode: RelNode,
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ rowRelDataType: RelDataType,
+ inputType: RelDataType,
+ grouping: Array[Int])
+ extends SingleRel(cluster, traitSet, inputNode)
+ with FlinkAggregate
+ with DataStreamRel {
+
+ override def deriveRowType() = rowRelDataType
+
+ override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+ new DataStreamAggregate(
+ window,
+ namedProperties,
+ cluster,
+ traitSet,
+ inputs.get(0),
+ namedAggregates,
+ getRowType,
+ inputType,
+ grouping)
+ }
+
+ override def toString: String = {
+ s"Aggregate(${
+ if (!grouping.isEmpty) {
+ s"groupBy: (${groupingToString(inputType, grouping)}), "
+ } else {
+ ""
+ }
+ }window: ($window), " +
+ s"select: (${
+ aggregationToString(
+ inputType,
+ grouping,
+ getRowType,
+ namedAggregates,
+ namedProperties)
+ }))"
+ }
+
+ override def explainTerms(pw: RelWriter): RelWriter = {
+ super.explainTerms(pw)
+ .itemIf("groupBy", groupingToString(inputType, grouping), !grouping.isEmpty)
+ .item("window", window)
+ .item("select", aggregationToString(
+ inputType,
+ grouping,
+ getRowType,
+ namedAggregates,
+ namedProperties))
+ }
+
+ override def translateToPlan(
+ tableEnv: StreamTableEnvironment,
+ expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+
+ val config = tableEnv.getConfig
+ val groupingKeys = grouping.indices.toArray
+ val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(
+ tableEnv,
+ // tell the input operator that this operator currently only supports Rows as input
+ Some(TypeConverter.DEFAULT_ROW_TYPE))
+
+ // get the output types
+ val fieldTypes: Array[TypeInformation[_]] =
+ getRowType.getFieldList.asScala
+ .map(field => FlinkTypeFactory.toTypeInfo(field.getType))
+ .toArray
+
+ val rowTypeInfo = new RowTypeInfo(fieldTypes: _*)
+
+ val aggString = aggregationToString(
+ inputType,
+ grouping,
+ getRowType,
+ namedAggregates,
+ namedProperties)
+
+ val prepareOpName = s"prepare select: ($aggString)"
+ val keyedAggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " +
+ s"window: ($window), " +
+ s"select: ($aggString)"
+ val nonKeyedAggOpName = s"window: ($window), select: ($aggString)"
+
+ val mapFunction = AggregateUtil.createPrepareMapFunction(
+ namedAggregates,
+ grouping,
+ inputType)
+
+ val mappedInput = inputDS.map(mapFunction).name(prepareOpName)
+
+ val result: DataStream[Any] = {
+ // check whether all aggregates support partial aggregate
+ if (AggregateUtil.doAllSupportPartialAggregation(
+ namedAggregates.map(_.getKey),
+ inputType,
+ grouping.length)) {
+ // do Incremental Aggregation
+ val reduceFunction = AggregateUtil.createIncrementalAggregateReduceFunction(
+ namedAggregates,
+ inputType,
+ getRowType,
+ grouping)
+ // grouped / keyed aggregation
+ if (groupingKeys.length > 0) {
+ val windowFunction = AggregateUtil.createWindowIncrementalAggregationFunction(
+ window,
+ namedAggregates,
+ inputType,
+ rowRelDataType,
+ grouping,
+ namedProperties)
+
+ val keyedStream = mappedInput.keyBy(groupingKeys: _*)
+ val windowedStream =
+ createKeyedWindowedStream(window, keyedStream)
+ .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
+
+ windowedStream
+ .apply(reduceFunction, windowFunction)
+ .returns(rowTypeInfo)
+ .name(keyedAggOpName)
+ .asInstanceOf[DataStream[Any]]
+ }
+ // global / non-keyed aggregation
+ else {
+ val windowFunction = AggregateUtil.createAllWindowIncrementalAggregationFunction(
+ window,
+ namedAggregates,
+ inputType,
+ rowRelDataType,
+ grouping,
+ namedProperties)
+
+ val windowedStream =
+ createNonKeyedWindowedStream(window, mappedInput)
+ .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
+
+ windowedStream
+ .apply(reduceFunction, windowFunction)
+ .returns(rowTypeInfo)
+ .name(nonKeyedAggOpName)
+ .asInstanceOf[DataStream[Any]]
+ }
+ }
+ else {
+ // do non-Incremental Aggregation
+ // grouped / keyed aggregation
+ if (groupingKeys.length > 0) {
+
+ val windowFunction = AggregateUtil.createWindowAggregationFunction(
+ window,
+ namedAggregates,
+ inputType,
+ rowRelDataType,
+ grouping,
+ namedProperties)
+
+ val keyedStream = mappedInput.keyBy(groupingKeys: _*)
+ val windowedStream =
+ createKeyedWindowedStream(window, keyedStream)
+ .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
+
+ windowedStream
+ .apply(windowFunction)
+ .returns(rowTypeInfo)
+ .name(keyedAggOpName)
+ .asInstanceOf[DataStream[Any]]
+ }
+ // global / non-keyed aggregation
+ else {
+ val windowFunction = AggregateUtil.createAllWindowAggregationFunction(
+ window,
+ namedAggregates,
+ inputType,
+ rowRelDataType,
+ grouping,
+ namedProperties)
+
+ val windowedStream =
+ createNonKeyedWindowedStream(window, mappedInput)
+ .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
+
+ windowedStream
+ .apply(windowFunction)
+ .returns(rowTypeInfo)
+ .name(nonKeyedAggOpName)
+ .asInstanceOf[DataStream[Any]]
+ }
+ }
+ }
+ // if the expected type is not a Row, inject a mapper to convert to the expected type
+ expectedType match {
+ case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
+ val mapName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
+ result.map(getConversionMapper(
+ config = config,
+ nullableInput = false,
+ inputType = rowTypeInfo.asInstanceOf[TypeInformation[Any]],
+ expectedType = expectedType.get,
+ conversionOperatorName = "DataStreamAggregateConversion",
+ fieldNames = getRowType.getFieldNames.asScala
+ ))
+ .name(mapName)
+ case _ => result
+ }
+ }
+}
+
+object DataStreamAggregate {
+
+ private def createKeyedWindowedStream(groupWindow: LogicalWindow, stream: KeyedStream[Row, Tuple])
+ : WindowedStream[Row, Tuple, _ <: DataStreamWindow] = groupWindow match {
+
+ case ProcessingTimeTumblingGroupWindow(_, size) if isTimeInterval(size.resultType) =>
+ stream.window(TumblingProcessingTimeWindows.of(asTime(size)))
+
+ case ProcessingTimeTumblingGroupWindow(_, size) =>
+ stream.countWindow(asCount(size))
+
+ case EventTimeTumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) =>
+ stream.window(TumblingEventTimeWindows.of(asTime(size)))
+
+ case EventTimeTumblingGroupWindow(_, _, size) =>
+ // TODO: EventTimeTumblingGroupWindow should sort the stream on event time
+ // before applying the windowing logic. Otherwise, this would be the same as a
+ // ProcessingTimeTumblingGroupWindow
+ throw new UnsupportedOperationException("Event-time grouping windows on row intervals are " +
+ "currently not supported.")
+
+ case ProcessingTimeSlidingGroupWindow(_, size, slide) if isTimeInterval(size.resultType) =>
+ stream.window(SlidingProcessingTimeWindows.of(asTime(size), asTime(slide)))
+
+ case ProcessingTimeSlidingGroupWindow(_, size, slide) =>
+ stream.countWindow(asCount(size), asCount(slide))
+
+ case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
+ stream.window(SlidingEventTimeWindows.of(asTime(size), asTime(slide)))
+
+ case EventTimeSlidingGroupWindow(_, _, size, slide) =>
+ // TODO: EventTimeSlidingGroupWindow should sort the stream on event time
+ // before applying the windowing logic. Otherwise, this would be the same as a
+ // ProcessingTimeSlidingGroupWindow
+ throw new UnsupportedOperationException("Event-time grouping windows on row intervals are " +
+ "currently not supported.")
+
+ case ProcessingTimeSessionGroupWindow(_, gap: Expression) =>
+ stream.window(ProcessingTimeSessionWindows.withGap(asTime(gap)))
+
+ case EventTimeSessionGroupWindow(_, _, gap) =>
+ stream.window(EventTimeSessionWindows.withGap(asTime(gap)))
+ }
+
+ private def createNonKeyedWindowedStream(groupWindow: LogicalWindow, stream: DataStream[Row])
+ : AllWindowedStream[Row, _ <: DataStreamWindow] = groupWindow match {
+
+ case ProcessingTimeTumblingGroupWindow(_, size) if isTimeInterval(size.resultType) =>
+ stream.windowAll(TumblingProcessingTimeWindows.of(asTime(size)))
+
+ case ProcessingTimeTumblingGroupWindow(_, size) =>
+ stream.countWindowAll(asCount(size))
+
+ case EventTimeTumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) =>
+ stream.windowAll(TumblingEventTimeWindows.of(asTime(size)))
+
+ case EventTimeTumblingGroupWindow(_, _, size) =>
+ // TODO: EventTimeTumblingGroupWindow should sort the stream on event time
+ // before applying the windowing logic. Otherwise, this would be the same as a
+ // ProcessingTimeTumblingGroupWindow
+ throw new UnsupportedOperationException("Event-time grouping windows on row intervals are " +
+ "currently not supported.")
+
+ case ProcessingTimeSlidingGroupWindow(_, size, slide) if isTimeInterval(size.resultType) =>
+ stream.windowAll(SlidingProcessingTimeWindows.of(asTime(size), asTime(slide)))
+
+ case ProcessingTimeSlidingGroupWindow(_, size, slide) =>
+ stream.countWindowAll(asCount(size), asCount(slide))
+
+ case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
+ stream.windowAll(SlidingEventTimeWindows.of(asTime(size), asTime(slide)))
+
+ case EventTimeSlidingGroupWindow(_, _, size, slide) =>
+ // TODO: EventTimeSlidingGroupWindow should sort the stream on event time
+ // before applying the windowing logic. Otherwise, this would be the same as a
+ // ProcessingTimeSlidingGroupWindow
+ throw new UnsupportedOperationException("Event-time grouping windows on row intervals are " +
+ "currently not supported.")
+
+ case ProcessingTimeSessionGroupWindow(_, gap) =>
+ stream.windowAll(ProcessingTimeSessionWindows.withGap(asTime(gap)))
+
+ case EventTimeSessionGroupWindow(_, _, gap) =>
+ stream.windowAll(EventTimeSessionWindows.withGap(asTime(gap)))
+ }
+
+ def asTime(expr: Expression): Time = expr match {
+ case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) => Time.milliseconds(value)
+ case _ => throw new IllegalArgumentException()
+ }
+
+ def asCount(expr: Expression): Long = expr match {
+ case Literal(value: Long, RowIntervalTypeInfo.INTERVAL_ROWS) => value
+ case _ => throw new IllegalArgumentException()
+ }
+}
+
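For illustration, a minimal sketch (not part of this commit) of how the asTime helper above resolves a window size expression into a DataStream time interval. It assumes that org.apache.flink.table.expressions.Literal can be constructed directly with a value and a result type, as the pattern match in asTime suggests:

import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.expressions.Literal
import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate
import org.apache.flink.table.typeutils.TimeIntervalTypeInfo

object AsTimeSketch {
  def main(args: Array[String]): Unit = {
    // a 10 minute window size, as the Table API represents it internally
    val size = Literal(10 * 60 * 1000L, TimeIntervalTypeInfo.INTERVAL_MILLIS)
    // asTime unwraps the literal into the Time interval fed to the window assigners above
    val interval: Time = DataStreamAggregate.asTime(size)
    println(interval.toMilliseconds) // 600000
  }
}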
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
new file mode 100644
index 0000000..774c17b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rex.RexProgram
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.plan.nodes.FlinkCalc
+import org.apache.flink.table.typeutils.TypeConverter._
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.StreamTableEnvironment
+
+/**
+ * Flink RelNode which is translated into a Flink FlatMap operator
+ * via code generation.
+ */
+class DataStreamCalc(
+ cluster: RelOptCluster,
+ traitSet: RelTraitSet,
+ input: RelNode,
+ rowRelDataType: RelDataType,
+ calcProgram: RexProgram,
+ ruleDescription: String)
+ extends SingleRel(cluster, traitSet, input)
+ with FlinkCalc
+ with DataStreamRel {
+
+ override def deriveRowType() = rowRelDataType
+
+ override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+ new DataStreamCalc(
+ cluster,
+ traitSet,
+ inputs.get(0),
+ getRowType,
+ calcProgram,
+ ruleDescription
+ )
+ }
+
+ override def toString: String = calcToString(calcProgram, getExpressionString)
+
+ override def explainTerms(pw: RelWriter): RelWriter = {
+ super.explainTerms(pw)
+ .item("select", selectionToString(calcProgram, getExpressionString))
+ .itemIf("where",
+ conditionToString(calcProgram, getExpressionString),
+ calcProgram.getCondition != null)
+ }
+
+ override def translateToPlan(
+ tableEnv: StreamTableEnvironment,
+ expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+
+ val config = tableEnv.getConfig
+
+ val inputDataStream = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+
+ val returnType = determineReturnType(
+ getRowType,
+ expectedType,
+ config.getNullCheck,
+ config.getEfficientTypeUsage)
+
+ val generator = new CodeGenerator(config, false, inputDataStream.getType)
+
+ val body = functionBody(
+ generator,
+ inputDataStream.getType,
+ getRowType,
+ calcProgram,
+ config,
+ expectedType)
+
+ val genFunction = generator.generateFunction(
+ ruleDescription,
+ classOf[FlatMapFunction[Any, Any]],
+ body,
+ returnType)
+
+ val mapFunc = calcMapFunction(genFunction)
+ inputDataStream.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString))
+ }
+}
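A hedged usage sketch (not part of this commit) of a query that the optimizer translates into a single DataStreamCalc, i.e. one code-generated flatMap. The entry points TableEnvironment.getTableEnvironment, toTable and toDataStream are assumed from the streaming Scala Table API of this Flink version and are not shown in this diff:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object CalcSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // register a DataStream as a Table with named fields
    val orders = env.fromElements((1, "beer"), (2, "wine")).toTable(tEnv, 'id, 'product)

    // filter and projection are merged into one Calc node during optimization
    // and executed as a single code-generated flatMap
    val result = orders.filter('id > 1).select('product)

    result.toDataStream[Row].print()
    env.execute()
  }
}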
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamConvention.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamConvention.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamConvention.scala
new file mode 100644
index 0000000..9525d1f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamConvention.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+
+class DataStreamConvention extends Convention {
+
+ override def toString: String = getName
+
+ override def useAbstractConvertersForConversion(
+ fromTraits: RelTraitSet,
+ toTraits: RelTraitSet): Boolean = false
+
+ override def canConvertConvention(toConvention: Convention): Boolean = false
+
+ def getInterface: Class[_] = classOf[DataStreamRel]
+
+ def getName: String = "DATASTREAM"
+
+ def getTraitDef: RelTraitDef[_ <: RelTrait] = ConventionTraitDef.INSTANCE
+
+ def satisfies(`trait`: RelTrait): Boolean = this eq `trait`
+
+ def register(planner: RelOptPlanner): Unit = { }
+}
+
+object DataStreamConvention {
+
+ val INSTANCE = new DataStreamConvention
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
new file mode 100644
index 0000000..a2d167b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rex.{RexCall, RexNode}
+import org.apache.calcite.sql.SemiJoinType
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.functions.utils.TableSqlFunction
+import org.apache.flink.table.plan.nodes.FlinkCorrelate
+import org.apache.flink.table.typeutils.TypeConverter._
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.StreamTableEnvironment
+
+/**
+ * Flink RelNode which joins the input with the results of a user-defined table function.
+ */
+class DataStreamCorrelate(
+ cluster: RelOptCluster,
+ traitSet: RelTraitSet,
+ inputNode: RelNode,
+ scan: LogicalTableFunctionScan,
+ condition: Option[RexNode],
+ relRowType: RelDataType,
+ joinRowType: RelDataType,
+ joinType: SemiJoinType,
+ ruleDescription: String)
+ extends SingleRel(cluster, traitSet, inputNode)
+ with FlinkCorrelate
+ with DataStreamRel {
+
+ override def deriveRowType() = relRowType
+
+ override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+ new DataStreamCorrelate(
+ cluster,
+ traitSet,
+ inputs.get(0),
+ scan,
+ condition,
+ relRowType,
+ joinRowType,
+ joinType,
+ ruleDescription)
+ }
+
+ override def toString: String = {
+ val rexCall = scan.getCall.asInstanceOf[RexCall]
+ val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
+ correlateToString(rexCall, sqlFunction)
+ }
+
+ override def explainTerms(pw: RelWriter): RelWriter = {
+ val rexCall = scan.getCall.asInstanceOf[RexCall]
+ val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
+ super.explainTerms(pw)
+ .item("invocation", scan.getCall)
+ .item("function", sqlFunction.getTableFunction.getClass.getCanonicalName)
+ .item("rowType", relRowType)
+ .item("joinType", joinType)
+ .itemIf("condition", condition.orNull, condition.isDefined)
+ }
+
+ override def translateToPlan(
+ tableEnv: StreamTableEnvironment,
+ expectedType: Option[TypeInformation[Any]])
+ : DataStream[Any] = {
+
+ val config = tableEnv.getConfig
+ val returnType = determineReturnType(
+ getRowType,
+ expectedType,
+ config.getNullCheck,
+ config.getEfficientTypeUsage)
+
+ // we do not need to specify input type
+ val inputDS = inputNode.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+
+ val funcRel = scan.asInstanceOf[LogicalTableFunctionScan]
+ val rexCall = funcRel.getCall.asInstanceOf[RexCall]
+ val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
+ val pojoFieldMapping = sqlFunction.getPojoFieldMapping
+ val udtfTypeInfo = sqlFunction.getRowTypeInfo.asInstanceOf[TypeInformation[Any]]
+
+ val generator = new CodeGenerator(
+ config,
+ false,
+ inputDS.getType,
+ Some(udtfTypeInfo),
+ None,
+ Some(pojoFieldMapping))
+
+ val body = functionBody(
+ generator,
+ udtfTypeInfo,
+ getRowType,
+ rexCall,
+ condition,
+ config,
+ joinType,
+ expectedType)
+
+ val genFunction = generator.generateFunction(
+ ruleDescription,
+ classOf[FlatMapFunction[Any, Any]],
+ body,
+ returnType)
+
+ val mapFunc = correlateMapFunction(genFunction)
+
+ inputDS.flatMap(mapFunc).name(correlateOpName(rexCall, sqlFunction, relRowType))
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
new file mode 100644
index 0000000..16427b8
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.rel.RelNode
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.plan.nodes.FlinkRel
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.StreamTableEnvironment
+
+trait DataStreamRel extends RelNode with FlinkRel {
+
+ /**
+ * Translates the FlinkRelNode into a Flink operator.
+ *
+ * @param tableEnv The [[StreamTableEnvironment]] of the translated Table.
+ * @param expectedType specifies the type the Flink operator should return. The type must
+ * have the same arity as the result. For instance, if the
+ * expected type is a RowTypeInfo, this method returns a DataStream of
+ * type Row. If the expected type is Tuple2, the operator returns
+ * a Tuple2 if possible and a Row otherwise.
+ * @return DataStream of type expectedType or RowTypeInfo
+ */
+ def translateToPlan(
+ tableEnv: StreamTableEnvironment,
+ expectedType: Option[TypeInformation[Any]] = None) : DataStream[Any]
+
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
new file mode 100644
index 0000000..2d5ec09
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.plan.schema.DataStreamTable
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.StreamTableEnvironment
+
+/**
+ * Flink RelNode which scans a DataStream that was registered as a [[DataStreamTable]].
+ * It ensures that types without deterministic field order (e.g. POJOs) are not part of
+ * the plan translation.
+ */
+class DataStreamScan(
+ cluster: RelOptCluster,
+ traitSet: RelTraitSet,
+ table: RelOptTable,
+ rowRelDataType: RelDataType)
+ extends StreamScan(cluster, traitSet, table) {
+
+ val dataStreamTable: DataStreamTable[Any] = getTable.unwrap(classOf[DataStreamTable[Any]])
+
+ override def deriveRowType() = rowRelDataType
+
+ override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+ new DataStreamScan(
+ cluster,
+ traitSet,
+ getTable,
+ getRowType
+ )
+ }
+
+ override def translateToPlan(
+ tableEnv: StreamTableEnvironment,
+ expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+
+ val config = tableEnv.getConfig
+ val inputDataStream: DataStream[Any] = dataStreamTable.dataStream
+
+ convertToExpectedType(inputDataStream, dataStreamTable, expectedType, config)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
new file mode 100644
index 0000000..beb15d2
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.StreamTableEnvironment
+
+import scala.collection.JavaConverters._
+
+/**
+ * Flink RelNode which is translated into the union of its
+ * two input DataStreams.
+ */
+class DataStreamUnion(
+ cluster: RelOptCluster,
+ traitSet: RelTraitSet,
+ leftNode: RelNode,
+ rightNode: RelNode,
+ rowRelDataType: RelDataType)
+ extends BiRel(cluster, traitSet, leftNode, rightNode)
+ with DataStreamRel {
+
+ override def deriveRowType() = rowRelDataType
+
+ override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+ new DataStreamUnion(
+ cluster,
+ traitSet,
+ inputs.get(0),
+ inputs.get(1),
+ getRowType
+ )
+ }
+
+ override def explainTerms(pw: RelWriter): RelWriter = {
+ super.explainTerms(pw).item("union", unionSelectionToString)
+ }
+
+ override def toString = {
+ s"Union(union: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))"
+ }
+
+ override def translateToPlan(
+ tableEnv: StreamTableEnvironment,
+ expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+
+ val leftDataSet = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+ val rightDataSet = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+ leftDataSet.union(rightDataSet)
+ }
+
+ private def unionSelectionToString: String = {
+ getRowType.getFieldNames.asScala.toList.mkString(", ")
+ }
+}
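A hedged usage sketch (not part of this commit): unionAll on two streaming Tables, which the plan translation above maps to DataStream.union. The Table API entry points are assumed as in the earlier sketch:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object UnionSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val left  = env.fromElements((1, "beer")).toTable(tEnv, 'id, 'product)
    val right = env.fromElements((2, "wine")).toTable(tEnv, 'id, 'product)

    // both inputs must have identical field names and types;
    // the optimizer turns this into a DataStreamUnion node
    val all = left.unionAll(right)

    all.toDataStream[Row].print()
    env.execute()
  }
}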
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
new file mode 100644
index 0000000..f2a3d72
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.Values
+import org.apache.calcite.rex.RexLiteral
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.runtime.io.ValuesInputFormat
+import org.apache.flink.table.typeutils.TypeConverter._
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.StreamTableEnvironment
+
+import scala.collection.JavaConverters._
+
+/**
+ * DataStream RelNode for LogicalValues.
+ */
+class DataStreamValues(
+ cluster: RelOptCluster,
+ traitSet: RelTraitSet,
+ rowRelDataType: RelDataType,
+ tuples: ImmutableList[ImmutableList[RexLiteral]],
+ ruleDescription: String)
+ extends Values(cluster, rowRelDataType, tuples, traitSet)
+ with DataStreamRel {
+
+ override def deriveRowType() = rowRelDataType
+
+ override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+ new DataStreamValues(
+ cluster,
+ traitSet,
+ getRowType,
+ getTuples,
+ ruleDescription
+ )
+ }
+
+ override def translateToPlan(
+ tableEnv: StreamTableEnvironment,
+ expectedType: Option[TypeInformation[Any]])
+ : DataStream[Any] = {
+
+ val config = tableEnv.getConfig
+
+ val returnType = determineReturnType(
+ getRowType,
+ expectedType,
+ config.getNullCheck,
+ config.getEfficientTypeUsage)
+
+ val generator = new CodeGenerator(config)
+
+ // generate code for every record
+ val generatedRecords = getTuples.asScala.map { r =>
+ generator.generateResultExpression(
+ returnType,
+ getRowType.getFieldNames.asScala,
+ r.asScala)
+ }
+
+ // generate input format
+ val generatedFunction = generator.generateValuesInputFormat(
+ ruleDescription,
+ generatedRecords.map(_.code),
+ returnType)
+
+ val inputFormat = new ValuesInputFormat[Any](
+ generatedFunction.name,
+ generatedFunction.code,
+ generatedFunction.returnType)
+
+ tableEnv.execEnv.createInput(inputFormat, returnType).asInstanceOf[DataStream[Any]]
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
new file mode 100644
index 0000000..ddac958
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.TableScan
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.PojoTypeInfo
+import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.plan.schema.FlinkTable
+import org.apache.flink.table.runtime.MapRunner
+import org.apache.flink.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.TableConfig
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+abstract class StreamScan(
+ cluster: RelOptCluster,
+ traitSet: RelTraitSet,
+ table: RelOptTable)
+ extends TableScan(cluster, traitSet, table)
+ with DataStreamRel {
+
+ protected def convertToExpectedType(
+ input: DataStream[Any],
+ flinkTable: FlinkTable[_],
+ expectedType: Option[TypeInformation[Any]],
+ config: TableConfig): DataStream[Any] = {
+
+ val inputType = input.getType
+
+ expectedType match {
+
+ // special case:
+ // if efficient type usage is enabled and no expected type is set
+ // we can simply forward the DataSet to the next operator.
+ // however, we cannot forward PojoTypes as their fields don't have an order
+ case None if config.getEfficientTypeUsage && !inputType.isInstanceOf[PojoTypeInfo[_]] =>
+ input
+
+ case _ =>
+ val determinedType = determineReturnType(
+ getRowType,
+ expectedType,
+ config.getNullCheck,
+ config.getEfficientTypeUsage)
+
+ // conversion
+ if (determinedType != inputType) {
+ val generator = new CodeGenerator(
+ config,
+ nullableInput = false,
+ input.getType,
+ flinkTable.fieldIndexes)
+
+ val conversion = generator.generateConverterResultExpression(
+ determinedType,
+ getRowType.getFieldNames)
+
+ val body =
+ s"""
+ |${conversion.code}
+ |return ${conversion.resultTerm};
+ |""".stripMargin
+
+ val genFunction = generator.generateFunction(
+ "DataSetSourceConversion",
+ classOf[MapFunction[Any, Any]],
+ body,
+ determinedType)
+
+ val mapFunc = new MapRunner[Any, Any](
+ genFunction.name,
+ genFunction.code,
+ genFunction.returnType)
+
+ val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
+
+ input.map(mapFunc).name(opName)
+ }
+ // no conversion necessary, forward
+ else {
+ input
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
new file mode 100644
index 0000000..f86a54b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.sources.StreamTableSource
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.StreamTableEnvironment
+
+/** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */
+class StreamTableSourceScan(
+ cluster: RelOptCluster,
+ traitSet: RelTraitSet,
+ table: RelOptTable,
+ tableSource: StreamTableSource[_])
+ extends StreamScan(cluster, traitSet, table) {
+
+ override def deriveRowType() = {
+ val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+ flinkTypeFactory.buildRowDataType(tableSource.getFieldsNames, tableSource.getFieldTypes)
+ }
+
+ override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+ new StreamTableSourceScan(
+ cluster,
+ traitSet,
+ getTable,
+ tableSource
+ )
+ }
+
+ override def translateToPlan(
+ tableEnv: StreamTableEnvironment,
+ expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+
+ val config = tableEnv.getConfig
+ val inputDataStream: DataStream[Any] = tableSource
+ .getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
+
+ convertToExpectedType(inputDataStream, new TableSourceTable(tableSource), expectedType, config)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/EnumerableToLogicalTableScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/EnumerableToLogicalTableScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/EnumerableToLogicalTableScan.scala
new file mode 100644
index 0000000..86b8a23
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/EnumerableToLogicalTableScan.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan
+import org.apache.calcite.plan.{RelOptRuleCall, RelOptRule, RelOptRuleOperand}
+import org.apache.calcite.rel.logical.LogicalTableScan
+
+/**
+ * Rule that converts an EnumerableTableScan into a LogicalTableScan.
+ * We need this rule because Calcite creates an EnumerableTableScan
+ * when parsing a SQL query. We convert it into a LogicalTableScan
+ * so we can merge the optimization process with any plan that might be created
+ * by the Table API.
+ */
+class EnumerableToLogicalTableScan(
+ operand: RelOptRuleOperand,
+ description: String) extends RelOptRule(operand, description) {
+
+ override def onMatch(call: RelOptRuleCall): Unit = {
+ val oldRel = call.rel(0).asInstanceOf[EnumerableTableScan]
+ val table = oldRel.getTable
+ val newRel = LogicalTableScan.create(oldRel.getCluster, table)
+ call.transformTo(newRel)
+ }
+}
+
+object EnumerableToLogicalTableScan {
+ val INSTANCE = new EnumerableToLogicalTableScan(
+ operand(classOf[EnumerableTableScan], any),
+ "EnumerableToLogicalTableScan")
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
new file mode 100644
index 0000000..bcd12a4
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules
+
+import org.apache.calcite.rel.rules._
+import org.apache.calcite.tools.{RuleSets, RuleSet}
+import org.apache.flink.table.plan.rules.dataSet._
+import org.apache.flink.table.plan.rules.datastream._
+
+object FlinkRuleSets {
+
+ /**
+ * RuleSet to optimize plans for batch / DataSet execution
+ */
+ val DATASET_OPT_RULES: RuleSet = RuleSets.ofList(
+
+ // convert a logical table scan to a relational expression
+ TableScanRule.INSTANCE,
+ EnumerableToLogicalTableScan.INSTANCE,
+
+ // push a filter into a join
+ FilterJoinRule.FILTER_ON_JOIN,
+ // push filter into the children of a join
+ FilterJoinRule.JOIN,
+ // push filter through an aggregation
+ FilterAggregateTransposeRule.INSTANCE,
+
+ // aggregation and projection rules
+ AggregateProjectMergeRule.INSTANCE,
+ AggregateProjectPullUpConstantsRule.INSTANCE,
+ // push a projection past a filter or vice versa
+ ProjectFilterTransposeRule.INSTANCE,
+ FilterProjectTransposeRule.INSTANCE,
+ // push a projection to the children of a join
+ ProjectJoinTransposeRule.INSTANCE,
+ // remove identity project
+ ProjectRemoveRule.INSTANCE,
+ // reorder sort and projection
+ SortProjectTransposeRule.INSTANCE,
+ ProjectSortTransposeRule.INSTANCE,
+
+ // join rules
+ JoinPushExpressionsRule.INSTANCE,
+
+ // remove union with only a single child
+ UnionEliminatorRule.INSTANCE,
+ // convert non-all union into all-union + distinct
+ UnionToDistinctRule.INSTANCE,
+
+ // remove aggregation if it does not aggregate and input is already distinct
+ AggregateRemoveRule.INSTANCE,
+ // push aggregate through join
+ AggregateJoinTransposeRule.EXTENDED,
+ // aggregate union rule
+ AggregateUnionAggregateRule.INSTANCE,
+
+ // remove unnecessary sort rule
+ SortRemoveRule.INSTANCE,
+
+ // simplify expressions rules
+ ReduceExpressionsRule.FILTER_INSTANCE,
+ ReduceExpressionsRule.PROJECT_INSTANCE,
+ ReduceExpressionsRule.CALC_INSTANCE,
+ ReduceExpressionsRule.JOIN_INSTANCE,
+
+ // prune empty results rules
+ PruneEmptyRules.AGGREGATE_INSTANCE,
+ PruneEmptyRules.FILTER_INSTANCE,
+ PruneEmptyRules.JOIN_LEFT_INSTANCE,
+ PruneEmptyRules.JOIN_RIGHT_INSTANCE,
+ PruneEmptyRules.PROJECT_INSTANCE,
+ PruneEmptyRules.SORT_INSTANCE,
+ PruneEmptyRules.UNION_INSTANCE,
+
+ // calc rules
+ FilterCalcMergeRule.INSTANCE,
+ ProjectCalcMergeRule.INSTANCE,
+ FilterToCalcRule.INSTANCE,
+ ProjectToCalcRule.INSTANCE,
+ CalcMergeRule.INSTANCE,
+
+ // translate to Flink DataSet nodes
+ DataSetAggregateRule.INSTANCE,
+ DataSetAggregateWithNullValuesRule.INSTANCE,
+ DataSetCalcRule.INSTANCE,
+ DataSetJoinRule.INSTANCE,
+ DataSetSingleRowJoinRule.INSTANCE,
+ DataSetScanRule.INSTANCE,
+ DataSetUnionRule.INSTANCE,
+ DataSetIntersectRule.INSTANCE,
+ DataSetMinusRule.INSTANCE,
+ DataSetSortRule.INSTANCE,
+ DataSetValuesRule.INSTANCE,
+ DataSetCorrelateRule.INSTANCE,
+ BatchTableSourceScanRule.INSTANCE,
+ // project pushdown optimization
+ PushProjectIntoBatchTableSourceScanRule.INSTANCE
+ )
+
+ /**
+ * RuleSet to optimize plans for stream / DataStream execution
+ */
+ val DATASTREAM_OPT_RULES: RuleSet = RuleSets.ofList(
+
+ // convert a logical table scan to a relational expression
+ TableScanRule.INSTANCE,
+ EnumerableToLogicalTableScan.INSTANCE,
+
+ // calc rules
+ FilterToCalcRule.INSTANCE,
+ ProjectToCalcRule.INSTANCE,
+ FilterCalcMergeRule.INSTANCE,
+ ProjectCalcMergeRule.INSTANCE,
+ CalcMergeRule.INSTANCE,
+
+ // prune empty results rules
+ PruneEmptyRules.FILTER_INSTANCE,
+ PruneEmptyRules.PROJECT_INSTANCE,
+ PruneEmptyRules.UNION_INSTANCE,
+
+ // push and merge projection rules
+ ProjectFilterTransposeRule.INSTANCE,
+ FilterProjectTransposeRule.INSTANCE,
+ ProjectRemoveRule.INSTANCE,
+
+ // simplify expressions rules
+ ReduceExpressionsRule.FILTER_INSTANCE,
+ ReduceExpressionsRule.PROJECT_INSTANCE,
+ ReduceExpressionsRule.CALC_INSTANCE,
+
+ // merge and push unions rules
+ UnionEliminatorRule.INSTANCE,
+
+ // translate to DataStream nodes
+ DataStreamAggregateRule.INSTANCE,
+ DataStreamCalcRule.INSTANCE,
+ DataStreamScanRule.INSTANCE,
+ DataStreamUnionRule.INSTANCE,
+ DataStreamValuesRule.INSTANCE,
+ DataStreamCorrelateRule.INSTANCE,
+ StreamTableSourceScanRule.INSTANCE
+ )
+
+}
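A small, hedged sketch of how such a rule set can be consumed: Calcite's RuleSet is an Iterable of RelOptRule, so it can be wrapped into an optimizer Program. The exact wiring inside the TableEnvironment is not part of this diff and may differ:

import org.apache.calcite.tools.{Program, Programs}
import org.apache.flink.table.plan.rules.FlinkRuleSets

import scala.collection.JavaConverters._

object RuleSetSketch {
  def main(args: Array[String]): Unit = {
    // a RuleSet is a plain Iterable of RelOptRule, e.g. to inspect which rules are applied
    FlinkRuleSets.DATASTREAM_OPT_RULES.asScala.foreach(println)

    // ... and it can be wrapped into a Calcite Program that a planner runs over a logical plan
    val streamProgram: Program = Programs.ofRules(FlinkRuleSets.DATASTREAM_OPT_RULES)
  }
}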
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
new file mode 100644
index 0000000..d699585
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.flink.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetConvention}
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.sources.BatchTableSource
+
+/** Rule to convert a [[LogicalTableScan]] into a [[BatchTableSourceScan]]. */
+class BatchTableSourceScanRule
+ extends ConverterRule(
+ classOf[LogicalTableScan],
+ Convention.NONE,
+ DataSetConvention.INSTANCE,
+ "BatchTableSourceScanRule")
+ {
+
+ /** Rule must only match if TableScan targets a [[BatchTableSource]] */
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val scan: TableScan = call.rel(0).asInstanceOf[TableScan]
+ val dataSetTable = scan.getTable.unwrap(classOf[TableSourceTable])
+ dataSetTable match {
+ case tst: TableSourceTable =>
+ tst.tableSource match {
+ case _: BatchTableSource[_] =>
+ true
+ case _ =>
+ false
+ }
+ case _ =>
+ false
+ }
+ }
+
+ def convert(rel: RelNode): RelNode = {
+ val scan: TableScan = rel.asInstanceOf[TableScan]
+ val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+
+ val tableSource = scan.getTable.unwrap(classOf[TableSourceTable]).tableSource
+ .asInstanceOf[BatchTableSource[_]]
+ new BatchTableSourceScan(
+ rel.getCluster,
+ traitSet,
+ scan.getTable,
+ tableSource
+ )
+ }
+}
+
+object BatchTableSourceScanRule {
+ val INSTANCE: RelOptRule = new BatchTableSourceScanRule
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala
new file mode 100644
index 0000000..d634a6c
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalAggregate
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.plan.nodes.dataset.{DataSetAggregate, DataSetConvention}
+
+import scala.collection.JavaConversions._
+
+class DataSetAggregateRule
+ extends ConverterRule(
+ classOf[LogicalAggregate],
+ Convention.NONE,
+ DataSetConvention.INSTANCE,
+ "DataSetAggregateRule")
+ {
+
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val agg: LogicalAggregate = call.rel(0).asInstanceOf[LogicalAggregate]
+
+ // A non-grouped aggregate needs a null row unioned with its input,
+ // which is handled by DataSetAggregateWithNullValuesRule instead.
+ if (agg.getGroupSet.isEmpty) {
+ return false
+ }
+
+ // check if we have distinct aggregates
+ val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
+ if (distinctAggs) {
+ throw TableException("DISTINCT aggregates are currently not supported.")
+ }
+
+ // check if we have grouping sets
+ val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet
+ if (groupSets || agg.indicator) {
+ throw TableException("GROUPING SETS are currently not supported.")
+ }
+
+ !distinctAggs && !groupSets && !agg.indicator
+ }
+
+ override def convert(rel: RelNode): RelNode = {
+ val agg: LogicalAggregate = rel.asInstanceOf[LogicalAggregate]
+ val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+ val convInput: RelNode = RelOptRule.convert(agg.getInput, DataSetConvention.INSTANCE)
+
+ new DataSetAggregate(
+ rel.getCluster,
+ traitSet,
+ convInput,
+ agg.getNamedAggCalls,
+ rel.getRowType,
+ agg.getInput.getRowType,
+ agg.getGroupSet.toArray)
+ }
+ }
+
+object DataSetAggregateRule {
+ val INSTANCE: RelOptRule = new DataSetAggregateRule
+}
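
As an illustration of the queries this rule targets, a grouped, non-DISTINCT aggregate written against the Scala Table API; the table name `orders` and its fields are hypothetical:

    import org.apache.flink.table.api.scala._

    // Grouped aggregate without DISTINCT or GROUPING SETS: converted by
    // DataSetAggregateRule. A non-grouped aggregate such as
    // orders.select('amount.sum) is left to DataSetAggregateWithNullValuesRule.
    val revenuePerUser = orders
      .groupBy('user)
      .select('user, 'amount.sum as 'total)
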
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala
new file mode 100644
index 0000000..b708af4
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.rules.dataSet
+
+import org.apache.calcite.plan._
+
+import scala.collection.JavaConversions._
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalUnion, LogicalValues}
+import org.apache.calcite.rex.RexLiteral
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.plan.nodes.dataset.{DataSetAggregate, DataSetConvention}
+
+/**
+ * Rule that unions a [[org.apache.flink.types.Row]] of null values with the input of a
+ * [[DataSetAggregate]]. The rule applies only to non-grouped aggregate queries.
+ */
+class DataSetAggregateWithNullValuesRule
+ extends ConverterRule(
+ classOf[LogicalAggregate],
+ Convention.NONE,
+ DataSetConvention.INSTANCE,
+ "DataSetAggregateWithNullValuesRule")
+{
+
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val agg: LogicalAggregate = call.rel(0).asInstanceOf[LogicalAggregate]
+
+ // Grouped aggregates must not have a null row attached;
+ // they are handled by other rules, e.g. [[DataSetAggregateRule]].
+ if (!agg.getGroupSet.isEmpty) {
+ return false
+ }
+
+ // check if we have distinct aggregates
+ val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
+ if (distinctAggs) {
+ throw TableException("DISTINCT aggregates are currently not supported.")
+ }
+
+ // check if we have grouping sets
+ val groupSets = agg.getGroupSets.size() == 0 || agg.getGroupSets.get(0) != agg.getGroupSet
+ if (groupSets || agg.indicator) {
+ throw TableException("GROUPING SETS are currently not supported.")
+ }
+ !distinctAggs && !groupSets && !agg.indicator
+ }
+
+ override def convert(rel: RelNode): RelNode = {
+ val agg: LogicalAggregate = rel.asInstanceOf[LogicalAggregate]
+ val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+ val cluster: RelOptCluster = rel.getCluster
+
+ val fieldTypes = agg.getInput.getRowType.getFieldList.map(_.getType)
+ val nullLiterals: ImmutableList[ImmutableList[RexLiteral]] =
+ ImmutableList.of(ImmutableList.copyOf[RexLiteral](
+ for (fieldType <- fieldTypes)
+ yield {
+ cluster.getRexBuilder.
+ makeLiteral(null, fieldType, false).asInstanceOf[RexLiteral]
+ }))
+
+ val logicalValues = LogicalValues.create(cluster, agg.getInput.getRowType, nullLiterals)
+ val logicalUnion = LogicalUnion.create(List(logicalValues, agg.getInput), true)
+
+ new DataSetAggregate(
+ cluster,
+ traitSet,
+ RelOptRule.convert(logicalUnion, DataSetConvention.INSTANCE),
+ agg.getNamedAggCalls,
+ rel.getRowType,
+ agg.getInput.getRowType,
+ agg.getGroupSet.toArray
+ )
+ }
+}
+
+object DataSetAggregateWithNullValuesRule {
+ val INSTANCE: RelOptRule = new DataSetAggregateWithNullValuesRule
+}
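
The effect of the null-row union is that a non-grouped aggregate still produces exactly one result row when its input is empty: COUNT yields 0, while SUM or MAX yield null. A hypothetical Table API query handled by this rule, assuming a registered table `orders`:

    import org.apache.flink.table.api.scala._

    // Non-grouped aggregate: the rule unions a single all-null row with the
    // input, so an empty 'orders' table still yields one result row.
    val totals = orders.select('amount.sum as 'sum, 'amount.count as 'cnt)
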
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetCalcRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetCalcRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetCalcRule.scala
new file mode 100644
index 0000000..1d100fa
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetCalcRule.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalCalc
+import org.apache.flink.table.plan.nodes.dataset.{DataSetCalc, DataSetConvention}
+
+class DataSetCalcRule
+ extends ConverterRule(
+ classOf[LogicalCalc],
+ Convention.NONE,
+ DataSetConvention.INSTANCE,
+ "DataSetCalcRule")
+ {
+
+ def convert(rel: RelNode): RelNode = {
+ val calc: LogicalCalc = rel.asInstanceOf[LogicalCalc]
+ val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+ val convInput: RelNode = RelOptRule.convert(calc.getInput, DataSetConvention.INSTANCE)
+
+ new DataSetCalc(
+ rel.getCluster,
+ traitSet,
+ convInput,
+ rel.getRowType,
+ calc.getProgram,
+ description)
+ }
+ }
+
+object DataSetCalcRule {
+ val INSTANCE: RelOptRule = new DataSetCalcRule
+}
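
For illustration, a filter combined with a projection is what Calcite typically fuses into a single LogicalCalc, which this rule then converts into a DataSetCalc; the names below are hypothetical:

    import org.apache.flink.table.api.scala._

    // Filter + projection are merged into one LogicalCalc by Calcite's
    // calc merge rules before DataSetCalcRule converts the node.
    val bigOrders = orders
      .filter('amount > 100)
      .select('user, 'amount * 2 as 'doubled)
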
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetCorrelateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetCorrelateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetCorrelateRule.scala
new file mode 100644
index 0000000..819bcae
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetCorrelateRule.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.{LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan}
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, DataSetCorrelate}
+
+/**
+ * Rule to convert a LogicalCorrelate into a DataSetCorrelate.
+ */
+class DataSetCorrelateRule
+ extends ConverterRule(
+ classOf[LogicalCorrelate],
+ Convention.NONE,
+ DataSetConvention.INSTANCE,
+ "DataSetCorrelateRule") {
+
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val join: LogicalCorrelate = call.rel(0).asInstanceOf[LogicalCorrelate]
+ val right = join.getRight.asInstanceOf[RelSubset].getOriginal
+
+
+ right match {
+ // right node is a table function
+ case scan: LogicalTableFunctionScan => true
+ // a filter is pushed above the table function
+ case filter: LogicalFilter =>
+ filter
+ .getInput.asInstanceOf[RelSubset]
+ .getOriginal
+ .isInstanceOf[LogicalTableFunctionScan]
+ case _ => false
+ }
+ }
+
+ override def convert(rel: RelNode): RelNode = {
+ val join: LogicalCorrelate = rel.asInstanceOf[LogicalCorrelate]
+ val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+ val convInput: RelNode = RelOptRule.convert(join.getInput(0), DataSetConvention.INSTANCE)
+ val right: RelNode = join.getInput(1)
+
+ def convertToCorrelate(relNode: RelNode, condition: Option[RexNode]): DataSetCorrelate = {
+ relNode match {
+ case rel: RelSubset =>
+ convertToCorrelate(rel.getRelList.get(0), condition)
+
+ case filter: LogicalFilter =>
+ convertToCorrelate(
+ filter.getInput.asInstanceOf[RelSubset].getOriginal,
+ Some(filter.getCondition))
+
+ case scan: LogicalTableFunctionScan =>
+ new DataSetCorrelate(
+ rel.getCluster,
+ traitSet,
+ convInput,
+ scan,
+ condition,
+ rel.getRowType,
+ join.getRowType,
+ join.getJoinType,
+ description)
+ }
+ }
+ convertToCorrelate(right, None)
+ }
+ }
+
+object DataSetCorrelateRule {
+ val INSTANCE: RelOptRule = new DataSetCorrelateRule
+}
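
A LogicalCorrelate with a table function on its right side typically originates from a table-function join in the Table API. A minimal sketch, assuming the user-defined table function support of this module; the class and table names are hypothetical:

    import org.apache.flink.table.api.scala._
    import org.apache.flink.table.functions.TableFunction

    // A simple UDTF that splits a line into words.
    class Split extends TableFunction[String] {
      def eval(line: String): Unit = line.split(" ").foreach(collect)
    }

    val split = new Split
    // The correlate produced by this join is what DataSetCorrelateRule converts;
    // a predicate on 'word would surface as the LogicalFilter case in matches().
    val words = lines.join(split('line) as 'word).select('line, 'word)
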
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetIntersectRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetIntersectRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetIntersectRule.scala
new file mode 100644
index 0000000..d158f34
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetIntersectRule.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalIntersect
+import org.apache.flink.table.plan.nodes.dataset.{DataSetIntersect, DataSetConvention}
+
+class DataSetIntersectRule
+ extends ConverterRule(
+ classOf[LogicalIntersect],
+ Convention.NONE,
+ DataSetConvention.INSTANCE,
+ "DataSetIntersectRule")
+{
+
+ def convert(rel: RelNode): RelNode = {
+
+ val intersect: LogicalIntersect = rel.asInstanceOf[LogicalIntersect]
+ val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+ val convLeft: RelNode = RelOptRule.convert(intersect.getInput(0), DataSetConvention.INSTANCE)
+ val convRight: RelNode = RelOptRule.convert(intersect.getInput(1), DataSetConvention.INSTANCE)
+
+ new DataSetIntersect(
+ rel.getCluster,
+ traitSet,
+ convLeft,
+ convRight,
+ rel.getRowType,
+ intersect.all)
+ }
+}
+
+object DataSetIntersectRule {
+ val INSTANCE: RelOptRule = new DataSetIntersectRule
+}
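
For illustration, the Table API operation that produces the LogicalIntersect this rule converts; the table names are hypothetical:

    import org.apache.flink.table.api.scala._

    // intersect() removes duplicates (all = false); intersectAll() keeps them.
    val returningCustomers = customers2015.intersect(customers2016)
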
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala
new file mode 100644
index 0000000..2874198
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalJoin
+
+import org.apache.flink.table.plan.nodes.dataset.{DataSetJoin, DataSetConvention}
+
+import scala.collection.JavaConversions._
+
+class DataSetJoinRule
+ extends ConverterRule(
+ classOf[LogicalJoin],
+ Convention.NONE,
+ DataSetConvention.INSTANCE,
+ "DataSetJoinRule") {
+
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val join: LogicalJoin = call.rel(0).asInstanceOf[LogicalJoin]
+
+ val joinInfo = join.analyzeCondition
+
+ // joins require an equi-condition or a conjunctive predicate with at least one equi-condition
+ !joinInfo.pairs().isEmpty
+ }
+
+ override def convert(rel: RelNode): RelNode = {
+
+ val join: LogicalJoin = rel.asInstanceOf[LogicalJoin]
+ val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+ val convLeft: RelNode = RelOptRule.convert(join.getInput(0), DataSetConvention.INSTANCE)
+ val convRight: RelNode = RelOptRule.convert(join.getInput(1), DataSetConvention.INSTANCE)
+ val joinInfo = join.analyzeCondition
+
+ new DataSetJoin(
+ rel.getCluster,
+ traitSet,
+ convLeft,
+ convRight,
+ rel.getRowType,
+ join.getCondition,
+ join.getRowType,
+ joinInfo,
+ joinInfo.pairs.toList,
+ join.getJoinType,
+ null,
+ description)
+ }
+
+}
+
+object DataSetJoinRule {
+ val INSTANCE: RelOptRule = new DataSetJoinRule
+}
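
To illustrate the matches() restriction: a join needs at least one equi-condition in its predicate to be translated by this rule. Table and field names below are hypothetical:

    import org.apache.flink.table.api.scala._

    // Accepted: the conjunction contains the equi-condition 'uId === 'oUser,
    // so joinInfo.pairs() is non-empty.
    val joined = users.join(orders)
      .where('uId === 'oUser && 'amount > 'creditLimit)
      .select('uId, 'amount)

    // Rejected by matches(): a pure theta-join such as
    // users.join(orders).where('amount > 'creditLimit) has an empty
    // joinInfo.pairs() and is not converted.
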
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetMinusRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetMinusRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetMinusRule.scala
new file mode 100644
index 0000000..7172596
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetMinusRule.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalMinus
+import org.apache.calcite.rel.rules.UnionToDistinctRule
+import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, DataSetMinus}
+
+class DataSetMinusRule
+ extends ConverterRule(
+ classOf[LogicalMinus],
+ Convention.NONE,
+ DataSetConvention.INSTANCE,
+ "DataSetMinusRule")
+{
+
+ def convert(rel: RelNode): RelNode = {
+
+ val minus: LogicalMinus = rel.asInstanceOf[LogicalMinus]
+ val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+ val convLeft: RelNode = RelOptRule.convert(minus.getInput(0), DataSetConvention.INSTANCE)
+ val convRight: RelNode = RelOptRule.convert(minus.getInput(1), DataSetConvention.INSTANCE)
+
+ new DataSetMinus(
+ rel.getCluster,
+ traitSet,
+ convLeft,
+ convRight,
+ rel.getRowType,
+ minus.all)
+ }
+}
+
+object DataSetMinusRule {
+ val INSTANCE: RelOptRule = new DataSetMinusRule
+}
+
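Analogously, the Table API's minus operations produce the LogicalMinus this rule converts; the table names are hypothetical:

    import org.apache.flink.table.api.scala._

    // minus() removes duplicates (all = false); minusAll() keeps multiplicity.
    val churnedCustomers = customers2015.minus(customers2016)
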
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetScanRule.scala
new file mode 100644
index 0000000..9d593aa
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetScanRule.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, DataSetScan}
+import org.apache.flink.table.plan.schema.DataSetTable
+
+class DataSetScanRule
+ extends ConverterRule(
+ classOf[LogicalTableScan],
+ Convention.NONE,
+ DataSetConvention.INSTANCE,
+ "DataSetScanRule")
+ {
+
+ /**
+ * If the input is not a DataSetTable, we want the TableScanRule to match instead
+ */
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val scan: TableScan = call.rel(0).asInstanceOf[TableScan]
+ val dataSetTable = scan.getTable.unwrap(classOf[DataSetTable[Any]])
+ dataSetTable match {
+ case _: DataSetTable[Any] =>
+ true
+ case _ =>
+ false
+ }
+ }
+
+ def convert(rel: RelNode): RelNode = {
+ val scan: TableScan = rel.asInstanceOf[TableScan]
+ val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+
+ new DataSetScan(
+ rel.getCluster,
+ traitSet,
+ scan.getTable,
+ rel.getRowType
+ )
+ }
+}
+
+object DataSetScanRule {
+ val INSTANCE: RelOptRule = new DataSetScanRule
+}
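
A DataSetTable, which this rule unwraps from the TableScan, is created when a DataSet is registered with a BatchTableEnvironment. A minimal sketch; the example data and field names are hypothetical:

    import org.apache.flink.api.scala._
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Registering the DataSet wraps it in a DataSetTable; scanning the
    // registered table later yields the LogicalTableScan that this rule matches.
    val ds = env.fromElements((1, "a"), (2, "b"))
    tEnv.registerDataSet("t", ds, 'id, 'name)
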
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala
new file mode 100644
index 0000000..1f5c91a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalJoin}
+import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, DataSetSingleRowJoin}
+
+class DataSetSingleRowJoinRule
+ extends ConverterRule(
+ classOf[LogicalJoin],
+ Convention.NONE,
+ DataSetConvention.INSTANCE,
+ "DataSetSingleRowCrossRule") {
+
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val join = call.rel(0).asInstanceOf[LogicalJoin]
+
+ if (isInnerJoin(join)) {
+ isGlobalAggregation(join.getRight.asInstanceOf[RelSubset].getOriginal) ||
+ isGlobalAggregation(join.getLeft.asInstanceOf[RelSubset].getOriginal)
+ } else {
+ false
+ }
+ }
+
+ private def isInnerJoin(join: LogicalJoin) = {
+ join.getJoinType == JoinRelType.INNER
+ }
+
+ private def isGlobalAggregation(node: RelNode) = {
+ node.isInstanceOf[LogicalAggregate] &&
+ isSingleRow(node.asInstanceOf[LogicalAggregate])
+ }
+
+ private def isSingleRow(agg: LogicalAggregate) = {
+ agg.getGroupSet.isEmpty
+ }
+
+ override def convert(rel: RelNode): RelNode = {
+ val join = rel.asInstanceOf[LogicalJoin]
+ val traitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+ val dataSetLeftNode = RelOptRule.convert(join.getLeft, DataSetConvention.INSTANCE)
+ val dataSetRightNode = RelOptRule.convert(join.getRight, DataSetConvention.INSTANCE)
+ val leftIsSingle = isGlobalAggregation(join.getLeft.asInstanceOf[RelSubset].getOriginal)
+
+ new DataSetSingleRowJoin(
+ rel.getCluster,
+ traitSet,
+ dataSetLeftNode,
+ dataSetRightNode,
+ leftIsSingle,
+ rel.getRowType,
+ join.getCondition,
+ join.getRowType,
+ description)
+ }
+}
+
+object DataSetSingleRowJoinRule {
+ val INSTANCE: RelOptRule = new DataSetSingleRowJoinRule
+}
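
Finally, an example of the pattern this rule targets: an inner join where one input is a global (non-grouped) aggregate and therefore consists of a single row. Table and field names are hypothetical:

    import org.apache.flink.table.api.scala._

    // The right input is a global aggregate, i.e. a single row, so the inner
    // join can be executed as a broadcast single-row join.
    val maxAmount = orders.select('amount.max as 'maxAmount)
    val topOrders = orders
      .join(maxAmount)
      .where('amount === 'maxAmount)
      .select('user, 'amount)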