You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:46:47 UTC
[18/47] flink git commit: [FLINK-4704] [table] Refactor package
structure of flink-table.
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSortRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSortRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSortRule.scala
new file mode 100644
index 0000000..14b9459
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSortRule.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.{LogicalJoin, LogicalSort}
+import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, DataSetSort}
+
+/** Planner rule translating a [[LogicalSort]] into a [[DataSetSort]]. */
+class DataSetSortRule
+  extends ConverterRule(
+    classOf[LogicalSort],
+    Convention.NONE,
+    DataSetConvention.INSTANCE,
+    "DataSetSortRule") {
+
+  /** Rebuilds the sort as a [[DataSetSort]] over its input in the DataSet convention. */
+  override def convert(rel: RelNode): RelNode = {
+    val logicalSort = rel.asInstanceOf[LogicalSort]
+    val dataSetTraits = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+    val dataSetInput = RelOptRule.convert(logicalSort.getInput, DataSetConvention.INSTANCE)
+
+    new DataSetSort(
+      rel.getCluster,
+      dataSetTraits,
+      dataSetInput,
+      logicalSort.getCollation,
+      rel.getRowType,
+      logicalSort.offset,
+      logicalSort.fetch)
+  }
+}
+
+object DataSetSortRule {
+  val INSTANCE: RelOptRule = new DataSetSortRule()
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetUnionRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetUnionRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetUnionRule.scala
new file mode 100644
index 0000000..e24f477
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetUnionRule.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalUnion
+import org.apache.calcite.rel.rules.UnionToDistinctRule
+import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, DataSetUnion}
+
+/** Planner rule translating a UNION ALL [[LogicalUnion]] into a [[DataSetUnion]]. */
+class DataSetUnionRule
+  extends ConverterRule(
+    classOf[LogicalUnion],
+    Convention.NONE,
+    DataSetConvention.INSTANCE,
+    "DataSetUnionRule") {
+
+  /**
+   * Accepts only UNION ALL.
+   * Note: a distinct union is translated into an Aggregate on top of a UNION ALL
+   * by [[UnionToDistinctRule]], so it is not handled by this rule.
+   */
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val candidate = call.rel(0).asInstanceOf[LogicalUnion]
+    candidate.all
+  }
+
+  /**
+   * Builds a [[DataSetUnion]] from inputs 0 and 1 of the logical union.
+   * NOTE(review): further inputs are not read here; presumably the union is always binary
+   * when this rule fires -- confirm.
+   */
+  def convert(rel: RelNode): RelNode = {
+    val logicalUnion = rel.asInstanceOf[LogicalUnion]
+    val dataSetTraits = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+    val leftInput = RelOptRule.convert(logicalUnion.getInput(0), DataSetConvention.INSTANCE)
+    val rightInput = RelOptRule.convert(logicalUnion.getInput(1), DataSetConvention.INSTANCE)
+
+    new DataSetUnion(rel.getCluster, dataSetTraits, leftInput, rightInput, rel.getRowType)
+  }
+}
+
+/** Companion exposing the shared rule instance. */
+object DataSetUnionRule {
+  val INSTANCE: RelOptRule = new DataSetUnionRule()
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetValuesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetValuesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetValuesRule.scala
new file mode 100644
index 0000000..8ecdc74
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetValuesRule.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{RelOptRule, RelTraitSet, Convention}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalValues
+import org.apache.flink.table.plan.nodes.dataset.{DataSetValues, DataSetConvention}
+
+/** Planner rule translating a [[LogicalValues]] into a [[DataSetValues]]. */
+class DataSetValuesRule
+  extends ConverterRule(
+    classOf[LogicalValues],
+    Convention.NONE,
+    DataSetConvention.INSTANCE,
+    "DataSetValuesRule") {
+
+  /** Builds the [[DataSetValues]] node from the tuples held by the logical values node. */
+  def convert(rel: RelNode): RelNode = {
+    val logicalValues = rel.asInstanceOf[LogicalValues]
+    val dataSetTraits = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+
+    new DataSetValues(
+      rel.getCluster,
+      dataSetTraits,
+      rel.getRowType,
+      logicalValues.getTuples,
+      description)
+  }
+}
+
+object DataSetValuesRule {
+  val INSTANCE: RelOptRule = new DataSetValuesRule()
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
new file mode 100644
index 0000000..5d91c62
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.flink.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc}
+import org.apache.flink.table.plan.rules.util.RexProgramProjectExtractor._
+import org.apache.flink.table.sources.{BatchTableSource, ProjectableTableSource}
+
+/**
+ * This rule tries to push projections into a BatchTableSourceScan.
+ */
+class PushProjectIntoBatchTableSourceScanRule extends RelOptRule(
+  operand(classOf[DataSetCalc],
+    operand(classOf[BatchTableSourceScan], none)),
+  "PushProjectIntoBatchTableSourceScanRule") {
+
+  /** Matches only if the scanned table source is a [[ProjectableTableSource]]. */
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
+    scan.tableSource match {
+      case _: ProjectableTableSource[_] => true
+      case _ => false
+    }
+  }
+
+  /** Projects the source to the fields used by the calc and rewrites the calc on the new scan. */
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
+    val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
+    val usedFields: Array[Int] = extractRefInputFields(calc.calcProgram)
+
+    // a field count equal to the source's means nothing can be pruned; rewrite only otherwise
+    if (scan.tableSource.getNumberOfFields != usedFields.length) {
+      val originTableSource = scan.tableSource.asInstanceOf[ProjectableTableSource[_]]
+      val newTableSource = originTableSource.projectFields(usedFields)
+      val newScan = new BatchTableSourceScan(
+        scan.getCluster,
+        scan.getTraitSet,
+        scan.getTable,
+        newTableSource.asInstanceOf[BatchTableSource[_]])
+      val newCalcProgram = rewriteRexProgram(
+        calc.calcProgram,
+        newScan.getRowType,
+        usedFields,
+        calc.getCluster.getRexBuilder)
+
+      // a trivial program just forwards its input without filtering: the calc can be dropped
+      if (newCalcProgram.isTrivial) {
+        call.transformTo(newScan)
+      } else {
+        val newCalc = new DataSetCalc(
+          calc.getCluster,
+          calc.getTraitSet,
+          newScan,
+          calc.getRowType,
+          newCalcProgram,
+          description)
+        call.transformTo(newCalc)
+      }
+    }
+  }
+}
+
+object PushProjectIntoBatchTableSourceScanRule {
+  val INSTANCE: RelOptRule = new PushProjectIntoBatchTableSourceScanRule
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
new file mode 100644
index 0000000..bf8a18e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.expressions.Alias
+import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
+import org.apache.flink.table.plan.nodes.datastream.{DataStreamAggregate, DataStreamConvention}
+
+import scala.collection.JavaConversions._
+
+/** Planner rule translating a [[LogicalWindowAggregate]] into a [[DataStreamAggregate]]. */
+class DataStreamAggregateRule
+  extends ConverterRule(
+    classOf[LogicalWindowAggregate],
+    Convention.NONE,
+    DataStreamConvention.INSTANCE,
+    "DataStreamAggregateRule") {
+
+  /** Always matches; throws a TableException for DISTINCT aggregates and GROUPING SETS. */
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val agg: LogicalWindowAggregate = call.rel(0).asInstanceOf[LogicalWindowAggregate]
+    // check if we have distinct aggregates
+    if (agg.getAggCallList.exists(_.isDistinct)) {
+      throw TableException("DISTINCT aggregates are currently not supported.")
+    }
+
+    // check if we have grouping sets
+    val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet
+    if (groupSets || agg.indicator) {
+      throw TableException("GROUPING SETS are currently not supported.")
+    }
+
+    // both unsupported cases have thrown above, so anything reaching this point is accepted
+    true
+  }
+
+  /** Builds the [[DataStreamAggregate]] on the input converted to DataStreamConvention. */
+  override def convert(rel: RelNode): RelNode = {
+    val agg: LogicalWindowAggregate = rel.asInstanceOf[LogicalWindowAggregate]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+    val convInput: RelNode = RelOptRule.convert(agg.getInput, DataStreamConvention.INSTANCE)
+    new DataStreamAggregate(
+      agg.getWindow,
+      agg.getNamedProperties,
+      rel.getCluster,
+      traitSet,
+      convInput,
+      agg.getNamedAggCalls,
+      rel.getRowType,
+      agg.getInput.getRowType,
+      agg.getGroupSet.toArray)
+  }
+}
+
+object DataStreamAggregateRule {
+  val INSTANCE: RelOptRule = new DataStreamAggregateRule
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala
new file mode 100644
index 0000000..4e620c9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalCalc
+import org.apache.flink.table.plan.nodes.datastream.DataStreamCalc
+import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention
+
+/** Planner rule translating a [[LogicalCalc]] into a [[DataStreamCalc]]. */
+class DataStreamCalcRule
+  extends ConverterRule(
+    classOf[LogicalCalc],
+    Convention.NONE,
+    DataStreamConvention.INSTANCE,
+    "DataStreamCalcRule") {
+
+  /** Builds the [[DataStreamCalc]], reusing the program of the logical calc. */
+  def convert(rel: RelNode): RelNode = {
+    val logicalCalc = rel.asInstanceOf[LogicalCalc]
+    val dataStreamTraits = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+    val convertedInput = RelOptRule.convert(logicalCalc.getInput, DataStreamConvention.INSTANCE)
+    new DataStreamCalc(
+      rel.getCluster,
+      dataStreamTraits,
+      convertedInput,
+      rel.getRowType,
+      logicalCalc.getProgram,
+      description)
+  }
+}
+
+object DataStreamCalcRule {
+  val INSTANCE: RelOptRule = new DataStreamCalcRule()
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala
new file mode 100644
index 0000000..adce9f4
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.{LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan}
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention
+import org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate
+
+/**
+ * Rule to convert a LogicalCorrelate into a DataStreamCorrelate.
+ */
+class DataStreamCorrelateRule
+  extends ConverterRule(
+    classOf[LogicalCorrelate],
+    Convention.NONE,
+    DataStreamConvention.INSTANCE,
+    "DataStreamCorrelateRule") {
+
+  /**
+   * Accepts a correlate whose right side is a table function scan, optionally under a filter.
+   */
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val correlate = call.rel(0).asInstanceOf[LogicalCorrelate]
+
+    correlate.getRight.asInstanceOf[RelSubset].getOriginal match {
+      // the table function is the direct right input
+      case _: LogicalTableFunctionScan => true
+      // a filter sits on top of the table function
+      case filter: LogicalFilter =>
+        val filterInput = filter.getInput.asInstanceOf[RelSubset].getOriginal
+        filterInput.isInstanceOf[LogicalTableFunctionScan]
+      case _ => false
+    }
+  }
+
+  /** Builds a [[DataStreamCorrelate]], handing over the condition of a filter above the scan. */
+  override def convert(rel: RelNode): RelNode = {
+    val correlate = rel.asInstanceOf[LogicalCorrelate]
+    val dataStreamTraits = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+    val leftInput = RelOptRule.convert(correlate.getInput(0), DataStreamConvention.INSTANCE)
+
+    // descends the right side until the table function scan is reached; a filter met on
+    // the way contributes its condition
+    def convertToCorrelate(relNode: RelNode, condition: Option[RexNode]): DataStreamCorrelate =
+      relNode match {
+        case subset: RelSubset =>
+          convertToCorrelate(subset.getRelList.get(0), condition)
+
+        case filter: LogicalFilter =>
+          val belowFilter = filter.getInput.asInstanceOf[RelSubset].getOriginal
+          convertToCorrelate(belowFilter, Some(filter.getCondition))
+
+        case scan: LogicalTableFunctionScan =>
+          new DataStreamCorrelate(
+            rel.getCluster,
+            dataStreamTraits,
+            leftInput,
+            scan,
+            condition,
+            rel.getRowType,
+            correlate.getRowType,
+            correlate.getJoinType,
+            description)
+      }
+
+    convertToCorrelate(correlate.getInput(1), None)
+  }
+}
+
+object DataStreamCorrelateRule {
+  val INSTANCE: RelOptRule = new DataStreamCorrelateRule()
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamScanRule.scala
new file mode 100644
index 0000000..91fd6e2
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamScanRule.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention
+import org.apache.flink.table.plan.nodes.datastream.DataStreamScan
+import org.apache.flink.table.plan.schema.DataStreamTable
+
+/** Translates a [[LogicalTableScan]] over a [[DataStreamTable]] into a [[DataStreamScan]]. */
+class DataStreamScanRule
+  extends ConverterRule(
+    classOf[LogicalTableScan],
+    Convention.NONE,
+    DataStreamConvention.INSTANCE,
+    "DataStreamScanRule") {
+
+  /**
+   * Accepts the scan only if its table can be unwrapped as a [[DataStreamTable]].
+   */
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val tableScan = call.rel(0).asInstanceOf[TableScan]
+    // Option(...) turns a null unwrap result into None, i.e. into "no match"
+    val streamTable = Option(tableScan.getTable.unwrap(classOf[DataStreamTable[Any]]))
+    streamTable.isDefined
+  }
+
+  /** Builds the [[DataStreamScan]] for the scanned table with DataStream convention traits. */
+  def convert(rel: RelNode): RelNode = {
+    val logicalScan = rel.asInstanceOf[LogicalTableScan]
+    val dataStreamTraits = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+
+    new DataStreamScan(
+      rel.getCluster,
+      dataStreamTraits,
+      logicalScan.getTable,
+      rel.getRowType)
+  }
+}
+
+/** Companion exposing the shared rule instance. */
+object DataStreamScanRule {
+  val INSTANCE: RelOptRule = new DataStreamScanRule()
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala
new file mode 100644
index 0000000..475c050
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalUnion
+import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention
+import org.apache.flink.table.plan.nodes.datastream.DataStreamUnion
+
+/** Planner rule translating a UNION ALL [[LogicalUnion]] into a [[DataStreamUnion]]. */
+class DataStreamUnionRule
+  extends ConverterRule(
+    classOf[LogicalUnion],
+    Convention.NONE,
+    DataStreamConvention.INSTANCE,
+    "DataStreamUnionRule") {
+
+  /** Only translate UNION ALL (like DataSetUnionRule); convert() never reads the `all` flag. */
+  override def matches(call: org.apache.calcite.plan.RelOptRuleCall): Boolean =
+    call.rel(0).asInstanceOf[LogicalUnion].all
+
+  /** Builds a [[DataStreamUnion]] from inputs 0 and 1 of the logical union. */
+  def convert(rel: RelNode): RelNode = {
+    val union: LogicalUnion = rel.asInstanceOf[LogicalUnion]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+    val convLeft: RelNode = RelOptRule.convert(union.getInput(0), DataStreamConvention.INSTANCE)
+    val convRight: RelNode = RelOptRule.convert(union.getInput(1), DataStreamConvention.INSTANCE)
+
+    new DataStreamUnion(rel.getCluster, traitSet, convLeft, convRight, rel.getRowType)
+  }
+}
+
+object DataStreamUnionRule {
+  val INSTANCE: RelOptRule = new DataStreamUnionRule
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamValuesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamValuesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamValuesRule.scala
new file mode 100644
index 0000000..db33842
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamValuesRule.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalValues
+import org.apache.flink.table.plan.nodes.datastream.{DataStreamValues, DataStreamConvention}
+
+/** Planner rule translating a [[LogicalValues]] into a [[DataStreamValues]]. */
+class DataStreamValuesRule
+  extends ConverterRule(
+    classOf[LogicalValues],
+    Convention.NONE,
+    DataStreamConvention.INSTANCE,
+    "DataStreamValuesRule") {
+
+  /** Builds the [[DataStreamValues]] node from the tuples held by the logical values node. */
+  def convert(rel: RelNode): RelNode = {
+    val logicalValues = rel.asInstanceOf[LogicalValues]
+    val dataStreamTraits = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+
+    new DataStreamValues(
+      rel.getCluster,
+      dataStreamTraits,
+      rel.getRowType,
+      logicalValues.getTuples,
+      description)
+  }
+}
+
+object DataStreamValuesRule {
+  val INSTANCE: RelOptRule = new DataStreamValuesRule()
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
new file mode 100644
index 0000000..296c86b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.flink.table.plan.nodes.datastream.
+ {StreamTableSourceScan, DataStreamConvention}
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.sources.StreamTableSource
+
+/**
+  * Rule to convert a [[LogicalTableScan]] into a [[StreamTableSourceScan]].
+  *
+  * The rule only matches scans whose table is a [[TableSourceTable]] backed by a
+  * [[StreamTableSource]].
+  */
+class StreamTableSourceScanRule
+  extends ConverterRule(
+    classOf[LogicalTableScan],
+    Convention.NONE,
+    DataStreamConvention.INSTANCE,
+    "StreamTableSourceScanRule")
+{
+
+  /** Rule must only match if TableScan targets a [[StreamTableSource]] */
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val tableScan = call.rel(0).asInstanceOf[TableScan]
+    tableScan.getTable.unwrap(classOf[TableSourceTable]) match {
+      case sourceTable: TableSourceTable =>
+        sourceTable.tableSource.isInstanceOf[StreamTableSource[_]]
+      case _ =>
+        // anything that is not a TableSourceTable (null included) is rejected
+        false
+    }
+  }
+
+  /**
+    * Creates the [[StreamTableSourceScan]] for a scan accepted by `matches`.
+    *
+    * @param rel the node to convert; it is cast to [[LogicalTableScan]]
+    * @return a [[StreamTableSourceScan]] reading from the registered table source
+    */
+  def convert(rel: RelNode): RelNode = {
+    val logicalScan = rel.asInstanceOf[LogicalTableScan]
+    val streamTraits = logicalScan.getTraitSet.replace(DataStreamConvention.INSTANCE)
+
+    // The original registered table source
+    val sourceTable = logicalScan.getTable.unwrap(classOf[TableSourceTable])
+    val streamSource = sourceTable.tableSource.asInstanceOf[StreamTableSource[_]]
+
+    new StreamTableSourceScan(logicalScan.getCluster, streamTraits, logicalScan.getTable, streamSource)
+  }
+}
+
+/** Holds the shared instance of [[StreamTableSourceScanRule]]. */
+object StreamTableSourceScanRule {
+  val INSTANCE: RelOptRule = new StreamTableSourceScanRule()
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/util/RexProgramProjectExtractor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/util/RexProgramProjectExtractor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/util/RexProgramProjectExtractor.scala
new file mode 100644
index 0000000..129cfd1
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/util/RexProgramProjectExtractor.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.util
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex._
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+
+/** Helpers to find the input fields a RexProgram reads and to re-index the program. */
+object RexProgramProjectExtractor {
+
+  /**
+    * Extracts the indexes of input fields accessed by the RexProgram.
+    *
+    * Both the projection expressions and the condition (if the program has one) are
+    * expanded and inspected.
+    *
+    * @param rexProgram The RexProgram to analyze
+    * @return The indexes of accessed input fields
+    */
+  def extractRefInputFields(rexProgram: RexProgram): Array[Int] = {
+    val fieldCollector = new RefFieldsVisitor
+    // collect input fields referenced by the projections
+    for (projection <- rexProgram.getProjectList.asScala) {
+      rexProgram.expandLocalRef(projection).accept(fieldCollector)
+    }
+    // collect input fields referenced by the condition; a program without condition yields null
+    Option(rexProgram.getCondition).foreach { cond =>
+      rexProgram.expandLocalRef(cond).accept(fieldCollector)
+    }
+    fieldCollector.getFields
+  }
+
+  /**
+    * Generates a new RexProgram based on mapped input fields.
+    *
+    * @param rexProgram original RexProgram
+    * @param inputRowType input row type
+    * @param usedInputFields indexes of used input fields
+    * @param rexBuilder builder for Rex expressions
+    *
+    * @return A RexProgram with mapped input field expressions.
+    */
+  def rewriteRexProgram(
+      rexProgram: RexProgram,
+      inputRowType: RelDataType,
+      usedInputFields: Array[Int],
+      rexBuilder: RexBuilder): RexProgram = {
+
+    val rewriter = new InputRewriter(usedInputFields)
+
+    val rewrittenProjections = rexProgram.getProjectList.asScala
+      .map(projection => rexProgram.expandLocalRef(projection).accept(rewriter))
+      .toList
+      .asJava
+
+    // stays null when the original program has no condition
+    val rewrittenCondition: RexNode = Option(rexProgram.getCondition)
+      .map(cond => rexProgram.expandLocalRef(cond).accept(rewriter))
+      .orNull
+
+    RexProgram.create(
+      inputRowType,
+      rewrittenProjections,
+      rewrittenCondition,
+      rexProgram.getOutputRowType,
+      rexBuilder)
+  }
+}
+
+/**
+  * A RexVisitor to extract used input fields.
+  *
+  * Indexes are kept in a [[mutable.LinkedHashSet]], so each index is reported once and in
+  * the order in which it was first visited.
+  */
+class RefFieldsVisitor extends RexVisitorImpl[Unit](true) {
+  // the set itself is mutable, so the reference never needs to be reassigned
+  private val fields = mutable.LinkedHashSet[Int]()
+
+  /** @return the collected input field indexes */
+  def getFields: Array[Int] = fields.toArray
+
+  /** Records the index of the visited input reference. */
+  override def visitInputRef(inputRef: RexInputRef): Unit = fields += inputRef.getIndex
+
+  /** Visits every operand of the call with this visitor. */
+  override def visitCall(call: RexCall): Unit =
+    call.operands.asScala.foreach(operand => operand.accept(this))
+}
+
+/**
+  * A RexShuttle to rewrite field accesses of a RexProgram.
+  *
+  * Every visited input or local reference is replaced by a [[RexInputRef]] whose index is
+  * the position of the old index inside `fields`.
+  *
+  * @param fields fields mapping
+  */
+class InputRewriter(fields: Array[Int]) extends RexShuttle {
+
+  /** old input fields ref index -> new input fields ref index mappings */
+  private val fieldMap: Map[Int, Int] = fields.zipWithIndex.toMap
+
+  override def visitInputRef(inputRef: RexInputRef): RexNode = {
+    val newIndex = relNodeIndex(inputRef)
+    new RexInputRef(newIndex, inputRef.getType)
+  }
+
+  override def visitLocalRef(localRef: RexLocalRef): RexNode = {
+    val newIndex = relNodeIndex(localRef)
+    new RexInputRef(newIndex, localRef.getType)
+  }
+
+  /** Looks up the new index of a reference and fails for indexes that are not mapped. */
+  private def relNodeIndex(ref: RexSlot): Int =
+    fieldMap.get(ref.getIndex) match {
+      case Some(newIndex) => newIndex
+      case None =>
+        throw new IllegalArgumentException("input field contains invalid index")
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/ArrayRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/ArrayRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/ArrayRelDataType.scala
new file mode 100644
index 0000000..f7d9e1d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/ArrayRelDataType.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.sql.`type`.ArraySqlType
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+/**
+  * Flink distinguishes between primitive arrays (int[], double[], ...) and
+  * object arrays (Integer[], MyPojo[], ...). This custom type supports both cases.
+  *
+  * Two instances are equal when the parent type considers them equal and they carry
+  * the same `typeInfo`.
+  */
+class ArrayRelDataType(
+    val typeInfo: TypeInformation[_],
+    elementType: RelDataType,
+    isNullable: Boolean)
+  extends ArraySqlType(
+    elementType,
+    isNullable) {
+
+  override def toString: String = s"ARRAY($typeInfo)"
+
+  def canEqual(other: Any): Boolean = other.isInstanceOf[ArrayRelDataType]
+
+  override def equals(other: Any): Boolean = other match {
+    case that: ArrayRelDataType if that.canEqual(this) =>
+      super.equals(that) && typeInfo == that.typeInfo
+    case _ =>
+      false
+  }
+
+  // derived from typeInfo only, which equals() also compares
+  override def hashCode(): Int = typeInfo.hashCode()
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
new file mode 100644
index 0000000..92f9199
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import java.util
+
+import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFieldImpl, RelRecordType}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.schema.CompositeRelDataType.createFieldList
+
+import scala.collection.JavaConverters._
+
+/**
+  * Composite type for encapsulating Flink's [[CompositeType]].
+  *
+  * The record fields are derived from the fields of the wrapped composite type.
+  *
+  * @param compositeType CompositeType to encapsulate
+  * @param typeFactory Flink's type factory
+  */
+class CompositeRelDataType(
+    val compositeType: CompositeType[_],
+    typeFactory: FlinkTypeFactory)
+  extends RelRecordType(createFieldList(compositeType, typeFactory)) {
+
+  override def toString: String = s"COMPOSITE($compositeType)"
+
+  def canEqual(other: Any): Boolean = other.isInstanceOf[CompositeRelDataType]
+
+  override def equals(other: Any): Boolean = other match {
+    case that: CompositeRelDataType if that.canEqual(this) =>
+      super.equals(that) && compositeType == that.compositeType
+    case _ =>
+      false
+  }
+
+  // derived from compositeType only, which equals() also compares
+  override def hashCode(): Int = compositeType.hashCode()
+
+}
+
+object CompositeRelDataType {
+
+  /**
+    * Converts the fields of a composite type to list of [[RelDataTypeField]].
+    *
+    * @param compositeType composite type whose field names and field types are read
+    * @param typeFactory factory used to create the type of each field
+    * @return one field per field name, indexed by its position
+    */
+  private def createFieldList(
+      compositeType: CompositeType[_],
+      typeFactory: FlinkTypeFactory)
+    : util.List[RelDataTypeField] = {
+
+    val relFields = for ((name, index) <- compositeType.getFieldNames.zipWithIndex.toList) yield {
+      // widen to the interface so that the resulting list has the expected element type
+      new RelDataTypeFieldImpl(
+        name,
+        index,
+        typeFactory.createTypeFromTypeInfo(compositeType.getTypeAt(index))): RelDataTypeField
+    }
+    relFields.asJava
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataSetTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataSetTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataSetTable.scala
new file mode 100644
index 0000000..f8c6835
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataSetTable.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import java.lang.Double
+import java.util
+import java.util.Collections
+
+import org.apache.calcite.rel.{RelCollation, RelDistribution}
+import org.apache.calcite.schema.Statistic
+import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.api.java.DataSet
+
+/**
+  * A [[FlinkTable]] over a [[DataSet]]; the table's type information is the type of
+  * the data set.
+  *
+  * @param dataSet the wrapped data set
+  * @param fieldIndexes field indexes passed on to [[FlinkTable]]
+  * @param fieldNames field names passed on to [[FlinkTable]]
+  */
+class DataSetTable[T](
+    val dataSet: DataSet[T],
+    override val fieldIndexes: Array[Int],
+    override val fieldNames: Array[String])
+  extends FlinkTable[T](dataSet.getType, fieldIndexes, fieldNames) {
+
+  /** Returns a new [[DefaultDataSetStatistic]] on every call. */
+  override def getStatistic: Statistic = new DefaultDataSetStatistic()
+
+}
+
+/**
+  * Fixed [[Statistic]]: a row count of 1000, no collations, no keys and no distribution.
+  */
+class DefaultDataSetStatistic extends Statistic {
+
+  /** Constant row count. */
+  override def getRowCount: Double = 1000d
+
+  /** No collations are reported. */
+  override def getCollations: util.List[RelCollation] = Collections.emptyList[RelCollation]()
+
+  /** No column set is reported as a key. */
+  override def isKey(columns: ImmutableBitSet): Boolean = false
+
+  /** No distribution is reported. */
+  override def getDistribution: RelDistribution = null
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala
new file mode 100644
index 0000000..0355fac
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.flink.streaming.api.datastream.DataStream
+
+/**
+  * A [[FlinkTable]] over a [[DataStream]]; the table's type information is the type of
+  * the data stream.
+  *
+  * @param dataStream the wrapped data stream
+  * @param fieldIndexes field indexes passed on to [[FlinkTable]]
+  * @param fieldNames field names passed on to [[FlinkTable]]
+  */
+class DataStreamTable[T](
+    val dataStream: DataStream[T],
+    override val fieldIndexes: Array[Int],
+    override val fieldNames: Array[String])
+  extends FlinkTable[T](dataStream.getType, fieldIndexes, fieldNames)
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
new file mode 100644
index 0000000..8bb5c81
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.schema.impl.AbstractTable
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.calcite.FlinkTypeFactory
+
+/**
+  * Base class for Calcite tables whose row type is derived from a Flink
+  * [[TypeInformation]].
+  *
+  * The constructor validates its arguments and throws a [[TableException]] if
+  *  - the number of field indexes and field names differ,
+  *  - field names are not unique,
+  *  - a composite type's arity differs from the number of field names,
+  *  - an atomic type is not mapped to exactly one field with index 0, or
+  *  - the type is neither a [[CompositeType]] nor an [[AtomicType]].
+  *
+  * @param typeInfo type information of the table's records
+  * @param fieldIndexes for each table field, the index of the field within `typeInfo`
+  * @param fieldNames names of the table fields
+  */
+abstract class FlinkTable[T](
+    val typeInfo: TypeInformation[T],
+    val fieldIndexes: Array[Int],
+    val fieldNames: Array[String])
+  extends AbstractTable {
+
+  if (fieldIndexes.length != fieldNames.length) {
+    throw new TableException(
+      "Number of field indexes and field names must be equal.")
+  }
+
+  // check uniqueness of field names
+  if (fieldNames.length != fieldNames.toSet.size) {
+    throw new TableException(
+      "Table field names must be unique.")
+  }
+
+  /** Types of the table fields, in the order given by `fieldIndexes`. */
+  val fieldTypes: Array[TypeInformation[_]] =
+    typeInfo match {
+      case cType: CompositeType[T] =>
+        if (fieldNames.length != cType.getArity) {
+          throw new TableException(
+            "Arity of type (" + cType.getFieldNames.deep + ") " +
+            "not equal to number of field names " + fieldNames.deep + ".")
+        }
+        fieldIndexes.map(cType.getTypeAt(_).asInstanceOf[TypeInformation[_]])
+      case aType: AtomicType[T] =>
+        if (fieldIndexes.length != 1 || fieldIndexes(0) != 0) {
+          throw new TableException(
+            "Non-composite input type may have only a single field and its index must be 0.")
+        }
+        Array(aType)
+      case other =>
+        // fail with a descriptive error instead of a raw scala.MatchError
+        throw new TableException(
+          s"Unsupported type information: $other. " +
+            "Only composite and atomic types are supported.")
+    }
+
+  /**
+    * Builds the row type from `fieldNames` and `fieldTypes`.
+    *
+    * @param typeFactory type factory; it is cast to [[FlinkTypeFactory]]
+    */
+  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
+    val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
+    flinkTypeFactory.buildRowDataType(fieldNames, fieldTypes)
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
new file mode 100644
index 0000000..1c05883
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.schema
+
+import java.lang.reflect.{Method, Type}
+import java.util
+
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.schema.TableFunction
+import org.apache.calcite.schema.impl.ReflectiveFunctionBase
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.calcite.FlinkTypeFactory
+
+/**
+  * This is heavily inspired by Calcite's [[org.apache.calcite.schema.impl.TableFunctionImpl]].
+  * We need it in order to create a [[org.apache.flink.table.functions.utils.TableSqlFunction]].
+  * The main difference is that we override the [[getRowType()]] and [[getElementType()]].
+  *
+  * The constructor validates its arguments and throws a [[TableException]] if
+  *  - the number of field indexes and field names differ,
+  *  - field names are not unique,
+  *  - a composite type's arity differs from the number of field names,
+  *  - an atomic type is not mapped to exactly one field with index 0, or
+  *  - the type is neither a [[CompositeType]] nor an [[AtomicType]].
+  *
+  * @param typeInfo type information of the rows produced by the function
+  * @param fieldIndexes for each result field, the index of the field within `typeInfo`
+  * @param fieldNames names of the result fields
+  * @param evalMethod method handed to [[ReflectiveFunctionBase]]
+  */
+class FlinkTableFunctionImpl[T](
+    val typeInfo: TypeInformation[T],
+    val fieldIndexes: Array[Int],
+    val fieldNames: Array[String],
+    val evalMethod: Method)
+  extends ReflectiveFunctionBase(evalMethod)
+  with TableFunction {
+
+  if (fieldIndexes.length != fieldNames.length) {
+    throw new TableException(
+      "Number of field indexes and field names must be equal.")
+  }
+
+  // check uniqueness of field names
+  if (fieldNames.length != fieldNames.toSet.size) {
+    throw new TableException(
+      "Table field names must be unique.")
+  }
+
+  /** Types of the result fields, in the order given by `fieldIndexes`. */
+  val fieldTypes: Array[TypeInformation[_]] =
+    typeInfo match {
+      case cType: CompositeType[T] =>
+        if (fieldNames.length != cType.getArity) {
+          throw new TableException(
+            "Arity of type (" + cType.getFieldNames.deep + ") " +
+            "not equal to number of field names " + fieldNames.deep + ".")
+        }
+        fieldIndexes.map(cType.getTypeAt(_).asInstanceOf[TypeInformation[_]])
+      case aType: AtomicType[T] =>
+        if (fieldIndexes.length != 1 || fieldIndexes(0) != 0) {
+          throw new TableException(
+            "Non-composite input type may have only a single field and its index must be 0.")
+        }
+        Array(aType)
+      case other =>
+        // fail with a descriptive error instead of a raw scala.MatchError
+        throw new TableException(
+          s"Unsupported type information: $other. " +
+            "Only composite and atomic types are supported.")
+    }
+
+  /** The element type is always reported as an object array, regardless of the arguments. */
+  override def getElementType(arguments: util.List[AnyRef]): Type = classOf[Array[Object]]
+
+  /**
+    * Builds a row type with one field per entry of `fieldNames`; `nullable(true)` is
+    * applied after each field is added.
+    *
+    * @param typeFactory type factory; it is cast to [[FlinkTypeFactory]]
+    * @param arguments call arguments; not used
+    */
+  override def getRowType(
+      typeFactory: RelDataTypeFactory,
+      arguments: util.List[AnyRef]): RelDataType = {
+    val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
+    val builder = flinkTypeFactory.builder
+    fieldNames
+      .zip(fieldTypes)
+      .foreach { f =>
+        builder.add(f._1, flinkTypeFactory.createTypeFromTypeInfo(f._2)).nullable(true)
+      }
+    builder.build
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/GenericRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/GenericRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/GenericRelDataType.scala
new file mode 100644
index 0000000..d93908b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/GenericRelDataType.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.sql.`type`.{BasicSqlType, SqlTypeName}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.calcite.FlinkTypeSystem
+
+/**
+  * Generic type for encapsulating Flink's [[TypeInformation]].
+  *
+  * The type is registered with the SQL type name ANY. Two instances are equal when the
+  * parent type considers them equal and they carry the same `typeInfo`.
+  *
+  * @param typeInfo TypeInformation to encapsulate
+  * @param typeSystem Flink's type system
+  */
+class GenericRelDataType(
+    val typeInfo: TypeInformation[_],
+    typeSystem: FlinkTypeSystem)
+  extends BasicSqlType(
+    typeSystem,
+    SqlTypeName.ANY) {
+
+  override def toString: String = s"ANY($typeInfo)"
+
+  def canEqual(other: Any): Boolean = other.isInstanceOf[GenericRelDataType]
+
+  override def equals(other: Any): Boolean = other match {
+    case that: GenericRelDataType if that.canEqual(this) =>
+      super.equals(that) && typeInfo == that.typeInfo
+    case _ =>
+      false
+  }
+
+  // derived from typeInfo only, which equals() also compares
+  override def hashCode(): Int = typeInfo.hashCode()
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RelTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RelTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RelTable.scala
new file mode 100644
index 0000000..30052a8
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RelTable.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.plan.RelOptTable
+import org.apache.calcite.plan.RelOptTable.ToRelContext
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.schema.Schema.TableType
+import org.apache.calcite.schema.impl.AbstractTable
+import org.apache.calcite.schema.TranslatableTable
+
+/**
+  * A [[org.apache.calcite.schema.Table]] implementation for registering
+  * Table API Tables in the Calcite schema to be used by Flink SQL.
+  * It implements [[TranslatableTable]] so that its logical scan
+  * can be converted to a relational expression.
+  *
+  * @param relNode the relational expression this table stands for
+  * @see [[DataSetTable]]
+  */
+class RelTable(relNode: RelNode) extends AbstractTable with TranslatableTable {
+
+  /**
+    * Reports the table as a regular table.
+    *
+    * This used to be `???`, which throws a NotImplementedError for every caller.
+    * NOTE(review): TABLE is assumed to match the default of [[AbstractTable]]; confirm
+    * whether VIEW would describe a plan-backed table better.
+    */
+  override def getJdbcTableType: TableType = TableType.TABLE
+
+  /** The row type is taken from the wrapped relational expression. */
+  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = relNode.getRowType
+
+  /** Returns the wrapped relational expression unchanged. */
+  override def toRel(context: ToRelContext, relOptTable: RelOptTable): RelNode = {
+    relNode
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
new file mode 100644
index 0000000..0f55daf
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.sources.TableSource
+import org.apache.flink.types.Row
+
+/**
+  * Table which defines an external table via a [[TableSource]].
+  *
+  * The table is typed as [[Row]]; field types, field count and field names are all read
+  * from the table source, and the field indexes are 0 until the number of fields.
+  */
+class TableSourceTable(val tableSource: TableSource[_])
+  extends FlinkTable[Row](
+    new RowTypeInfo(tableSource.getFieldTypes: _*),
+    Array.range(0, tableSource.getNumberOfFields),
+    tableSource.getFieldsNames)
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CountPartitionFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CountPartitionFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CountPartitionFunction.scala
new file mode 100644
index 0000000..9fe4ec3
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CountPartitionFunction.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.RichMapPartitionFunction
+import org.apache.flink.util.Collector
+
+/**
+  * Counts the records of a partition and emits a single `(subtask index, record count)` pair.
+  *
+  * The count saturates at `Long.MaxValue` instead of overflowing.
+  *
+  * @tparam IN type of the records that are counted (the records themselves are discarded)
+  */
+class CountPartitionFunction[IN] extends RichMapPartitionFunction[IN, (Int, Long)] {
+
+  /**
+    * Consumes all records of the partition and emits one pair to `out`.
+    *
+    * @param value the records of this partition
+    * @param out   collector that receives the `(subtask index, record count)` pair
+    */
+  override def mapPartition(value: Iterable[IN], out: Collector[(Int, Long)]): Unit = {
+    val subtaskIndex = getRuntimeContext.getIndexOfThisSubtask
+    val records = value.iterator()
+    var seen = 0L
+    while (records.hasNext) {
+      // stop incrementing once the maximum is reached to prevent overflow
+      if (seen < Long.MaxValue) {
+        seen += 1L
+      }
+      // the record is only consumed, its content is irrelevant for counting
+      records.next()
+    }
+    out.collect((subtaskIndex, seen))
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatJoinRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatJoinRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatJoinRunner.scala
new file mode 100644
index 0000000..715848d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatJoinRunner.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.{FlatJoinFunction, RichFlatJoinFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+/**
+  * Rich flat-join function that compiles the given source code into a [[FlatJoinFunction]]
+  * when it is opened and forwards every `join` call to the compiled instance.
+  *
+  * @param name       name of the function class that is compiled from `code`
+  * @param code       source code of the [[FlatJoinFunction]] to compile
+  * @param returnType type information returned by `getProducedType`
+  */
+class FlatJoinRunner[IN1, IN2, OUT](
+    name: String,
+    code: String,
+    @transient returnType: TypeInformation[OUT])
+  extends RichFlatJoinFunction[IN1, IN2, OUT]
+  with ResultTypeQueryable[OUT]
+  with Compiler[FlatJoinFunction[IN1, IN2, OUT]] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  // set in open(); stays null until the function has been opened
+  private var function: FlatJoinFunction[IN1, IN2, OUT] = _
+
+  /** Compiles `code` with the user code class loader and instantiates the result. */
+  override def open(parameters: Configuration): Unit = {
+    LOG.debug(s"Compiling FlatJoinFunction: $name \n\n Code:\n$code")
+    val userClassLoader = getRuntimeContext.getUserCodeClassLoader
+    val generatedClass = compile(userClassLoader, name, code)
+    LOG.debug("Instantiating FlatJoinFunction.")
+    function = generatedClass.newInstance()
+  }
+
+  /** Delegates the join of the two records to the compiled function. */
+  override def join(first: IN1, second: IN2, out: Collector[OUT]): Unit = {
+    function.join(first, second, out)
+  }
+
+  override def getProducedType: TypeInformation[OUT] = returnType
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala
new file mode 100644
index 0000000..a7bd980
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+/**
+  * Rich flat-map function that compiles the given source code into a [[FlatMapFunction]]
+  * when it is opened and forwards every `flatMap` call to the compiled instance.
+  *
+  * @param name       name of the function class that is compiled from `code`
+  * @param code       source code of the [[FlatMapFunction]] to compile
+  * @param returnType type information returned by `getProducedType`
+  */
+class FlatMapRunner[IN, OUT](
+    name: String,
+    code: String,
+    @transient returnType: TypeInformation[OUT])
+  extends RichFlatMapFunction[IN, OUT]
+  with ResultTypeQueryable[OUT]
+  with Compiler[FlatMapFunction[IN, OUT]] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  // set in open(); stays null until the function has been opened
+  private var function: FlatMapFunction[IN, OUT] = null
+
+  /** Compiles `code` with the user code class loader and instantiates the result. */
+  override def open(parameters: Configuration): Unit = {
+    LOG.debug(s"Compiling FlatMapFunction: $name \n\n Code:\n$code")
+    val userClassLoader = getRuntimeContext.getUserCodeClassLoader
+    val generatedClass = compile(userClassLoader, name, code)
+    LOG.debug("Instantiating FlatMapFunction.")
+    function = generatedClass.newInstance()
+  }
+
+  /** Delegates the record to the compiled function. */
+  override def flatMap(in: IN, out: Collector[OUT]): Unit = {
+    function.flatMap(in, out)
+  }
+
+  override def getProducedType: TypeInformation[OUT] = returnType
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/IntersectCoGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/IntersectCoGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/IntersectCoGroupFunction.scala
new file mode 100644
index 0000000..39d2914
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/IntersectCoGroupFunction.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.functions.CoGroupFunction
+import org.apache.flink.util.Collector
+
+/**
+  * Co-group function that emits records of the first input depending on how many records
+  * the second input holds for the same group.
+  *
+  * @param all if `true`, one record of the first input is emitted for every pair of records
+  *            that can be formed from both inputs; if `false`, at most one record is emitted
+  *            and only if both inputs are non-empty
+  */
+class IntersectCoGroupFunction[T](all: Boolean) extends CoGroupFunction[T, T, T] {
+
+  /** Emits nothing if either input is `null`. */
+  override def coGroup(first: JIterable[T], second: JIterable[T], out: Collector[T]): Unit = {
+    if (first != null && second != null) {
+      val left = first.iterator()
+      val right = second.iterator()
+      if (!all) {
+        // a single record is enough when duplicates are not retained
+        if (left.hasNext && right.hasNext) {
+          out.collect(left.next())
+        }
+      } else {
+        // advance both sides in lock-step; the shorter side limits the output
+        while (left.hasNext && right.hasNext) {
+          out.collect(left.next())
+          right.next()
+        }
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/LimitFilterFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/LimitFilterFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/LimitFilterFunction.scala
new file mode 100644
index 0000000..8441245
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/LimitFilterFunction.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.configuration.Configuration
+
+import scala.collection.JavaConverters._
+
+
+/**
+  * Filter that keeps only the records whose position lies within `(limitStart, limitEnd]`.
+  *
+  * The position of a record is the number of records in all partitions with a smaller
+  * subtask index plus the 1-based position of the record within its own partition. The
+  * per-partition counts are read from a broadcast variable of `(partition index, count)`
+  * pairs (presumably produced by [[CountPartitionFunction]] - verify against the caller).
+  *
+  * @param limitStart    number of leading records to skip
+  * @param limitEnd      position of the last record to keep; `Long.MaxValue` means unlimited
+  * @param broadcastName name of the broadcast variable holding the per-partition counts
+  */
+class LimitFilterFunction[T](
+    limitStart: Long,
+    limitEnd: Long,
+    broadcastName: String)
+  extends RichFilterFunction[T] {
+
+  /** Index of the subtask that runs this function instance. */
+  var partitionIndex: Int = _
+  /** Number of records seen so far by this instance (saturates at `Long.MaxValue`). */
+  var elementCount: Long = _
+  /** Prefix sums: entry `i` is the number of records in all partitions with index < `i`. */
+  var countList: Array[Long] = _
+
+  override def open(config: Configuration): Unit = {
+    partitionIndex = getRuntimeContext.getIndexOfThisSubtask
+
+    val countPartitionResult = getRuntimeContext
+      .getBroadcastVariable[(Int, Long)](broadcastName)
+      .asScala
+
+    // sort by partition index, extract number per partition, sum with intermediate results
+    countList = countPartitionResult
+      .sortWith(_._1 < _._1)
+      .map(_._2)
+      .scanLeft(0L) { (a, b) =>
+        val sum = a + b
+        // Counts are expected to be non-negative, so a negative sum signals an overflow.
+        // The saturated value must be the result of the lambda; otherwise the guard is dead
+        // code and the wrapped-around sum ends up in the prefix sums.
+        if (sum < 0L) Long.MaxValue else sum
+      }
+      .toArray
+
+    elementCount = 0
+  }
+
+  override def filter(value: T): Boolean = {
+    if (elementCount != Long.MaxValue) { // prevent overflow
+      elementCount += 1L
+    }
+    // we filter out records that are not within the limit (Long.MaxValue is unlimited)
+    limitStart - countList(partitionIndex) < elementCount &&
+      (limitEnd == Long.MaxValue || limitEnd - countList(partitionIndex) >= elementCount)
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinLeftRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinLeftRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinLeftRunner.scala
new file mode 100644
index 0000000..cf32404
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinLeftRunner.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.util.Collector
+
+/**
+  * [[MapSideJoinRunner]] for the case in which the first (left) join input is processed
+  * record by record and the second (right) join input is the single broadcast record.
+  *
+  * @param name             name of the join function class that is compiled from `code`
+  * @param code             source code of the join function
+  * @param returnType       type information of the join result
+  * @param broadcastSetName name of the broadcast variable holding the single record
+  */
+class MapJoinLeftRunner[IN1, IN2, OUT](
+    name: String,
+    code: String,
+    @transient returnType: TypeInformation[OUT],
+    broadcastSetName: String)
+  extends MapSideJoinRunner[IN1, IN2, IN2, IN1, OUT](name, code, returnType, broadcastSetName) {
+
+  /** Joins the incoming left record with the single right record. */
+  override def flatMap(multiInput: IN1, out: Collector[OUT]): Unit = {
+    function.join(multiInput, singleInput, out)
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinRightRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinRightRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinRightRunner.scala
new file mode 100644
index 0000000..c4bc0d1
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinRightRunner.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.util.Collector
+
+/**
+  * [[MapSideJoinRunner]] for the case in which the second (right) join input is processed
+  * record by record and the first (left) join input is the single broadcast record.
+  *
+  * @param name             name of the join function class that is compiled from `code`
+  * @param code             source code of the join function
+  * @param returnType       type information of the join result
+  * @param broadcastSetName name of the broadcast variable holding the single record
+  */
+class MapJoinRightRunner[IN1, IN2, OUT](
+    name: String,
+    code: String,
+    @transient returnType: TypeInformation[OUT],
+    broadcastSetName: String)
+  extends MapSideJoinRunner[IN1, IN2, IN1, IN2, OUT](name, code, returnType, broadcastSetName) {
+
+  /** Joins the single left record with the incoming right record. */
+  override def flatMap(multiInput: IN2, out: Collector[OUT]): Unit = {
+    function.join(singleInput, multiInput, out)
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala
new file mode 100644
index 0000000..51e2fc5
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.configuration.Configuration
+import org.slf4j.LoggerFactory
+
+/**
+  * Rich map function that compiles the given source code into a [[MapFunction]] when it is
+  * opened and forwards every `map` call to the compiled instance.
+  *
+  * @param name       name of the function class that is compiled from `code`
+  * @param code       source code of the [[MapFunction]] to compile
+  * @param returnType type information returned by `getProducedType`
+  */
+class MapRunner[IN, OUT](
+    name: String,
+    code: String,
+    @transient returnType: TypeInformation[OUT])
+  extends RichMapFunction[IN, OUT]
+  with ResultTypeQueryable[OUT]
+  with Compiler[MapFunction[IN, OUT]] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  // set in open(); stays null until the function has been opened
+  private var function: MapFunction[IN, OUT] = _
+
+  /** Compiles `code` with the user code class loader and instantiates the result. */
+  override def open(parameters: Configuration): Unit = {
+    LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
+    val userClassLoader = getRuntimeContext.getUserCodeClassLoader
+    val generatedClass = compile(userClassLoader, name, code)
+    LOG.debug("Instantiating MapFunction.")
+    function = generatedClass.newInstance()
+  }
+
+  /** Delegates the record to the compiled function and returns its result. */
+  override def map(in: IN): OUT = {
+    function.map(in)
+  }
+
+  override def getProducedType: TypeInformation[OUT] = returnType
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapSideJoinRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapSideJoinRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapSideJoinRunner.scala
new file mode 100644
index 0000000..f12590f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapSideJoinRunner.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.{FlatJoinFunction, RichFlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.configuration.Configuration
+import org.slf4j.LoggerFactory
+
+/**
+  * Base class for joins that are executed as a flat-map over one input (`MULTI_IN`) while
+  * the other input (`SINGLE_IN`) is a single record read from a broadcast variable.
+  *
+  * When opened, the given source code is compiled into a [[FlatJoinFunction]] and the first
+  * element of the broadcast variable is stored in `singleInput`. Subclasses implement
+  * `flatMap` and decide on which side of the join the single record is passed.
+  *
+  * @param name             name of the join function class that is compiled from `code`
+  * @param code             source code of the [[FlatJoinFunction]] to compile
+  * @param returnType       type information returned by `getProducedType`
+  * @param broadcastSetName name of the broadcast variable holding the single record
+  */
+abstract class MapSideJoinRunner[IN1, IN2, SINGLE_IN, MULTI_IN, OUT](
+    name: String,
+    code: String,
+    @transient returnType: TypeInformation[OUT],
+    broadcastSetName: String)
+  extends RichFlatMapFunction[MULTI_IN, OUT]
+  with ResultTypeQueryable[OUT]
+  with Compiler[FlatJoinFunction[IN1, IN2, OUT]] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  // both fields are set in open() and stay null until the function has been opened
+  protected var function: FlatJoinFunction[IN1, IN2, OUT] = _
+  protected var singleInput: SINGLE_IN = _
+
+  /** Compiles and instantiates the join function and fetches the single broadcast record. */
+  override def open(parameters: Configuration): Unit = {
+    LOG.debug(s"Compiling FlatJoinFunction: $name \n\n Code:\n$code")
+    val userClassLoader = getRuntimeContext.getUserCodeClassLoader
+    val generatedClass = compile(userClassLoader, name, code)
+    LOG.debug("Instantiating FlatJoinFunction.")
+    function = generatedClass.newInstance()
+    // only the first element of the broadcast variable is used
+    singleInput = getRuntimeContext.getBroadcastVariable(broadcastSetName).get(0)
+  }
+
+  override def getProducedType: TypeInformation[OUT] = returnType
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MinusCoGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MinusCoGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MinusCoGroupFunction.scala
new file mode 100644
index 0000000..41679ee
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MinusCoGroupFunction.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.CoGroupFunction
+import org.apache.flink.util.Collector
+
+/**
+  * Co-group function that emits records of the first input which are not cancelled out by
+  * records of the second input of the same group.
+  *
+  * @param all if `true`, every record of the second input cancels one record of the first
+  *            input and all remaining records of the first input are emitted; if `false`,
+  *            a single record of the first input is emitted and only if the second input
+  *            is empty
+  */
+class MinusCoGroupFunction[T](all: Boolean) extends CoGroupFunction[T, T, T] {
+
+  /** Emits nothing if either input is `null`. */
+  override def coGroup(first: Iterable[T], second: Iterable[T], out: Collector[T]): Unit = {
+    if (first != null && second != null) {
+      val left = first.iterator
+      val right = second.iterator
+
+      if (all) {
+        // drop one left record for every right record
+        while (right.hasNext && left.hasNext) {
+          left.next()
+          right.next()
+        }
+
+        // whatever is left over on the left side is part of the result
+        while (left.hasNext) {
+          out.collect(left.next())
+        }
+      } else if (!right.hasNext && left.hasNext) {
+        out.collect(left.next())
+      }
+    }
+  }
+}
+