You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:47:01 UTC
[32/47] flink git commit: [FLINK-4704] [table] Refactor package
structure of flink-table.
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala
deleted file mode 100644
index 5c1fb53..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.dataSet
-
-import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.{LogicalJoin, LogicalSort}
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSort}
-
-class DataSetSortRule
- extends ConverterRule(
- classOf[LogicalSort],
- Convention.NONE,
- DataSetConvention.INSTANCE,
- "DataSetSortRule") {
-
- override def convert(rel: RelNode): RelNode = {
-
- val sort: LogicalSort = rel.asInstanceOf[LogicalSort]
- val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
- val convInput: RelNode = RelOptRule.convert(sort.getInput, DataSetConvention.INSTANCE)
-
- new DataSetSort(
- rel.getCluster,
- traitSet,
- convInput,
- sort.getCollation,
- rel.getRowType,
- sort.offset,
- sort.fetch
- )
- }
-}
-
-object DataSetSortRule {
- val INSTANCE: RelOptRule = new DataSetSortRule
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala
deleted file mode 100644
index ea35637..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.dataSet
-
-import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.LogicalUnion
-import org.apache.calcite.rel.rules.UnionToDistinctRule
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetUnion}
-
-class DataSetUnionRule
- extends ConverterRule(
- classOf[LogicalUnion],
- Convention.NONE,
- DataSetConvention.INSTANCE,
- "DataSetUnionRule")
- {
-
- /**
- * Only translate UNION ALL.
- * Note: A distinct Union are translated into
- * an Aggregate on top of a UNION ALL by [[UnionToDistinctRule]]
- */
- override def matches(call: RelOptRuleCall): Boolean = {
- val union: LogicalUnion = call.rel(0).asInstanceOf[LogicalUnion]
- union.all
- }
-
- def convert(rel: RelNode): RelNode = {
-
- val union: LogicalUnion = rel.asInstanceOf[LogicalUnion]
- val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
- val convLeft: RelNode = RelOptRule.convert(union.getInput(0), DataSetConvention.INSTANCE)
- val convRight: RelNode = RelOptRule.convert(union.getInput(1), DataSetConvention.INSTANCE)
-
- new DataSetUnion(
- rel.getCluster,
- traitSet,
- convLeft,
- convRight,
- rel.getRowType)
- }
-}
-
-object DataSetUnionRule {
- val INSTANCE: RelOptRule = new DataSetUnionRule
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetValuesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetValuesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetValuesRule.scala
deleted file mode 100644
index 3d6c0de..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetValuesRule.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.dataSet
-
-import org.apache.calcite.plan.{RelOptRule, RelTraitSet, Convention}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.LogicalValues
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetValues, DataSetConvention}
-
-class DataSetValuesRule
- extends ConverterRule(
- classOf[LogicalValues],
- Convention.NONE,
- DataSetConvention.INSTANCE,
- "DataSetValuesRule")
-{
-
- def convert(rel: RelNode): RelNode = {
-
- val values: LogicalValues = rel.asInstanceOf[LogicalValues]
- val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
-
- new DataSetValues(
- rel.getCluster,
- traitSet,
- rel.getRowType,
- values.getTuples,
- description)
- }
-}
-
-object DataSetValuesRule {
- val INSTANCE: RelOptRule = new DataSetValuesRule
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
deleted file mode 100644
index 301a45b..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.dataSet
-
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.calcite.plan.RelOptRule.{none, operand}
-import org.apache.flink.api.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc}
-import org.apache.flink.api.table.plan.rules.util.RexProgramProjectExtractor._
-import org.apache.flink.api.table.sources.{BatchTableSource, ProjectableTableSource}
-
-/**
- * This rule tries to push projections into a BatchTableSourceScan.
- */
-class PushProjectIntoBatchTableSourceScanRule extends RelOptRule(
- operand(classOf[DataSetCalc],
- operand(classOf[BatchTableSourceScan], none)),
- "PushProjectIntoBatchTableSourceScanRule") {
-
- override def matches(call: RelOptRuleCall) = {
- val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
- scan.tableSource match {
- case _: ProjectableTableSource[_] => true
- case _ => false
- }
- }
-
- override def onMatch(call: RelOptRuleCall) {
- val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
- val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
-
- val usedFields: Array[Int] = extractRefInputFields(calc.calcProgram)
-
- // if no fields can be projected, there is no need to transform subtree
- if (scan.tableSource.getNumberOfFields != usedFields.length) {
- val originTableSource = scan.tableSource.asInstanceOf[ProjectableTableSource[_]]
- val newTableSource = originTableSource.projectFields(usedFields)
- val newScan = new BatchTableSourceScan(
- scan.getCluster,
- scan.getTraitSet,
- scan.getTable,
- newTableSource.asInstanceOf[BatchTableSource[_]])
-
- val newCalcProgram = rewriteRexProgram(
- calc.calcProgram,
- newScan.getRowType,
- usedFields,
- calc.getCluster.getRexBuilder)
-
- // if project merely returns its input and doesn't exist filter, remove datasetCalc nodes
- if (newCalcProgram.isTrivial) {
- call.transformTo(newScan)
- } else {
- val newCalc = new DataSetCalc(
- calc.getCluster,
- calc.getTraitSet,
- newScan,
- calc.getRowType,
- newCalcProgram,
- description)
- call.transformTo(newCalc)
- }
- }
- }
-}
-
-object PushProjectIntoBatchTableSourceScanRule {
- val INSTANCE: RelOptRule = new PushProjectIntoBatchTableSourceScanRule
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamAggregateRule.scala
deleted file mode 100644
index dff2adc..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamAggregateRule.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.datastream
-
-import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.flink.api.table.TableException
-import org.apache.flink.api.table.expressions.Alias
-import org.apache.flink.api.table.plan.logical.rel.LogicalWindowAggregate
-import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamAggregate, DataStreamConvention}
-
-import scala.collection.JavaConversions._
-
-class DataStreamAggregateRule
- extends ConverterRule(
- classOf[LogicalWindowAggregate],
- Convention.NONE,
- DataStreamConvention.INSTANCE,
- "DataStreamAggregateRule")
- {
-
- override def matches(call: RelOptRuleCall): Boolean = {
- val agg: LogicalWindowAggregate = call.rel(0).asInstanceOf[LogicalWindowAggregate]
-
- // check if we have distinct aggregates
- val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
- if (distinctAggs) {
- throw TableException("DISTINCT aggregates are currently not supported.")
- }
-
- // check if we have grouping sets
- val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet
- if (groupSets || agg.indicator) {
- throw TableException("GROUPING SETS are currently not supported.")
- }
-
- !distinctAggs && !groupSets && !agg.indicator
- }
-
- override def convert(rel: RelNode): RelNode = {
- val agg: LogicalWindowAggregate = rel.asInstanceOf[LogicalWindowAggregate]
- val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
- val convInput: RelNode = RelOptRule.convert(agg.getInput, DataStreamConvention.INSTANCE)
-
- new DataStreamAggregate(
- agg.getWindow,
- agg.getNamedProperties,
- rel.getCluster,
- traitSet,
- convInput,
- agg.getNamedAggCalls,
- rel.getRowType,
- agg.getInput.getRowType,
- agg.getGroupSet.toArray)
- }
- }
-
-object DataStreamAggregateRule {
- val INSTANCE: RelOptRule = new DataStreamAggregateRule
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCalcRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCalcRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCalcRule.scala
deleted file mode 100644
index b62967a..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCalcRule.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.datastream
-
-import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.LogicalCalc
-import org.apache.flink.api.table.plan.nodes.datastream.DataStreamCalc
-import org.apache.flink.api.table.plan.nodes.datastream.DataStreamConvention
-
-class DataStreamCalcRule
- extends ConverterRule(
- classOf[LogicalCalc],
- Convention.NONE,
- DataStreamConvention.INSTANCE,
- "DataStreamCalcRule")
-{
-
- def convert(rel: RelNode): RelNode = {
- val calc: LogicalCalc = rel.asInstanceOf[LogicalCalc]
- val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
- val convInput: RelNode = RelOptRule.convert(calc.getInput, DataStreamConvention.INSTANCE)
-
- new DataStreamCalc(
- rel.getCluster,
- traitSet,
- convInput,
- rel.getRowType,
- calc.getProgram,
- description)
- }
-}
-
-object DataStreamCalcRule {
- val INSTANCE: RelOptRule = new DataStreamCalcRule
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCorrelateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCorrelateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCorrelateRule.scala
deleted file mode 100644
index 554c6c1..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCorrelateRule.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.plan.rules.datastream
-
-import org.apache.calcite.plan.volcano.RelSubset
-import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.{LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan}
-import org.apache.calcite.rex.RexNode
-import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamCorrelate, DataStreamConvention}
-
-/**
- * Rule to convert a LogicalCorrelate into a DataStreamCorrelate.
- */
-class DataStreamCorrelateRule
- extends ConverterRule(
- classOf[LogicalCorrelate],
- Convention.NONE,
- DataStreamConvention.INSTANCE,
- "DataStreamCorrelateRule") {
-
- override def matches(call: RelOptRuleCall): Boolean = {
- val join: LogicalCorrelate = call.rel(0).asInstanceOf[LogicalCorrelate]
- val right = join.getRight.asInstanceOf[RelSubset].getOriginal
-
- right match {
- // right node is a table function
- case scan: LogicalTableFunctionScan => true
- // a filter is pushed above the table function
- case filter: LogicalFilter =>
- filter
- .getInput.asInstanceOf[RelSubset]
- .getOriginal
- .isInstanceOf[LogicalTableFunctionScan]
- case _ => false
- }
- }
-
- override def convert(rel: RelNode): RelNode = {
- val join: LogicalCorrelate = rel.asInstanceOf[LogicalCorrelate]
- val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
- val convInput: RelNode = RelOptRule.convert(join.getInput(0), DataStreamConvention.INSTANCE)
- val right: RelNode = join.getInput(1)
-
- def convertToCorrelate(relNode: RelNode, condition: Option[RexNode]): DataStreamCorrelate = {
- relNode match {
- case rel: RelSubset =>
- convertToCorrelate(rel.getRelList.get(0), condition)
-
- case filter: LogicalFilter =>
- convertToCorrelate(
- filter.getInput.asInstanceOf[RelSubset].getOriginal,
- Some(filter.getCondition))
-
- case scan: LogicalTableFunctionScan =>
- new DataStreamCorrelate(
- rel.getCluster,
- traitSet,
- convInput,
- scan,
- condition,
- rel.getRowType,
- join.getRowType,
- join.getJoinType,
- description)
- }
- }
- convertToCorrelate(right, None)
- }
-
-}
-
-object DataStreamCorrelateRule {
- val INSTANCE: RelOptRule = new DataStreamCorrelateRule
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamScanRule.scala
deleted file mode 100644
index 62638bc..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamScanRule.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.datastream
-
-import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.core.TableScan
-import org.apache.calcite.rel.logical.LogicalTableScan
-import org.apache.flink.api.table.plan.nodes.datastream.DataStreamConvention
-import org.apache.flink.api.table.plan.nodes.datastream.DataStreamScan
-import org.apache.flink.api.table.plan.schema.DataStreamTable
-
-class DataStreamScanRule
- extends ConverterRule(
- classOf[LogicalTableScan],
- Convention.NONE,
- DataStreamConvention.INSTANCE,
- "DataStreamScanRule")
-{
-
- override def matches(call: RelOptRuleCall): Boolean = {
- val scan: TableScan = call.rel(0).asInstanceOf[TableScan]
- val dataSetTable = scan.getTable.unwrap(classOf[DataStreamTable[Any]])
- dataSetTable match {
- case _: DataStreamTable[Any] =>
- true
- case _ =>
- false
- }
- }
-
- def convert(rel: RelNode): RelNode = {
- val scan: LogicalTableScan = rel.asInstanceOf[LogicalTableScan]
- val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
-
- new DataStreamScan(
- rel.getCluster,
- traitSet,
- scan.getTable,
- rel.getRowType
- )
- }
-}
-
-object DataStreamScanRule {
- val INSTANCE: RelOptRule = new DataStreamScanRule
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamUnionRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamUnionRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamUnionRule.scala
deleted file mode 100644
index 78a5486..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamUnionRule.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.datastream
-
-import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.LogicalUnion
-import org.apache.flink.api.table.plan.nodes.datastream.DataStreamConvention
-import org.apache.flink.api.table.plan.nodes.datastream.DataStreamUnion
-
-class DataStreamUnionRule
- extends ConverterRule(
- classOf[LogicalUnion],
- Convention.NONE,
- DataStreamConvention.INSTANCE,
- "DataStreamUnionRule")
-{
-
- def convert(rel: RelNode): RelNode = {
- val union: LogicalUnion = rel.asInstanceOf[LogicalUnion]
- val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
- val convLeft: RelNode = RelOptRule.convert(union.getInput(0), DataStreamConvention.INSTANCE)
- val convRight: RelNode = RelOptRule.convert(union.getInput(1), DataStreamConvention.INSTANCE)
-
- new DataStreamUnion(
- rel.getCluster,
- traitSet,
- convLeft,
- convRight,
- rel.getRowType)
- }
-}
-
-object DataStreamUnionRule {
- val INSTANCE: RelOptRule = new DataStreamUnionRule
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamValuesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamValuesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamValuesRule.scala
deleted file mode 100644
index 738642d..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamValuesRule.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.datastream
-
-import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.LogicalValues
-import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamValues, DataStreamConvention}
-
-class DataStreamValuesRule
- extends ConverterRule(
- classOf[LogicalValues],
- Convention.NONE,
- DataStreamConvention.INSTANCE,
- "DataStreamValuesRule")
-{
-
- def convert(rel: RelNode): RelNode = {
-
- val values: LogicalValues = rel.asInstanceOf[LogicalValues]
- val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
-
- new DataStreamValues(
- rel.getCluster,
- traitSet,
- rel.getRowType,
- values.getTuples,
- description)
- }
-}
-
-object DataStreamValuesRule {
- val INSTANCE: RelOptRule = new DataStreamValuesRule
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala
deleted file mode 100644
index 91dd255..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.datastream
-
-import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.core.TableScan
-import org.apache.calcite.rel.logical.LogicalTableScan
-import org.apache.flink.api.table.plan.nodes.datastream.
- {StreamTableSourceScan, DataStreamConvention}
-import org.apache.flink.api.table.plan.schema.TableSourceTable
-import org.apache.flink.api.table.sources.StreamTableSource
-
-/** Rule to convert a [[LogicalTableScan]] into a [[StreamTableSourceScan]]. */
-class StreamTableSourceScanRule
- extends ConverterRule(
- classOf[LogicalTableScan],
- Convention.NONE,
- DataStreamConvention.INSTANCE,
- "StreamTableSourceScanRule")
-{
-
- /** Rule must only match if TableScan targets a [[StreamTableSource]] */
- override def matches(call: RelOptRuleCall): Boolean = {
- val scan: TableScan = call.rel(0).asInstanceOf[TableScan]
- val dataSetTable = scan.getTable.unwrap(classOf[TableSourceTable])
- dataSetTable match {
- case tst: TableSourceTable =>
- tst.tableSource match {
- case _: StreamTableSource[_] =>
- true
- case _ =>
- false
- }
- case _ =>
- false
- }
- }
-
- def convert(rel: RelNode): RelNode = {
- val scan: LogicalTableScan = rel.asInstanceOf[LogicalTableScan]
- val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
-
- // The original registered table source
- val table: TableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable])
- val tableSource: StreamTableSource[_] = table.tableSource.asInstanceOf[StreamTableSource[_]]
-
- new StreamTableSourceScan(
- rel.getCluster,
- traitSet,
- scan.getTable,
- tableSource
- )
- }
-}
-
-object StreamTableSourceScanRule {
- val INSTANCE: RelOptRule = new StreamTableSourceScanRule
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/util/RexProgramProjectExtractor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/util/RexProgramProjectExtractor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/util/RexProgramProjectExtractor.scala
deleted file mode 100644
index d78e07f..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/util/RexProgramProjectExtractor.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.util
-
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rex._
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable
-import scala.collection.JavaConverters._
-
-object RexProgramProjectExtractor {
-
- /**
- * Extracts the indexes of input fields accessed by the RexProgram.
- *
- * @param rexProgram The RexProgram to analyze
- * @return The indexes of accessed input fields
- */
- def extractRefInputFields(rexProgram: RexProgram): Array[Int] = {
- val visitor = new RefFieldsVisitor
- // extract input fields from project expressions
- rexProgram.getProjectList.foreach(exp => rexProgram.expandLocalRef(exp).accept(visitor))
- val condition = rexProgram.getCondition
- // extract input fields from condition expression
- if (condition != null) {
- rexProgram.expandLocalRef(condition).accept(visitor)
- }
- visitor.getFields
- }
-
- /**
- * Generates a new RexProgram based on mapped input fields.
- *
- * @param rexProgram original RexProgram
- * @param inputRowType input row type
- * @param usedInputFields indexes of used input fields
- * @param rexBuilder builder for Rex expressions
- *
- * @return A RexProgram with mapped input field expressions.
- */
- def rewriteRexProgram(
- rexProgram: RexProgram,
- inputRowType: RelDataType,
- usedInputFields: Array[Int],
- rexBuilder: RexBuilder): RexProgram = {
-
- val inputRewriter = new InputRewriter(usedInputFields)
- val newProjectExpressions = rexProgram.getProjectList.map(
- exp => rexProgram.expandLocalRef(exp).accept(inputRewriter)
- ).toList.asJava
-
- val oldCondition = rexProgram.getCondition
- val newConditionExpression = {
- oldCondition match {
- case ref: RexLocalRef => rexProgram.expandLocalRef(ref).accept(inputRewriter)
- case _ => null // null does not match any type
- }
- }
- RexProgram.create(
- inputRowType,
- newProjectExpressions,
- newConditionExpression,
- rexProgram.getOutputRowType,
- rexBuilder
- )
- }
-}
-
-/**
- * A RexVisitor to extract used input fields
- */
-class RefFieldsVisitor extends RexVisitorImpl[Unit](true) {
- private var fields = mutable.LinkedHashSet[Int]()
-
- def getFields: Array[Int] = fields.toArray
-
- override def visitInputRef(inputRef: RexInputRef): Unit = fields += inputRef.getIndex
-
- override def visitCall(call: RexCall): Unit =
- call.operands.foreach(operand => operand.accept(this))
-}
-
-/**
- * A RexShuttle to rewrite field accesses of a RexProgram.
- *
- * @param fields fields mapping
- */
-class InputRewriter(fields: Array[Int]) extends RexShuttle {
-
- /** old input fields ref index -> new input fields ref index mappings */
- private val fieldMap: Map[Int, Int] =
- fields.zipWithIndex.toMap
-
- override def visitInputRef(inputRef: RexInputRef): RexNode =
- new RexInputRef(relNodeIndex(inputRef), inputRef.getType)
-
- override def visitLocalRef(localRef: RexLocalRef): RexNode =
- new RexInputRef(relNodeIndex(localRef), localRef.getType)
-
- private def relNodeIndex(ref: RexSlot): Int =
- fieldMap.getOrElse(ref.getIndex,
- throw new IllegalArgumentException("input field contains invalid index"))
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/ArrayRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/ArrayRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/ArrayRelDataType.scala
deleted file mode 100644
index 92fcb83..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/ArrayRelDataType.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.schema
-
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.sql.`type`.ArraySqlType
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-/**
- * Flink distinguishes between primitive arrays (int[], double[], ...) and
- * object arrays (Integer[], MyPojo[], ...). This custom type supports both cases.
- */
-class ArrayRelDataType(
- val typeInfo: TypeInformation[_],
- elementType: RelDataType,
- isNullable: Boolean)
- extends ArraySqlType(
- elementType,
- isNullable) {
-
- override def toString = s"ARRAY($typeInfo)"
-
- def canEqual(other: Any): Boolean = other.isInstanceOf[ArrayRelDataType]
-
- override def equals(other: Any): Boolean = other match {
- case that: ArrayRelDataType =>
- super.equals(that) &&
- (that canEqual this) &&
- typeInfo == that.typeInfo
- case _ => false
- }
-
- override def hashCode(): Int = {
- typeInfo.hashCode()
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/CompositeRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/CompositeRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/CompositeRelDataType.scala
deleted file mode 100644
index b9ceff0..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/CompositeRelDataType.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.schema
-
-import java.util
-
-import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFieldImpl, RelRecordType}
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.table.FlinkTypeFactory
-import org.apache.flink.api.table.plan.schema.CompositeRelDataType.createFieldList
-
-import scala.collection.JavaConverters._
-
-/**
- * Composite type for encapsulating Flink's [[CompositeType]].
- *
- * @param compositeType CompositeType to encapsulate
- * @param typeFactory Flink's type factory
- */
-class CompositeRelDataType(
- val compositeType: CompositeType[_],
- typeFactory: FlinkTypeFactory)
- extends RelRecordType(createFieldList(compositeType, typeFactory)) {
-
- override def toString = s"COMPOSITE($compositeType)"
-
- def canEqual(other: Any): Boolean = other.isInstanceOf[CompositeRelDataType]
-
- override def equals(other: Any): Boolean = other match {
- case that: CompositeRelDataType =>
- super.equals(that) &&
- (that canEqual this) &&
- compositeType == that.compositeType
- case _ => false
- }
-
- override def hashCode(): Int = {
- compositeType.hashCode()
- }
-
-}
-
-object CompositeRelDataType {
-
- /**
- * Converts the fields of a composite type to list of [[RelDataTypeField]].
- */
- private def createFieldList(
- compositeType: CompositeType[_],
- typeFactory: FlinkTypeFactory)
- : util.List[RelDataTypeField] = {
-
- compositeType
- .getFieldNames
- .zipWithIndex
- .map { case (name, index) =>
- new RelDataTypeFieldImpl(
- name,
- index,
- typeFactory.createTypeFromTypeInfo(compositeType.getTypeAt(index)))
- .asInstanceOf[RelDataTypeField]
- }
- .toList
- .asJava
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala
deleted file mode 100644
index bbcba13..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.schema
-
-import java.lang.Double
-import java.util
-import java.util.Collections
-
-import org.apache.calcite.rel.{RelCollation, RelDistribution}
-import org.apache.calcite.schema.Statistic
-import org.apache.calcite.util.ImmutableBitSet
-import org.apache.flink.api.java.DataSet
-
-class DataSetTable[T](
- val dataSet: DataSet[T],
- override val fieldIndexes: Array[Int],
- override val fieldNames: Array[String])
- extends FlinkTable[T](dataSet.getType, fieldIndexes, fieldNames) {
-
- override def getStatistic: Statistic = {
- new DefaultDataSetStatistic
- }
-
-}
-
-class DefaultDataSetStatistic extends Statistic {
-
- override def getRowCount: Double = 1000d
-
- override def getCollations: util.List[RelCollation] = Collections.emptyList()
-
- override def isKey(columns: ImmutableBitSet): Boolean = false
-
- override def getDistribution: RelDistribution = null
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala
deleted file mode 100644
index 570d723..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.schema
-
-import org.apache.flink.streaming.api.datastream.DataStream
-
-class DataStreamTable[T](
- val dataStream: DataStream[T],
- override val fieldIndexes: Array[Int],
- override val fieldNames: Array[String])
- extends FlinkTable[T](dataStream.getType, fieldIndexes, fieldNames) {
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala
deleted file mode 100644
index 84d6d7e..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.schema
-
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
-import org.apache.calcite.schema.impl.AbstractTable
-import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.table.{FlinkTypeFactory, TableException}
-
-abstract class FlinkTable[T](
- val typeInfo: TypeInformation[T],
- val fieldIndexes: Array[Int],
- val fieldNames: Array[String])
- extends AbstractTable {
-
- if (fieldIndexes.length != fieldNames.length) {
- throw new TableException(
- "Number of field indexes and field names must be equal.")
- }
-
- // check uniqueness of field names
- if (fieldNames.length != fieldNames.toSet.size) {
- throw new TableException(
- "Table field names must be unique.")
- }
-
- val fieldTypes: Array[TypeInformation[_]] =
- typeInfo match {
- case cType: CompositeType[T] =>
- if (fieldNames.length != cType.getArity) {
- throw new TableException(
- s"Arity of type (" + cType.getFieldNames.deep + ") " +
- "not equal to number of field names " + fieldNames.deep + ".")
- }
- fieldIndexes.map(cType.getTypeAt(_).asInstanceOf[TypeInformation[_]])
- case aType: AtomicType[T] =>
- if (fieldIndexes.length != 1 || fieldIndexes(0) != 0) {
- throw new TableException(
- "Non-composite input type may have only a single field and its index must be 0.")
- }
- Array(aType)
- }
-
- override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
- val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
- flinkTypeFactory.buildRowDataType(fieldNames, fieldTypes)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTableFunctionImpl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTableFunctionImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTableFunctionImpl.scala
deleted file mode 100644
index 540a5c8..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTableFunctionImpl.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.plan.schema
-
-import java.lang.reflect.{Method, Type}
-import java.util
-
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
-import org.apache.calcite.schema.TableFunction
-import org.apache.calcite.schema.impl.ReflectiveFunctionBase
-import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.table.{FlinkTypeFactory, TableException}
-
-/**
- * This is heavily inspired by Calcite's [[org.apache.calcite.schema.impl.TableFunctionImpl]].
- * We need it in order to create a [[org.apache.flink.api.table.functions.utils.TableSqlFunction]].
- * The main difference is that we override the [[getRowType()]] and [[getElementType()]].
- */
-class FlinkTableFunctionImpl[T](
- val typeInfo: TypeInformation[T],
- val fieldIndexes: Array[Int],
- val fieldNames: Array[String],
- val evalMethod: Method)
- extends ReflectiveFunctionBase(evalMethod)
- with TableFunction {
-
- if (fieldIndexes.length != fieldNames.length) {
- throw new TableException(
- "Number of field indexes and field names must be equal.")
- }
-
- // check uniqueness of field names
- if (fieldNames.length != fieldNames.toSet.size) {
- throw new TableException(
- "Table field names must be unique.")
- }
-
- val fieldTypes: Array[TypeInformation[_]] =
- typeInfo match {
- case cType: CompositeType[T] =>
- if (fieldNames.length != cType.getArity) {
- throw new TableException(
- s"Arity of type (" + cType.getFieldNames.deep + ") " +
- "not equal to number of field names " + fieldNames.deep + ".")
- }
- fieldIndexes.map(cType.getTypeAt(_).asInstanceOf[TypeInformation[_]])
- case aType: AtomicType[T] =>
- if (fieldIndexes.length != 1 || fieldIndexes(0) != 0) {
- throw new TableException(
- "Non-composite input type may have only a single field and its index must be 0.")
- }
- Array(aType)
- }
-
- override def getElementType(arguments: util.List[AnyRef]): Type = classOf[Array[Object]]
-
- override def getRowType(typeFactory: RelDataTypeFactory,
- arguments: util.List[AnyRef]): RelDataType = {
- val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
- val builder = flinkTypeFactory.builder
- fieldNames
- .zip(fieldTypes)
- .foreach { f =>
- builder.add(f._1, flinkTypeFactory.createTypeFromTypeInfo(f._2)).nullable(true)
- }
- builder.build
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/GenericRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/GenericRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/GenericRelDataType.scala
deleted file mode 100644
index a3012d1..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/GenericRelDataType.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.schema
-
-import org.apache.calcite.sql.`type`.{BasicSqlType, SqlTypeName}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.FlinkTypeSystem
-
-/**
- * Generic type for encapsulating Flink's [[TypeInformation]].
- *
- * @param typeInfo TypeInformation to encapsulate
- * @param typeSystem Flink's type system
- */
-class GenericRelDataType(
- val typeInfo: TypeInformation[_],
- typeSystem: FlinkTypeSystem)
- extends BasicSqlType(
- typeSystem,
- SqlTypeName.ANY) {
-
- override def toString = s"ANY($typeInfo)"
-
- def canEqual(other: Any): Boolean = other.isInstanceOf[GenericRelDataType]
-
- override def equals(other: Any): Boolean = other match {
- case that: GenericRelDataType =>
- super.equals(that) &&
- (that canEqual this) &&
- typeInfo == that.typeInfo
- case _ => false
- }
-
- override def hashCode(): Int = {
- typeInfo.hashCode()
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/RelTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/RelTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/RelTable.scala
deleted file mode 100644
index f952d83..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/RelTable.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.schema
-
-import org.apache.calcite.plan.RelOptTable
-import org.apache.calcite.plan.RelOptTable.ToRelContext
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
-import org.apache.calcite.schema.Schema.TableType
-import org.apache.calcite.schema.impl.AbstractTable
-import org.apache.calcite.schema.TranslatableTable
-
-/**
- * A [[org.apache.calcite.schema.Table]] implementation for registering
- * Table API Tables in the Calcite schema to be used by Flink SQL.
- * It implements [[TranslatableTable]] so that its logical scan
- * can be converted to a relational expression.
- *
- * @see [[DataSetTable]]
- */
-class RelTable(relNode: RelNode) extends AbstractTable with TranslatableTable {
-
- override def getJdbcTableType: TableType = ???
-
- override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = relNode.getRowType
-
- override def toRel(context: ToRelContext, relOptTable: RelOptTable): RelNode = {
- relNode
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala
deleted file mode 100644
index 72be00c..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.schema
-
-import org.apache.flink.types.Row
-import org.apache.flink.api.table.sources.TableSource
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-
-/** Table which defines an external table via a [[TableSource]] */
-class TableSourceTable(val tableSource: TableSource[_])
- extends FlinkTable[Row](
- typeInfo = new RowTypeInfo(tableSource.getFieldTypes: _*),
- fieldIndexes = 0.until(tableSource.getNumberOfFields).toArray,
- fieldNames = tableSource.getFieldsNames)
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala
deleted file mode 100644
index 5896f4c..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime
-
-import java.lang.Iterable
-
-import org.apache.flink.api.common.functions.RichMapPartitionFunction
-import org.apache.flink.util.Collector
-
-class CountPartitionFunction[IN] extends RichMapPartitionFunction[IN, (Int, Long)] {
-
- override def mapPartition(value: Iterable[IN], out: Collector[(Int, Long)]): Unit = {
- val partitionIndex = getRuntimeContext.getIndexOfThisSubtask
- var elementCount = 0L
- val iterator = value.iterator()
- while (iterator.hasNext) {
- if (elementCount != Long.MaxValue) { // prevent overflow
- elementCount += 1L
- }
- iterator.next()
- }
- out.collect(partitionIndex, elementCount)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala
deleted file mode 100644
index 2e57a0f..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime
-
-import org.apache.flink.api.common.functions.{FlatJoinFunction, RichFlatJoinFunction}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.api.table.codegen.Compiler
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.util.Collector
-import org.slf4j.LoggerFactory
-
-class FlatJoinRunner[IN1, IN2, OUT](
- name: String,
- code: String,
- @transient returnType: TypeInformation[OUT])
- extends RichFlatJoinFunction[IN1, IN2, OUT]
- with ResultTypeQueryable[OUT]
- with Compiler[FlatJoinFunction[IN1, IN2, OUT]] {
-
- val LOG = LoggerFactory.getLogger(this.getClass)
-
- private var function: FlatJoinFunction[IN1, IN2, OUT] = null
-
- override def open(parameters: Configuration): Unit = {
- LOG.debug(s"Compiling FlatJoinFunction: $name \n\n Code:\n$code")
- val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
- LOG.debug("Instantiating FlatJoinFunction.")
- function = clazz.newInstance()
- }
-
- override def join(first: IN1, second: IN2, out: Collector[OUT]): Unit =
- function.join(first, second, out)
-
- override def getProducedType: TypeInformation[OUT] = returnType
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala
deleted file mode 100644
index e228e2b..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime
-
-import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.api.table.codegen.Compiler
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.util.Collector
-import org.slf4j.LoggerFactory
-
-class FlatMapRunner[IN, OUT](
- name: String,
- code: String,
- @transient returnType: TypeInformation[OUT])
- extends RichFlatMapFunction[IN, OUT]
- with ResultTypeQueryable[OUT]
- with Compiler[FlatMapFunction[IN, OUT]] {
-
- val LOG = LoggerFactory.getLogger(this.getClass)
-
- private var function: FlatMapFunction[IN, OUT] = _
-
- override def open(parameters: Configuration): Unit = {
- LOG.debug(s"Compiling FlatMapFunction: $name \n\n Code:\n$code")
- val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
- LOG.debug("Instantiating FlatMapFunction.")
- function = clazz.newInstance()
- }
-
- override def flatMap(in: IN, out: Collector[OUT]): Unit =
- function.flatMap(in, out)
-
- override def getProducedType: TypeInformation[OUT] = returnType
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/IntersectCoGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/IntersectCoGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/IntersectCoGroupFunction.scala
deleted file mode 100644
index 9930811..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/IntersectCoGroupFunction.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime
-
-import java.lang.{Iterable => JIterable}
-
-import org.apache.flink.api.common.functions.CoGroupFunction
-import org.apache.flink.util.Collector
-
-class IntersectCoGroupFunction[T](all: Boolean) extends CoGroupFunction[T, T, T]{
- override def coGroup(first: JIterable[T], second: JIterable[T], out: Collector[T]): Unit = {
- if (first == null || second == null) return
- val leftIter = first.iterator()
- val rightIter = second.iterator()
- if (all) {
- while (leftIter.hasNext && rightIter.hasNext) {
- out.collect(leftIter.next)
- rightIter.next
- }
- } else {
- if (leftIter.hasNext && rightIter.hasNext) {
- out.collect(leftIter.next)
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala
deleted file mode 100644
index 5ec9035..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime
-
-import org.apache.flink.api.common.functions.RichFilterFunction
-import org.apache.flink.configuration.Configuration
-
-import scala.collection.JavaConverters._
-
-
-class LimitFilterFunction[T](
- limitStart: Long,
- limitEnd: Long,
- broadcastName: String)
- extends RichFilterFunction[T] {
-
- var partitionIndex: Int = _
- var elementCount: Long = _
- var countList: Array[Long] = _
-
- override def open(config: Configuration) {
- partitionIndex = getRuntimeContext.getIndexOfThisSubtask
-
- val countPartitionResult = getRuntimeContext
- .getBroadcastVariable[(Int, Long)](broadcastName)
- .asScala
-
- // sort by partition index, extract number per partition, sum with intermediate results
- countList = countPartitionResult.sortWith(_._1 < _._1).map(_._2).scanLeft(0L) { case (a, b) =>
- val sum = a + b
- if (sum < 0L) { // prevent overflow
- Long.MaxValue
- }
- sum
- }.toArray
-
- elementCount = 0
- }
-
- override def filter(value: T): Boolean = {
- if (elementCount != Long.MaxValue) { // prevent overflow
- elementCount += 1L
- }
- // we filter out records that are not within the limit (Long.MaxValue is unlimited)
- limitStart - countList(partitionIndex) < elementCount &&
- (limitEnd == Long.MaxValue || limitEnd - countList(partitionIndex) >= elementCount)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapJoinLeftRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapJoinLeftRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapJoinLeftRunner.scala
deleted file mode 100644
index 76650c2..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapJoinLeftRunner.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.util.Collector
-
-class MapJoinLeftRunner[IN1, IN2, OUT](
- name: String,
- code: String,
- @transient returnType: TypeInformation[OUT],
- broadcastSetName: String)
- extends MapSideJoinRunner[IN1, IN2, IN2, IN1, OUT](name, code, returnType, broadcastSetName) {
-
- override def flatMap(multiInput: IN1, out: Collector[OUT]): Unit =
- function.join(multiInput, singleInput, out)
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapJoinRightRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapJoinRightRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapJoinRightRunner.scala
deleted file mode 100644
index 52b01cf..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapJoinRightRunner.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.util.Collector
-
-class MapJoinRightRunner[IN1, IN2, OUT](
- name: String,
- code: String,
- @transient returnType: TypeInformation[OUT],
- broadcastSetName: String)
- extends MapSideJoinRunner[IN1, IN2, IN1, IN2, OUT](name, code, returnType, broadcastSetName) {
-
- override def flatMap(multiInput: IN2, out: Collector[OUT]): Unit =
- function.join(singleInput, multiInput, out)
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala
deleted file mode 100644
index 9fd1876..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime
-
-import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.api.table.codegen.Compiler
-import org.apache.flink.configuration.Configuration
-import org.slf4j.LoggerFactory
-
-class MapRunner[IN, OUT](
- name: String,
- code: String,
- @transient returnType: TypeInformation[OUT])
- extends RichMapFunction[IN, OUT]
- with ResultTypeQueryable[OUT]
- with Compiler[MapFunction[IN, OUT]] {
-
- val LOG = LoggerFactory.getLogger(this.getClass)
-
- private var function: MapFunction[IN, OUT] = null
-
- override def open(parameters: Configuration): Unit = {
- LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
- val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
- LOG.debug("Instantiating MapFunction.")
- function = clazz.newInstance()
- }
-
- override def map(in: IN): OUT =
- function.map(in)
-
- override def getProducedType: TypeInformation[OUT] = returnType
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapSideJoinRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapSideJoinRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapSideJoinRunner.scala
deleted file mode 100644
index b355d49..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapSideJoinRunner.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime
-
-import org.apache.flink.api.common.functions.{FlatJoinFunction, RichFlatMapFunction}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.api.table.codegen.Compiler
-import org.apache.flink.configuration.Configuration
-import org.slf4j.LoggerFactory
-
-abstract class MapSideJoinRunner[IN1, IN2, SINGLE_IN, MULTI_IN, OUT](
- name: String,
- code: String,
- @transient returnType: TypeInformation[OUT],
- broadcastSetName: String)
- extends RichFlatMapFunction[MULTI_IN, OUT]
- with ResultTypeQueryable[OUT]
- with Compiler[FlatJoinFunction[IN1, IN2, OUT]] {
-
- val LOG = LoggerFactory.getLogger(this.getClass)
-
- protected var function: FlatJoinFunction[IN1, IN2, OUT] = _
- protected var singleInput: SINGLE_IN = _
-
- override def open(parameters: Configuration): Unit = {
- LOG.debug(s"Compiling FlatJoinFunction: $name \n\n Code:\n$code")
- val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
- LOG.debug("Instantiating FlatJoinFunction.")
- function = clazz.newInstance()
- singleInput = getRuntimeContext.getBroadcastVariable(broadcastSetName).get(0)
- }
-
- override def getProducedType: TypeInformation[OUT] = returnType
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MinusCoGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MinusCoGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MinusCoGroupFunction.scala
deleted file mode 100644
index cac4fe6..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MinusCoGroupFunction.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime
-
-import java.lang.Iterable
-
-import org.apache.flink.api.common.functions.CoGroupFunction
-import org.apache.flink.util.Collector
-
-class MinusCoGroupFunction[T](all: Boolean) extends CoGroupFunction[T, T, T] {
- override def coGroup(first: Iterable[T], second: Iterable[T], out: Collector[T]): Unit = {
- if (first == null || second == null) return
- val leftIter = first.iterator
- val rightIter = second.iterator
-
- if (all) {
- while (rightIter.hasNext && leftIter.hasNext) {
- leftIter.next()
- rightIter.next()
- }
-
- while (leftIter.hasNext) {
- out.collect(leftIter.next())
- }
- } else {
- if (!rightIter.hasNext && leftIter.hasNext) {
- out.collect(leftIter.next())
- }
- }
- }
-}
-