You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:47:02 UTC
[33/47] flink git commit: [FLINK-4704] [table] Refactor package
structure of flink-table.
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
deleted file mode 100644
index 7133773..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
+++ /dev/null
@@ -1,358 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.datastream
-
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.AggregateCall
-import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.api.table.FlinkRelBuilder.NamedWindowProperty
-import org.apache.flink.api.table.expressions._
-import org.apache.flink.api.table.plan.logical._
-import org.apache.flink.api.table.plan.nodes.FlinkAggregate
-import org.apache.flink.api.table.plan.nodes.datastream.DataStreamAggregate._
-import org.apache.flink.api.table.runtime.aggregate.AggregateUtil._
-import org.apache.flink.api.table.runtime.aggregate.{Aggregate, _}
-import org.apache.flink.api.table.typeutils.TypeCheckUtils.isTimeInterval
-import org.apache.flink.api.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo, TypeConverter}
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.api.table.{FlinkTypeFactory, StreamTableEnvironment}
-import org.apache.flink.types.Row
-import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
-import org.apache.flink.streaming.api.windowing.assigners._
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
-
-import scala.collection.JavaConverters._
-
-class DataStreamAggregate(
- window: LogicalWindow,
- namedProperties: Seq[NamedWindowProperty],
- cluster: RelOptCluster,
- traitSet: RelTraitSet,
- inputNode: RelNode,
- namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- rowRelDataType: RelDataType,
- inputType: RelDataType,
- grouping: Array[Int])
- extends SingleRel(cluster, traitSet, inputNode)
- with FlinkAggregate
- with DataStreamRel {
-
- override def deriveRowType() = rowRelDataType
-
- override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
- new DataStreamAggregate(
- window,
- namedProperties,
- cluster,
- traitSet,
- inputs.get(0),
- namedAggregates,
- getRowType,
- inputType,
- grouping)
- }
-
- override def toString: String = {
- s"Aggregate(${
- if (!grouping.isEmpty) {
- s"groupBy: (${groupingToString(inputType, grouping)}), "
- } else {
- ""
- }
- }window: ($window), " +
- s"select: (${
- aggregationToString(
- inputType,
- grouping,
- getRowType,
- namedAggregates,
- namedProperties)
- }))"
- }
-
- override def explainTerms(pw: RelWriter): RelWriter = {
- super.explainTerms(pw)
- .itemIf("groupBy", groupingToString(inputType, grouping), !grouping.isEmpty)
- .item("window", window)
- .item("select", aggregationToString(
- inputType,
- grouping,
- getRowType,
- namedAggregates,
- namedProperties))
- }
-
- override def translateToPlan(
- tableEnv: StreamTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
-
- val config = tableEnv.getConfig
- val groupingKeys = grouping.indices.toArray
- val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(
- tableEnv,
- // tell the input operator that this operator currently only supports Rows as input
- Some(TypeConverter.DEFAULT_ROW_TYPE))
-
- // get the output types
- val fieldTypes: Array[TypeInformation[_]] =
- getRowType.getFieldList.asScala
- .map(field => FlinkTypeFactory.toTypeInfo(field.getType))
- .toArray
-
- val rowTypeInfo = new RowTypeInfo(fieldTypes: _*)
-
- val aggString = aggregationToString(
- inputType,
- grouping,
- getRowType,
- namedAggregates,
- namedProperties)
-
- val prepareOpName = s"prepare select: ($aggString)"
- val keyedAggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " +
- s"window: ($window), " +
- s"select: ($aggString)"
- val nonKeyedAggOpName = s"window: ($window), select: ($aggString)"
-
- val mapFunction = AggregateUtil.createPrepareMapFunction(
- namedAggregates,
- grouping,
- inputType)
-
- val mappedInput = inputDS.map(mapFunction).name(prepareOpName)
-
- val result: DataStream[Any] = {
- // check whether all aggregates support partial aggregation
- if (AggregateUtil.doAllSupportPartialAggregation(
- namedAggregates.map(_.getKey),
- inputType,
- grouping.length)) {
- // do Incremental Aggregation
- val reduceFunction = AggregateUtil.createIncrementalAggregateReduceFunction(
- namedAggregates,
- inputType,
- getRowType,
- grouping)
- // grouped / keyed aggregation
- if (groupingKeys.length > 0) {
- val windowFunction = AggregateUtil.createWindowIncrementalAggregationFunction(
- window,
- namedAggregates,
- inputType,
- rowRelDataType,
- grouping,
- namedProperties)
-
- val keyedStream = mappedInput.keyBy(groupingKeys: _*)
- val windowedStream =
- createKeyedWindowedStream(window, keyedStream)
- .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
-
- windowedStream
- .apply(reduceFunction, windowFunction)
- .returns(rowTypeInfo)
- .name(keyedAggOpName)
- .asInstanceOf[DataStream[Any]]
- }
- // global / non-keyed aggregation
- else {
- val windowFunction = AggregateUtil.createAllWindowIncrementalAggregationFunction(
- window,
- namedAggregates,
- inputType,
- rowRelDataType,
- grouping,
- namedProperties)
-
- val windowedStream =
- createNonKeyedWindowedStream(window, mappedInput)
- .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
-
- windowedStream
- .apply(reduceFunction, windowFunction)
- .returns(rowTypeInfo)
- .name(nonKeyedAggOpName)
- .asInstanceOf[DataStream[Any]]
- }
- }
- else {
- // do non-Incremental Aggregation
- // grouped / keyed aggregation
- if (groupingKeys.length > 0) {
-
- val windowFunction = AggregateUtil.createWindowAggregationFunction(
- window,
- namedAggregates,
- inputType,
- rowRelDataType,
- grouping,
- namedProperties)
-
- val keyedStream = mappedInput.keyBy(groupingKeys: _*)
- val windowedStream =
- createKeyedWindowedStream(window, keyedStream)
- .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
-
- windowedStream
- .apply(windowFunction)
- .returns(rowTypeInfo)
- .name(keyedAggOpName)
- .asInstanceOf[DataStream[Any]]
- }
- // global / non-keyed aggregation
- else {
- val windowFunction = AggregateUtil.createAllWindowAggregationFunction(
- window,
- namedAggregates,
- inputType,
- rowRelDataType,
- grouping,
- namedProperties)
-
- val windowedStream =
- createNonKeyedWindowedStream(window, mappedInput)
- .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
-
- windowedStream
- .apply(windowFunction)
- .returns(rowTypeInfo)
- .name(nonKeyedAggOpName)
- .asInstanceOf[DataStream[Any]]
- }
- }
- }
- // if the expected type is not a Row, inject a mapper to convert to the expected type
- expectedType match {
- case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
- val mapName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
- result.map(getConversionMapper(
- config = config,
- nullableInput = false,
- inputType = rowTypeInfo.asInstanceOf[TypeInformation[Any]],
- expectedType = expectedType.get,
- conversionOperatorName = "DataStreamAggregateConversion",
- fieldNames = getRowType.getFieldNames.asScala
- ))
- .name(mapName)
- case _ => result
- }
- }
-}
-object DataStreamAggregate {
-
-
- private def createKeyedWindowedStream(groupWindow: LogicalWindow, stream: KeyedStream[Row, Tuple])
- : WindowedStream[Row, Tuple, _ <: DataStreamWindow] = groupWindow match {
-
- case ProcessingTimeTumblingGroupWindow(_, size) if isTimeInterval(size.resultType) =>
- stream.window(TumblingProcessingTimeWindows.of(asTime(size)))
-
- case ProcessingTimeTumblingGroupWindow(_, size) =>
- stream.countWindow(asCount(size))
-
- case EventTimeTumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) =>
- stream.window(TumblingEventTimeWindows.of(asTime(size)))
-
- case EventTimeTumblingGroupWindow(_, _, size) =>
- // TODO: EventTimeTumblingGroupWindow should sort the stream on event time
- // before applying the windowing logic. Otherwise, this would be the same as a
- // ProcessingTimeTumblingGroupWindow
- throw new UnsupportedOperationException("Event-time grouping windows on row intervals are " +
- "currently not supported.")
-
- case ProcessingTimeSlidingGroupWindow(_, size, slide) if isTimeInterval(size.resultType) =>
- stream.window(SlidingProcessingTimeWindows.of(asTime(size), asTime(slide)))
-
- case ProcessingTimeSlidingGroupWindow(_, size, slide) =>
- stream.countWindow(asCount(size), asCount(slide))
-
- case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
- stream.window(SlidingEventTimeWindows.of(asTime(size), asTime(slide)))
-
- case EventTimeSlidingGroupWindow(_, _, size, slide) =>
- // TODO: EventTimeSlidingGroupWindow should sort the stream on event time
- // before applying the windowing logic. Otherwise, this would be the same as a
- // ProcessingTimeSlidingGroupWindow
- throw new UnsupportedOperationException("Event-time grouping windows on row intervals are " +
- "currently not supported.")
-
- case ProcessingTimeSessionGroupWindow(_, gap: Expression) =>
- stream.window(ProcessingTimeSessionWindows.withGap(asTime(gap)))
-
- case EventTimeSessionGroupWindow(_, _, gap) =>
- stream.window(EventTimeSessionWindows.withGap(asTime(gap)))
- }
-
- private def createNonKeyedWindowedStream(groupWindow: LogicalWindow, stream: DataStream[Row])
- : AllWindowedStream[Row, _ <: DataStreamWindow] = groupWindow match {
-
- case ProcessingTimeTumblingGroupWindow(_, size) if isTimeInterval(size.resultType) =>
- stream.windowAll(TumblingProcessingTimeWindows.of(asTime(size)))
-
- case ProcessingTimeTumblingGroupWindow(_, size) =>
- stream.countWindowAll(asCount(size))
-
- case EventTimeTumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) =>
- stream.windowAll(TumblingEventTimeWindows.of(asTime(size)))
-
- case EventTimeTumblingGroupWindow(_, _, size) =>
- // TODO: EventTimeTumblingGroupWindow should sort the stream on event time
- // before applying the windowing logic. Otherwise, this would be the same as a
- // ProcessingTimeTumblingGroupWindow
- throw new UnsupportedOperationException("Event-time grouping windows on row intervals are " +
- "currently not supported.")
-
- case ProcessingTimeSlidingGroupWindow(_, size, slide) if isTimeInterval(size.resultType) =>
- stream.windowAll(SlidingProcessingTimeWindows.of(asTime(size), asTime(slide)))
-
- case ProcessingTimeSlidingGroupWindow(_, size, slide) =>
- stream.countWindowAll(asCount(size), asCount(slide))
-
- case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
- stream.windowAll(SlidingEventTimeWindows.of(asTime(size), asTime(slide)))
-
- case EventTimeSlidingGroupWindow(_, _, size, slide) =>
- // TODO: EventTimeSlidingGroupWindow should sort the stream on event time
- // before applying the windowing logic. Otherwise, this would be the same as a
- // ProcessingTimeSlidingGroupWindow
- throw new UnsupportedOperationException("Event-time grouping windows on row intervals are " +
- "currently not supported.")
-
- case ProcessingTimeSessionGroupWindow(_, gap) =>
- stream.windowAll(ProcessingTimeSessionWindows.withGap(asTime(gap)))
-
- case EventTimeSessionGroupWindow(_, _, gap) =>
- stream.windowAll(EventTimeSessionWindows.withGap(asTime(gap)))
- }
-
- def asTime(expr: Expression): Time = expr match {
- case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) => Time.milliseconds(value)
- case _ => throw new IllegalArgumentException()
- }
-
- def asCount(expr: Expression): Long = expr match {
- case Literal(value: Long, RowIntervalTypeInfo.INTERVAL_ROWS) => value
- case _ => throw new IllegalArgumentException()
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala
deleted file mode 100644
index 5312a5f..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.datastream
-
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.calcite.rex.RexProgram
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.StreamTableEnvironment
-import org.apache.flink.api.table.codegen.CodeGenerator
-import org.apache.flink.api.table.plan.nodes.FlinkCalc
-import org.apache.flink.api.table.typeutils.TypeConverter._
-import org.apache.flink.api.common.functions.FlatMapFunction
-import org.apache.flink.streaming.api.datastream.DataStream
-
-/**
- * Flink RelNode which matches along with FlatMapOperator.
- *
- */
-class DataStreamCalc(
- cluster: RelOptCluster,
- traitSet: RelTraitSet,
- input: RelNode,
- rowRelDataType: RelDataType,
- calcProgram: RexProgram,
- ruleDescription: String)
- extends SingleRel(cluster, traitSet, input)
- with FlinkCalc
- with DataStreamRel {
-
- override def deriveRowType() = rowRelDataType
-
- override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
- new DataStreamCalc(
- cluster,
- traitSet,
- inputs.get(0),
- getRowType,
- calcProgram,
- ruleDescription
- )
- }
-
- override def toString: String = calcToString(calcProgram, getExpressionString)
-
- override def explainTerms(pw: RelWriter): RelWriter = {
- super.explainTerms(pw)
- .item("select", selectionToString(calcProgram, getExpressionString))
- .itemIf("where",
- conditionToString(calcProgram, getExpressionString),
- calcProgram.getCondition != null)
- }
-
- override def translateToPlan(
- tableEnv: StreamTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
-
- val config = tableEnv.getConfig
-
- val inputDataStream = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
-
- val returnType = determineReturnType(
- getRowType,
- expectedType,
- config.getNullCheck,
- config.getEfficientTypeUsage)
-
- val generator = new CodeGenerator(config, false, inputDataStream.getType)
-
- val body = functionBody(
- generator,
- inputDataStream.getType,
- getRowType,
- calcProgram,
- config,
- expectedType)
-
- val genFunction = generator.generateFunction(
- ruleDescription,
- classOf[FlatMapFunction[Any, Any]],
- body,
- returnType)
-
- val mapFunc = calcMapFunction(genFunction)
- inputDataStream.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString))
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamConvention.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamConvention.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamConvention.scala
deleted file mode 100644
index 3b6a653..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamConvention.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.datastream
-
-import org.apache.calcite.plan._
-
-class DataStreamConvention extends Convention {
-
- override def toString: String = getName
-
- override def useAbstractConvertersForConversion(
- fromTraits: RelTraitSet,
- toTraits: RelTraitSet): Boolean = false
-
- override def canConvertConvention(toConvention: Convention): Boolean = false
-
- def getInterface: Class[_] = classOf[DataStreamRel]
-
- def getName: String = "DATASTREAM"
-
- def getTraitDef: RelTraitDef[_ <: RelTrait] = ConventionTraitDef.INSTANCE
-
- def satisfies(`trait`: RelTrait): Boolean = this eq `trait`
-
- def register(planner: RelOptPlanner): Unit = { }
-}
-
-object DataStreamConvention {
-
- val INSTANCE = new DataStreamConvention
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala
deleted file mode 100644
index 3bfa6e2..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.plan.nodes.datastream
-
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.logical.LogicalTableFunctionScan
-import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.calcite.rex.{RexCall, RexNode}
-import org.apache.calcite.sql.SemiJoinType
-import org.apache.flink.api.common.functions.FlatMapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.StreamTableEnvironment
-import org.apache.flink.api.table.codegen.CodeGenerator
-import org.apache.flink.api.table.functions.utils.TableSqlFunction
-import org.apache.flink.api.table.plan.nodes.FlinkCorrelate
-import org.apache.flink.api.table.typeutils.TypeConverter._
-import org.apache.flink.streaming.api.datastream.DataStream
-
-/**
- * Flink RelNode which represents a join with a user-defined table function.
- */
-class DataStreamCorrelate(
- cluster: RelOptCluster,
- traitSet: RelTraitSet,
- inputNode: RelNode,
- scan: LogicalTableFunctionScan,
- condition: Option[RexNode],
- relRowType: RelDataType,
- joinRowType: RelDataType,
- joinType: SemiJoinType,
- ruleDescription: String)
- extends SingleRel(cluster, traitSet, inputNode)
- with FlinkCorrelate
- with DataStreamRel {
-
- override def deriveRowType() = relRowType
-
- override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
- new DataStreamCorrelate(
- cluster,
- traitSet,
- inputs.get(0),
- scan,
- condition,
- relRowType,
- joinRowType,
- joinType,
- ruleDescription)
- }
-
- override def toString: String = {
- val rexCall = scan.getCall.asInstanceOf[RexCall]
- val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
- correlateToString(rexCall, sqlFunction)
- }
-
- override def explainTerms(pw: RelWriter): RelWriter = {
- val rexCall = scan.getCall.asInstanceOf[RexCall]
- val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
- super.explainTerms(pw)
- .item("invocation", scan.getCall)
- .item("function", sqlFunction.getTableFunction.getClass.getCanonicalName)
- .item("rowType", relRowType)
- .item("joinType", joinType)
- .itemIf("condition", condition.orNull, condition.isDefined)
- }
-
- override def translateToPlan(
- tableEnv: StreamTableEnvironment,
- expectedType: Option[TypeInformation[Any]])
- : DataStream[Any] = {
-
- val config = tableEnv.getConfig
- val returnType = determineReturnType(
- getRowType,
- expectedType,
- config.getNullCheck,
- config.getEfficientTypeUsage)
-
- // we do not need to specify input type
- val inputDS = inputNode.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
-
- val funcRel = scan.asInstanceOf[LogicalTableFunctionScan]
- val rexCall = funcRel.getCall.asInstanceOf[RexCall]
- val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
- val pojoFieldMapping = sqlFunction.getPojoFieldMapping
- val udtfTypeInfo = sqlFunction.getRowTypeInfo.asInstanceOf[TypeInformation[Any]]
-
- val generator = new CodeGenerator(
- config,
- false,
- inputDS.getType,
- Some(udtfTypeInfo),
- None,
- Some(pojoFieldMapping))
-
- val body = functionBody(
- generator,
- udtfTypeInfo,
- getRowType,
- rexCall,
- condition,
- config,
- joinType,
- expectedType)
-
- val genFunction = generator.generateFunction(
- ruleDescription,
- classOf[FlatMapFunction[Any, Any]],
- body,
- returnType)
-
- val mapFunc = correlateMapFunction(genFunction)
-
- inputDS.flatMap(mapFunc).name(correlateOpName(rexCall, sqlFunction, relRowType))
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamRel.scala
deleted file mode 100644
index 6cf13a5..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamRel.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.datastream
-
-import org.apache.calcite.rel.RelNode
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.{StreamTableEnvironment, TableConfig}
-import org.apache.flink.api.table.plan.nodes.FlinkRel
-import org.apache.flink.streaming.api.datastream.DataStream
-
-trait DataStreamRel extends RelNode with FlinkRel {
-
- /**
- * Translates the FlinkRelNode into a Flink operator.
- *
- * @param tableEnv The [[StreamTableEnvironment]] of the translated Table.
- * @param expectedType specifies the type the Flink operator should return. The type must
- * have the same arity as the result. For instance, if the
- * expected type is a RowTypeInfo this method will return a DataStream of
- * type Row. If the expected type is Tuple2, the operator will return
- * a Tuple2 if possible. Row otherwise.
- * @return DataStream of type expectedType or RowTypeInfo
- */
- def translateToPlan(
- tableEnv: StreamTableEnvironment,
- expectedType: Option[TypeInformation[Any]] = None) : DataStream[Any]
-
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala
deleted file mode 100644
index da83b64..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.datastream
-
-import org.apache.calcite.plan._
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.StreamTableEnvironment
-import org.apache.flink.api.table.plan.schema.DataStreamTable
-import org.apache.flink.streaming.api.datastream.DataStream
-
-/**
- * Flink RelNode which matches along with DataStreamSource.
- * It ensures that types without deterministic field order (e.g. POJOs) are not part of
- * the plan translation.
- */
-class DataStreamScan(
- cluster: RelOptCluster,
- traitSet: RelTraitSet,
- table: RelOptTable,
- rowRelDataType: RelDataType)
- extends StreamScan(cluster, traitSet, table) {
-
- val dataStreamTable: DataStreamTable[Any] = getTable.unwrap(classOf[DataStreamTable[Any]])
-
- override def deriveRowType() = rowRelDataType
-
- override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
- new DataStreamScan(
- cluster,
- traitSet,
- getTable,
- getRowType
- )
- }
-
- override def translateToPlan(
- tableEnv: StreamTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
-
- val config = tableEnv.getConfig
- val inputDataStream: DataStream[Any] = dataStreamTable.dataStream
-
- convertToExpectedType(inputDataStream, dataStreamTable, expectedType, config)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala
deleted file mode 100644
index f490d31..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.datastream
-
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.{RelNode, RelWriter, BiRel}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.StreamTableEnvironment
-import org.apache.flink.streaming.api.datastream.DataStream
-
-import scala.collection.JavaConverters._
-
-/**
- * Flink RelNode which matches along with Union.
- *
- */
-class DataStreamUnion(
- cluster: RelOptCluster,
- traitSet: RelTraitSet,
- leftNode: RelNode,
- rightNode: RelNode,
- rowRelDataType: RelDataType)
- extends BiRel(cluster, traitSet, leftNode, rightNode)
- with DataStreamRel {
-
- override def deriveRowType() = rowRelDataType
-
- override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
- new DataStreamUnion(
- cluster,
- traitSet,
- inputs.get(0),
- inputs.get(1),
- getRowType
- )
- }
-
- override def explainTerms(pw: RelWriter): RelWriter = {
- super.explainTerms(pw).item("union", unionSelectionToString)
- }
-
- override def toString = {
- s"Union(union: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))"
- }
-
- override def translateToPlan(
- tableEnv: StreamTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
-
- val leftDataSet = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
- val rightDataSet = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
- leftDataSet.union(rightDataSet)
- }
-
- private def unionSelectionToString: String = {
- getRowType.getFieldNames.asScala.toList.mkString(", ")
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala
deleted file mode 100644
index 3b98653..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.datastream
-
-import com.google.common.collect.ImmutableList
-import org.apache.calcite.plan._
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.Values
-import org.apache.calcite.rex.RexLiteral
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.StreamTableEnvironment
-import org.apache.flink.api.table.codegen.CodeGenerator
-import org.apache.flink.api.table.runtime.io.ValuesInputFormat
-import org.apache.flink.api.table.typeutils.TypeConverter._
-import org.apache.flink.streaming.api.datastream.DataStream
-
-import scala.collection.JavaConverters._
-
-/**
- * DataStream RelNode for LogicalValues.
- */
-class DataStreamValues(
- cluster: RelOptCluster,
- traitSet: RelTraitSet,
- rowRelDataType: RelDataType,
- tuples: ImmutableList[ImmutableList[RexLiteral]],
- ruleDescription: String)
- extends Values(cluster, rowRelDataType, tuples, traitSet)
- with DataStreamRel {
-
- override def deriveRowType() = rowRelDataType
-
- override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
- new DataStreamValues(
- cluster,
- traitSet,
- getRowType,
- getTuples,
- ruleDescription
- )
- }
-
- override def translateToPlan(
- tableEnv: StreamTableEnvironment,
- expectedType: Option[TypeInformation[Any]])
- : DataStream[Any] = {
-
- val config = tableEnv.getConfig
-
- val returnType = determineReturnType(
- getRowType,
- expectedType,
- config.getNullCheck,
- config.getEfficientTypeUsage)
-
- val generator = new CodeGenerator(config)
-
- // generate code for every record
- val generatedRecords = getTuples.asScala.map { r =>
- generator.generateResultExpression(
- returnType,
- getRowType.getFieldNames.asScala,
- r.asScala)
- }
-
- // generate input format
- val generatedFunction = generator.generateValuesInputFormat(
- ruleDescription,
- generatedRecords.map(_.code),
- returnType)
-
- val inputFormat = new ValuesInputFormat[Any](
- generatedFunction.name,
- generatedFunction.code,
- generatedFunction.returnType)
-
- tableEnv.execEnv.createInput(inputFormat, returnType).asInstanceOf[DataStream[Any]]
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala
deleted file mode 100644
index b13770e..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.datastream
-
-import org.apache.calcite.plan._
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.TableScan
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.PojoTypeInfo
-import org.apache.flink.api.table.TableConfig
-import org.apache.flink.api.table.codegen.CodeGenerator
-import org.apache.flink.api.table.plan.schema.FlinkTable
-import org.apache.flink.api.table.runtime.MapRunner
-import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
-import org.apache.flink.streaming.api.datastream.DataStream
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-
-abstract class StreamScan(
- cluster: RelOptCluster,
- traitSet: RelTraitSet,
- table: RelOptTable)
- extends TableScan(cluster, traitSet, table)
- with DataStreamRel {
-
- protected def convertToExpectedType(
- input: DataStream[Any],
- flinkTable: FlinkTable[_],
- expectedType: Option[TypeInformation[Any]],
- config: TableConfig): DataStream[Any] = {
-
- val inputType = input.getType
-
- expectedType match {
-
- // special case:
- // if efficient type usage is enabled and no expected type is set
- // we can simply forward the DataSet to the next operator.
- // however, we cannot forward PojoTypes as their fields don't have an order
- case None if config.getEfficientTypeUsage && !inputType.isInstanceOf[PojoTypeInfo[_]] =>
- input
-
- case _ =>
- val determinedType = determineReturnType(
- getRowType,
- expectedType,
- config.getNullCheck,
- config.getEfficientTypeUsage)
-
- // conversion
- if (determinedType != inputType) {
- val generator = new CodeGenerator(
- config,
- nullableInput = false,
- input.getType,
- flinkTable.fieldIndexes)
-
- val conversion = generator.generateConverterResultExpression(
- determinedType,
- getRowType.getFieldNames)
-
- val body =
- s"""
- |${conversion.code}
- |return ${conversion.resultTerm};
- |""".stripMargin
-
- val genFunction = generator.generateFunction(
- "DataSetSourceConversion",
- classOf[MapFunction[Any, Any]],
- body,
- determinedType)
-
- val mapFunc = new MapRunner[Any, Any](
- genFunction.name,
- genFunction.code,
- genFunction.returnType)
-
- val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
-
- input.map(mapFunc).name(opName)
- }
- // no conversion necessary, forward
- else {
- input
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala
deleted file mode 100644
index 8201070..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.datastream
-
-import org.apache.calcite.plan._
-import org.apache.calcite.rel.RelNode
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.plan.schema.TableSourceTable
-import org.apache.flink.api.table.sources.StreamTableSource
-import org.apache.flink.api.table.{FlinkTypeFactory, StreamTableEnvironment}
-import org.apache.flink.streaming.api.datastream.DataStream
-
-/** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */
-class StreamTableSourceScan(
- cluster: RelOptCluster,
- traitSet: RelTraitSet,
- table: RelOptTable,
- tableSource: StreamTableSource[_])
- extends StreamScan(cluster, traitSet, table) {
-
- override def deriveRowType() = {
- val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
- flinkTypeFactory.buildRowDataType(tableSource.getFieldsNames, tableSource.getFieldTypes)
- }
-
- override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
- new StreamTableSourceScan(
- cluster,
- traitSet,
- getTable,
- tableSource
- )
- }
-
- override def translateToPlan(
- tableEnv: StreamTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
-
- val config = tableEnv.getConfig
- val inputDataStream: DataStream[Any] = tableSource
- .getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
-
- convertToExpectedType(inputDataStream, new TableSourceTable(tableSource), expectedType, config)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/EnumerableToLogicalTableScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/EnumerableToLogicalTableScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/EnumerableToLogicalTableScan.scala
deleted file mode 100644
index ee515c9..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/EnumerableToLogicalTableScan.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules
-
-import org.apache.calcite.plan.RelOptRule.{any, operand}
-import org.apache.calcite.adapter.enumerable.EnumerableTableScan
-import org.apache.calcite.plan.{RelOptRuleCall, RelOptRule, RelOptRuleOperand}
-import org.apache.calcite.rel.logical.LogicalTableScan
-
-/**
- * Rule that converts an EnumerableTableScan into a LogicalTableScan.
- * We need this rule because Calcite creates an EnumerableTableScan
- * when parsing a SQL query. We convert it into a LogicalTableScan
- * so we can merge the optimization process with any plan that might be created
- * by the Table API.
- */
-class EnumerableToLogicalTableScan(
- operand: RelOptRuleOperand,
- description: String) extends RelOptRule(operand, description) {
-
- override def onMatch(call: RelOptRuleCall): Unit = {
- val oldRel = call.rel(0).asInstanceOf[EnumerableTableScan]
- val table = oldRel.getTable
- val newRel = LogicalTableScan.create(oldRel.getCluster, table)
- call.transformTo(newRel)
- }
-}
-
-object EnumerableToLogicalTableScan {
- val INSTANCE = new EnumerableToLogicalTableScan(
- operand(classOf[EnumerableTableScan], any),
- "EnumerableToLogicalTableScan")
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
deleted file mode 100644
index 183065c..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules
-
-import org.apache.calcite.rel.rules._
-import org.apache.calcite.tools.{RuleSets, RuleSet}
-import org.apache.flink.api.table.plan.rules.dataSet._
-import org.apache.flink.api.table.plan.rules.datastream._
-import org.apache.flink.api.table.plan.rules.datastream.{DataStreamCalcRule, DataStreamScanRule, DataStreamUnionRule}
-
-object FlinkRuleSets {
-
- /**
- * RuleSet to optimize plans for batch / DataSet execution
- */
- val DATASET_OPT_RULES: RuleSet = RuleSets.ofList(
-
- // convert a logical table scan to a relational expression
- TableScanRule.INSTANCE,
- EnumerableToLogicalTableScan.INSTANCE,
-
- // push a filter into a join
- FilterJoinRule.FILTER_ON_JOIN,
- // push filter into the children of a join
- FilterJoinRule.JOIN,
- // push filter through an aggregation
- FilterAggregateTransposeRule.INSTANCE,
-
- // aggregation and projection rules
- AggregateProjectMergeRule.INSTANCE,
- AggregateProjectPullUpConstantsRule.INSTANCE,
- // push a projection past a filter or vice versa
- ProjectFilterTransposeRule.INSTANCE,
- FilterProjectTransposeRule.INSTANCE,
- // push a projection to the children of a join
- ProjectJoinTransposeRule.INSTANCE,
- // remove identity project
- ProjectRemoveRule.INSTANCE,
- // reorder sort and projection
- SortProjectTransposeRule.INSTANCE,
- ProjectSortTransposeRule.INSTANCE,
-
- // join rules
- JoinPushExpressionsRule.INSTANCE,
-
- // remove union with only a single child
- UnionEliminatorRule.INSTANCE,
- // convert non-all union into all-union + distinct
- UnionToDistinctRule.INSTANCE,
-
- // remove aggregation if it does not aggregate and input is already distinct
- AggregateRemoveRule.INSTANCE,
- // push aggregate through join
- AggregateJoinTransposeRule.EXTENDED,
- // aggregate union rule
- AggregateUnionAggregateRule.INSTANCE,
-
- // remove unnecessary sort rule
- SortRemoveRule.INSTANCE,
-
- // simplify expressions rules
- ReduceExpressionsRule.FILTER_INSTANCE,
- ReduceExpressionsRule.PROJECT_INSTANCE,
- ReduceExpressionsRule.CALC_INSTANCE,
- ReduceExpressionsRule.JOIN_INSTANCE,
-
- // prune empty results rules
- PruneEmptyRules.AGGREGATE_INSTANCE,
- PruneEmptyRules.FILTER_INSTANCE,
- PruneEmptyRules.JOIN_LEFT_INSTANCE,
- PruneEmptyRules.JOIN_RIGHT_INSTANCE,
- PruneEmptyRules.PROJECT_INSTANCE,
- PruneEmptyRules.SORT_INSTANCE,
- PruneEmptyRules.UNION_INSTANCE,
-
- // calc rules
- FilterCalcMergeRule.INSTANCE,
- ProjectCalcMergeRule.INSTANCE,
- FilterToCalcRule.INSTANCE,
- ProjectToCalcRule.INSTANCE,
- CalcMergeRule.INSTANCE,
-
- // translate to Flink DataSet nodes
- DataSetAggregateRule.INSTANCE,
- DataSetAggregateWithNullValuesRule.INSTANCE,
- DataSetCalcRule.INSTANCE,
- DataSetJoinRule.INSTANCE,
- DataSetSingleRowJoinRule.INSTANCE,
- DataSetScanRule.INSTANCE,
- DataSetUnionRule.INSTANCE,
- DataSetIntersectRule.INSTANCE,
- DataSetMinusRule.INSTANCE,
- DataSetSortRule.INSTANCE,
- DataSetValuesRule.INSTANCE,
- DataSetCorrelateRule.INSTANCE,
- BatchTableSourceScanRule.INSTANCE,
- // project pushdown optimization
- PushProjectIntoBatchTableSourceScanRule.INSTANCE
- )
-
- /**
- * RuleSet to optimize plans for stream / DataStream execution
- */
- val DATASTREAM_OPT_RULES: RuleSet = RuleSets.ofList(
-
- // convert a logical table scan to a relational expression
- TableScanRule.INSTANCE,
- EnumerableToLogicalTableScan.INSTANCE,
-
- // calc rules
- FilterToCalcRule.INSTANCE,
- ProjectToCalcRule.INSTANCE,
- FilterCalcMergeRule.INSTANCE,
- ProjectCalcMergeRule.INSTANCE,
- CalcMergeRule.INSTANCE,
-
- // prune empty results rules
- PruneEmptyRules.FILTER_INSTANCE,
- PruneEmptyRules.PROJECT_INSTANCE,
- PruneEmptyRules.UNION_INSTANCE,
-
- // push and merge projection rules
- ProjectFilterTransposeRule.INSTANCE,
- FilterProjectTransposeRule.INSTANCE,
- ProjectRemoveRule.INSTANCE,
-
- // simplify expressions rules
- ReduceExpressionsRule.FILTER_INSTANCE,
- ReduceExpressionsRule.PROJECT_INSTANCE,
- ReduceExpressionsRule.CALC_INSTANCE,
-
- // merge and push unions rules
- UnionEliminatorRule.INSTANCE,
-
- // translate to DataStream nodes
- DataStreamAggregateRule.INSTANCE,
- DataStreamCalcRule.INSTANCE,
- DataStreamScanRule.INSTANCE,
- DataStreamUnionRule.INSTANCE,
- DataStreamValuesRule.INSTANCE,
- DataStreamCorrelateRule.INSTANCE,
- StreamTableSourceScanRule.INSTANCE
- )
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
deleted file mode 100644
index 8e3d8bb..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.dataSet
-
-import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.core.TableScan
-import org.apache.calcite.rel.logical.LogicalTableScan
-import org.apache.flink.api.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetConvention}
-import org.apache.flink.api.table.plan.schema.TableSourceTable
-import org.apache.flink.api.table.sources.BatchTableSource
-
-/** Rule to convert a [[LogicalTableScan]] into a [[BatchTableSourceScan]]. */
-class BatchTableSourceScanRule
- extends ConverterRule(
- classOf[LogicalTableScan],
- Convention.NONE,
- DataSetConvention.INSTANCE,
- "BatchTableSourceScanRule")
- {
-
- /** Rule must only match if TableScan targets a [[BatchTableSource]] */
- override def matches(call: RelOptRuleCall): Boolean = {
- val scan: TableScan = call.rel(0).asInstanceOf[TableScan]
- val dataSetTable = scan.getTable.unwrap(classOf[TableSourceTable])
- dataSetTable match {
- case tst: TableSourceTable =>
- tst.tableSource match {
- case _: BatchTableSource[_] =>
- true
- case _ =>
- false
- }
- case _ =>
- false
- }
- }
-
- def convert(rel: RelNode): RelNode = {
- val scan: TableScan = rel.asInstanceOf[TableScan]
- val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
-
- val tableSource = scan.getTable.unwrap(classOf[TableSourceTable]).tableSource
- .asInstanceOf[BatchTableSource[_]]
- new BatchTableSourceScan(
- rel.getCluster,
- traitSet,
- scan.getTable,
- tableSource
- )
- }
-}
-
-object BatchTableSourceScanRule {
- val INSTANCE: RelOptRule = new BatchTableSourceScanRule
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala
deleted file mode 100644
index 0311c48..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.dataSet
-
-import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.LogicalAggregate
-import org.apache.flink.api.table.TableException
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetAggregate, DataSetConvention}
-import scala.collection.JavaConversions._
-
-class DataSetAggregateRule
- extends ConverterRule(
- classOf[LogicalAggregate],
- Convention.NONE,
- DataSetConvention.INSTANCE,
- "DataSetAggregateRule")
- {
-
- override def matches(call: RelOptRuleCall): Boolean = {
- val agg: LogicalAggregate = call.rel(0).asInstanceOf[LogicalAggregate]
-
- //for non grouped agg sets should attach null row to source data
- //need apply DataSetAggregateWithNullValuesRule
- if (agg.getGroupSet.isEmpty) {
- return false
- }
-
- // check if we have distinct aggregates
- val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
- if (distinctAggs) {
- throw TableException("DISTINCT aggregates are currently not supported.")
- }
-
- // check if we have grouping sets
- val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet
- if (groupSets || agg.indicator) {
- throw TableException("GROUPING SETS are currently not supported.")
- }
-
- !distinctAggs && !groupSets && !agg.indicator
- }
-
- override def convert(rel: RelNode): RelNode = {
- val agg: LogicalAggregate = rel.asInstanceOf[LogicalAggregate]
- val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
- val convInput: RelNode = RelOptRule.convert(agg.getInput, DataSetConvention.INSTANCE)
-
- new DataSetAggregate(
- rel.getCluster,
- traitSet,
- convInput,
- agg.getNamedAggCalls,
- rel.getRowType,
- agg.getInput.getRowType,
- agg.getGroupSet.toArray)
- }
- }
-
-object DataSetAggregateRule {
- val INSTANCE: RelOptRule = new DataSetAggregateRule
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala
deleted file mode 100644
index 3bf3e0c..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.plan.rules.dataSet
-
-import org.apache.calcite.plan._
-import scala.collection.JavaConversions._
-import com.google.common.collect.ImmutableList
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.{LogicalValues, LogicalUnion, LogicalAggregate}
-import org.apache.calcite.rex.RexLiteral
-import org.apache.flink.api.table._
-import org.apache.flink.types.Row
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetAggregate, DataSetConvention}
-
-/**
- * Rule for insert [[Row]] with null records into a [[DataSetAggregate]]
- * Rule apply for non grouped aggregate query
- */
-class DataSetAggregateWithNullValuesRule
- extends ConverterRule(
- classOf[LogicalAggregate],
- Convention.NONE,
- DataSetConvention.INSTANCE,
- "DataSetAggregateWithNullValuesRule")
-{
-
- override def matches(call: RelOptRuleCall): Boolean = {
- val agg: LogicalAggregate = call.rel(0).asInstanceOf[LogicalAggregate]
-
- //for grouped agg sets shouldn't attach of null row
- //need apply other rules. e.g. [[DataSetAggregateRule]]
- if (!agg.getGroupSet.isEmpty) {
- return false
- }
-
- // check if we have distinct aggregates
- val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
- if (distinctAggs) {
- throw TableException("DISTINCT aggregates are currently not supported.")
- }
-
- // check if we have grouping sets
- val groupSets = agg.getGroupSets.size() == 0 || agg.getGroupSets.get(0) != agg.getGroupSet
- if (groupSets || agg.indicator) {
- throw TableException("GROUPING SETS are currently not supported.")
- }
- !distinctAggs && !groupSets && !agg.indicator
- }
-
- override def convert(rel: RelNode): RelNode = {
- val agg: LogicalAggregate = rel.asInstanceOf[LogicalAggregate]
- val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
- val cluster: RelOptCluster = rel.getCluster
-
- val fieldTypes = agg.getInput.getRowType.getFieldList.map(_.getType)
- val nullLiterals: ImmutableList[ImmutableList[RexLiteral]] =
- ImmutableList.of(ImmutableList.copyOf[RexLiteral](
- for (fieldType <- fieldTypes)
- yield {
- cluster.getRexBuilder.
- makeLiteral(null, fieldType, false).asInstanceOf[RexLiteral]
- }))
-
- val logicalValues = LogicalValues.create(cluster, agg.getInput.getRowType, nullLiterals)
- val logicalUnion = LogicalUnion.create(List(logicalValues, agg.getInput), true)
-
- new DataSetAggregate(
- cluster,
- traitSet,
- RelOptRule.convert(logicalUnion, DataSetConvention.INSTANCE),
- agg.getNamedAggCalls,
- rel.getRowType,
- agg.getInput.getRowType,
- agg.getGroupSet.toArray
- )
- }
-}
-
-object DataSetAggregateWithNullValuesRule {
- val INSTANCE: RelOptRule = new DataSetAggregateWithNullValuesRule
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCalcRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCalcRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCalcRule.scala
deleted file mode 100644
index 88e74a9..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCalcRule.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.dataSet
-
-import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.LogicalCalc
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetCalc, DataSetConvention}
-
-/**
-  * Converter rule that turns a [[LogicalCalc]] in [[Convention.NONE]] into a
-  * [[DataSetCalc]] in the [[DataSetConvention]].
-  */
-class DataSetCalcRule
-  extends ConverterRule(
-    classOf[LogicalCalc],
-    Convention.NONE,
-    DataSetConvention.INSTANCE,
-    "DataSetCalcRule") {
-
-  /**
-    * Builds the [[DataSetCalc]] for the given calc node.
-    *
-    * @param rel the node to convert; cast to [[LogicalCalc]]
-    * @return a [[DataSetCalc]] that keeps the row type and program of `rel`
-    */
-  def convert(rel: RelNode): RelNode = {
-    val logicalCalc = rel.asInstanceOf[LogicalCalc]
-    val dataSetTraits = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
-    // request the input in the DataSet convention as well
-    val dataSetInput = RelOptRule.convert(logicalCalc.getInput, DataSetConvention.INSTANCE)
-
-    new DataSetCalc(
-      rel.getCluster, dataSetTraits, dataSetInput,
-      rel.getRowType, logicalCalc.getProgram, description)
-  }
-}
-
-/**
-  * Companion that exposes a pre-built instance of [[DataSetCalcRule]].
-  */
-object DataSetCalcRule {
-
-  /** Ready-to-use instance of the rule, typed as a plain [[RelOptRule]]. */
-  val INSTANCE: RelOptRule = new DataSetCalcRule()
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCorrelateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCorrelateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCorrelateRule.scala
deleted file mode 100644
index 39756be..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCorrelateRule.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.plan.rules.dataSet
-
-import org.apache.calcite.plan.volcano.RelSubset
-import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.{LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan}
-import org.apache.calcite.rex.RexNode
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetCorrelate}
-
-/**
-  * Converter rule that turns a [[LogicalCorrelate]] in [[Convention.NONE]] into a
-  * [[DataSetCorrelate]] in the [[DataSetConvention]].
-  */
-class DataSetCorrelateRule
-  extends ConverterRule(
-    classOf[LogicalCorrelate],
-    Convention.NONE,
-    DataSetConvention.INSTANCE,
-    "DataSetCorrelateRule") {
-
-  /**
-    * Accepts the correlate only if its right side is a [[LogicalTableFunctionScan]], either
-    * directly or underneath a single [[LogicalFilter]].
-    *
-    * @param call the rule call whose first operand is the [[LogicalCorrelate]]
-    * @return true if the right input has one of the two supported shapes
-    */
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val correlate = call.rel(0).asInstanceOf[LogicalCorrelate]
-
-    correlate.getRight.asInstanceOf[RelSubset].getOriginal match {
-      // the table function is the direct right input
-      case _: LogicalTableFunctionScan =>
-        true
-      // a filter sits on top of the table function
-      case filter: LogicalFilter =>
-        val belowFilter = filter.getInput.asInstanceOf[RelSubset].getOriginal
-        belowFilter.isInstanceOf[LogicalTableFunctionScan]
-      case _ =>
-        false
-    }
-  }
-
-  /**
-    * Builds the [[DataSetCorrelate]] from the left input (converted to the DataSet
-    * convention) and the table function scan found on the right side.
-    *
-    * @param rel the node to convert; cast to [[LogicalCorrelate]]
-    * @return the resulting [[DataSetCorrelate]]
-    */
-  override def convert(rel: RelNode): RelNode = {
-    val correlate = rel.asInstanceOf[LogicalCorrelate]
-    val dataSetTraits = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
-    val dataSetLeft = RelOptRule.convert(correlate.getInput(0), DataSetConvention.INSTANCE)
-    val rightInput: RelNode = correlate.getInput(1)
-
-    // Walks down the right side until the table function scan is reached. A filter on the
-    // way down contributes its condition; node types other than the three cases below are
-    // not handled here.
-    def toDataSetCorrelate(node: RelNode, condition: Option[RexNode]): DataSetCorrelate =
-      node match {
-        case subset: RelSubset =>
-          toDataSetCorrelate(subset.getRelList.get(0), condition)
-
-        case filter: LogicalFilter =>
-          val belowFilter = filter.getInput.asInstanceOf[RelSubset].getOriginal
-          toDataSetCorrelate(belowFilter, Some(filter.getCondition))
-
-        case scan: LogicalTableFunctionScan =>
-          new DataSetCorrelate(
-            rel.getCluster, dataSetTraits, dataSetLeft, scan, condition,
-            rel.getRowType, correlate.getRowType, correlate.getJoinType, description)
-      }
-
-    toDataSetCorrelate(rightInput, None)
-  }
-}
-
-/**
-  * Companion that exposes a pre-built instance of [[DataSetCorrelateRule]].
-  */
-object DataSetCorrelateRule {
-
-  /** Ready-to-use instance of the rule, typed as a plain [[RelOptRule]]. */
-  val INSTANCE: RelOptRule = new DataSetCorrelateRule()
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetIntersectRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetIntersectRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetIntersectRule.scala
deleted file mode 100644
index c0e3269..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetIntersectRule.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.dataSet
-
-import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.LogicalIntersect
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetIntersect, DataSetConvention}
-
-/**
-  * Converter rule that turns a [[LogicalIntersect]] in [[Convention.NONE]] into a
-  * [[DataSetIntersect]] in the [[DataSetConvention]].
-  */
-class DataSetIntersectRule
-  extends ConverterRule(
-    classOf[LogicalIntersect],
-    Convention.NONE,
-    DataSetConvention.INSTANCE,
-    "DataSetIntersectRule") {
-
-  /**
-    * Builds the [[DataSetIntersect]] from the first two inputs of the intersect.
-    *
-    * @param rel the node to convert; cast to [[LogicalIntersect]]
-    * @return a [[DataSetIntersect]] that keeps the row type and `all` flag of `rel`
-    */
-  def convert(rel: RelNode): RelNode = {
-    val logicalIntersect = rel.asInstanceOf[LogicalIntersect]
-    val dataSetTraits = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
-    // both sides are requested in the DataSet convention
-    val dataSetLeft =
-      RelOptRule.convert(logicalIntersect.getInput(0), DataSetConvention.INSTANCE)
-    val dataSetRight =
-      RelOptRule.convert(logicalIntersect.getInput(1), DataSetConvention.INSTANCE)
-
-    new DataSetIntersect(
-      rel.getCluster, dataSetTraits, dataSetLeft, dataSetRight,
-      rel.getRowType, logicalIntersect.all)
-  }
-}
-
-/**
-  * Companion that exposes a pre-built instance of [[DataSetIntersectRule]].
-  */
-object DataSetIntersectRule {
-
-  /** Ready-to-use instance of the rule, typed as a plain [[RelOptRule]]. */
-  val INSTANCE: RelOptRule = new DataSetIntersectRule()
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala
deleted file mode 100644
index 3fab8bf..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.dataSet
-
-import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.LogicalJoin
-
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetJoin, DataSetConvention}
-
-import scala.collection.JavaConversions._
-
-/**
-  * Converter rule that turns a [[LogicalJoin]] in [[Convention.NONE]] into a
-  * [[DataSetJoin]] in the [[DataSetConvention]].
-  */
-class DataSetJoinRule
-  extends ConverterRule(
-    classOf[LogicalJoin],
-    Convention.NONE,
-    DataSetConvention.INSTANCE,
-    "DataSetJoinRule") {
-
-  /**
-    * Accepts the join only if analysing its condition yields at least one key pair.
-    *
-    * @param call the rule call whose first operand is the [[LogicalJoin]]
-    * @return true if the analysed condition has a non-empty list of pairs
-    */
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val logicalJoin = call.rel(0).asInstanceOf[LogicalJoin]
-
-    // joins require an equi-condition or a conjunctive predicate with at least one
-    // equi-condition
-    val equiPairs = logicalJoin.analyzeCondition.pairs()
-    !equiPairs.isEmpty
-  }
-
-  /**
-    * Builds the [[DataSetJoin]] from the join's two inputs, its condition and the
-    * key pairs obtained from analysing that condition.
-    *
-    * @param rel the node to convert; cast to [[LogicalJoin]]
-    * @return the resulting [[DataSetJoin]]
-    */
-  override def convert(rel: RelNode): RelNode = {
-    val logicalJoin = rel.asInstanceOf[LogicalJoin]
-    val dataSetTraits = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
-    val dataSetLeft = RelOptRule.convert(logicalJoin.getInput(0), DataSetConvention.INSTANCE)
-    val dataSetRight = RelOptRule.convert(logicalJoin.getInput(1), DataSetConvention.INSTANCE)
-    val conditionInfo = logicalJoin.analyzeCondition
-
-    new DataSetJoin(
-      rel.getCluster,
-      dataSetTraits,
-      dataSetLeft,
-      dataSetRight,
-      rel.getRowType,
-      logicalJoin.getCondition,
-      logicalJoin.getRowType,
-      conditionInfo,
-      conditionInfo.pairs.toList,
-      logicalJoin.getJoinType,
-      // NOTE(review): no value is supplied for this argument; presumably an optional
-      // join hint - confirm against the DataSetJoin constructor
-      null,
-      description)
-  }
-}
-
-/**
-  * Companion that exposes a pre-built instance of [[DataSetJoinRule]].
-  */
-object DataSetJoinRule {
-
-  /** Ready-to-use instance of the rule, typed as a plain [[RelOptRule]]. */
-  val INSTANCE: RelOptRule = new DataSetJoinRule()
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetMinusRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetMinusRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetMinusRule.scala
deleted file mode 100644
index 44bead0..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetMinusRule.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.dataSet
-
-import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.LogicalMinus
-import org.apache.calcite.rel.rules.UnionToDistinctRule
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetMinus}
-
-/**
-  * Converter rule that turns a [[LogicalMinus]] in [[Convention.NONE]] into a
-  * [[DataSetMinus]] in the [[DataSetConvention]].
-  */
-class DataSetMinusRule
-  extends ConverterRule(
-    classOf[LogicalMinus],
-    Convention.NONE,
-    DataSetConvention.INSTANCE,
-    "DataSetMinusRule") {
-
-  /**
-    * Builds the [[DataSetMinus]] from the first two inputs of the minus.
-    *
-    * @param rel the node to convert; cast to [[LogicalMinus]]
-    * @return a [[DataSetMinus]] that keeps the row type and `all` flag of `rel`
-    */
-  def convert(rel: RelNode): RelNode = {
-    val logicalMinus = rel.asInstanceOf[LogicalMinus]
-    val dataSetTraits = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
-    // both sides are requested in the DataSet convention
-    val dataSetLeft = RelOptRule.convert(logicalMinus.getInput(0), DataSetConvention.INSTANCE)
-    val dataSetRight = RelOptRule.convert(logicalMinus.getInput(1), DataSetConvention.INSTANCE)
-
-    new DataSetMinus(
-      rel.getCluster, dataSetTraits, dataSetLeft, dataSetRight,
-      rel.getRowType, logicalMinus.all)
-  }
-}
-
-/**
-  * Companion that exposes a pre-built instance of [[DataSetMinusRule]].
-  */
-object DataSetMinusRule {
-
-  /** Ready-to-use instance of the rule, typed as a plain [[RelOptRule]]. */
-  val INSTANCE: RelOptRule = new DataSetMinusRule()
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala
deleted file mode 100644
index 7477690..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.dataSet
-
-import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.core.TableScan
-import org.apache.calcite.rel.logical.LogicalTableScan
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetScan}
-import org.apache.flink.api.table.plan.schema.DataSetTable
-
-/**
-  * Converter rule that turns a [[LogicalTableScan]] in [[Convention.NONE]] into a
-  * [[DataSetScan]] in the [[DataSetConvention]].
-  */
-class DataSetScanRule
-  extends ConverterRule(
-    classOf[LogicalTableScan],
-    Convention.NONE,
-    DataSetConvention.INSTANCE,
-    "DataSetScanRule") {
-
-  /**
-    * Matches only scans whose table unwraps to a [[DataSetTable]]. For any other table
-    * this rule steps aside so that the TableScanRule can match instead.
-    *
-    * @param call the rule call whose first operand is the table scan
-    * @return true if the scanned table can be unwrapped to a [[DataSetTable]]
-    */
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val tableScan = call.rel(0).asInstanceOf[TableScan]
-    val dataSetTable = tableScan.getTable.unwrap(classOf[DataSetTable[Any]])
-    // a null result means the table is not a DataSetTable
-    Option(dataSetTable).isDefined
-  }
-
-  /**
-    * Builds the [[DataSetScan]] over the table of the given scan.
-    *
-    * @param rel the node to convert; cast to [[TableScan]]
-    * @return a [[DataSetScan]] that keeps the table and row type of `rel`
-    */
-  def convert(rel: RelNode): RelNode = {
-    val tableScan = rel.asInstanceOf[TableScan]
-    val dataSetTraits = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
-
-    new DataSetScan(rel.getCluster, dataSetTraits, tableScan.getTable, rel.getRowType)
-  }
-}
-
-/**
-  * Companion that exposes a pre-built instance of [[DataSetScanRule]].
-  */
-object DataSetScanRule {
-
-  /** Ready-to-use instance of the rule, typed as a plain [[RelOptRule]]. */
-  val INSTANCE: RelOptRule = new DataSetScanRule()
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala
deleted file mode 100644
index 8109fcf..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.dataSet
-
-import org.apache.calcite.plan.volcano.RelSubset
-import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.core.JoinRelType
-import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalJoin}
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSingleRowJoin}
-
-/**
-  * Converter rule that turns a [[LogicalJoin]] in [[Convention.NONE]] into a
-  * [[DataSetSingleRowJoin]] in the [[DataSetConvention]] when the join is an inner join
-  * and one of its inputs is an aggregate without grouping keys.
-  *
-  * NOTE(review): the rule description below reads "DataSetSingleRowCrossRule" although the
-  * class is named DataSetSingleRowJoinRule - confirm whether the mismatch is intended.
-  */
-class DataSetSingleRowJoinRule
-  extends ConverterRule(
-    classOf[LogicalJoin],
-    Convention.NONE,
-    DataSetConvention.INSTANCE,
-    "DataSetSingleRowCrossRule") {
-
-  /**
-    * Accepts inner joins for which the right or the left input is a global aggregation.
-    *
-    * @param call the rule call whose first operand is the [[LogicalJoin]]
-    * @return true if the join is inner and at least one side is a global aggregation
-    */
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val logicalJoin = call.rel(0).asInstanceOf[LogicalJoin]
-
-    // the right side is inspected first, the left side only if the right one does not qualify
-    isInnerJoin(logicalJoin) && (
-      isGlobalAggregation(logicalJoin.getRight.asInstanceOf[RelSubset].getOriginal) ||
-        isGlobalAggregation(logicalJoin.getLeft.asInstanceOf[RelSubset].getOriginal))
-  }
-
-  /** True if the join type of `join` is [[JoinRelType.INNER]]. */
-  private def isInnerJoin(join: LogicalJoin): Boolean =
-    join.getJoinType == JoinRelType.INNER
-
-  /** True if `node` is a [[LogicalAggregate]] that has no grouping keys. */
-  private def isGlobalAggregation(node: RelNode): Boolean =
-    node match {
-      case aggregate: LogicalAggregate => isSingleRow(aggregate)
-      case _ => false
-    }
-
-  /** True if the group set of `agg` is empty. */
-  private def isSingleRow(agg: LogicalAggregate): Boolean =
-    agg.getGroupSet.isEmpty
-
-  /**
-    * Builds the [[DataSetSingleRowJoin]] and records whether the left input is the
-    * global aggregation.
-    *
-    * @param rel the node to convert; cast to [[LogicalJoin]]
-    * @return the resulting [[DataSetSingleRowJoin]]
-    */
-  override def convert(rel: RelNode): RelNode = {
-    val logicalJoin = rel.asInstanceOf[LogicalJoin]
-    val dataSetTraits = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
-    val dataSetLeft = RelOptRule.convert(logicalJoin.getLeft, DataSetConvention.INSTANCE)
-    val dataSetRight = RelOptRule.convert(logicalJoin.getRight, DataSetConvention.INSTANCE)
-    val singleRowOnLeft =
-      isGlobalAggregation(logicalJoin.getLeft.asInstanceOf[RelSubset].getOriginal)
-
-    new DataSetSingleRowJoin(
-      rel.getCluster, dataSetTraits, dataSetLeft, dataSetRight, singleRowOnLeft,
-      rel.getRowType, logicalJoin.getCondition, logicalJoin.getRowType, description)
-  }
-}
-
-/**
-  * Companion that exposes a pre-built instance of [[DataSetSingleRowJoinRule]].
-  */
-object DataSetSingleRowJoinRule {
-
-  /** Ready-to-use instance of the rule, typed as a plain [[RelOptRule]]. */
-  val INSTANCE: RelOptRule = new DataSetSingleRowJoinRule()
-}