You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:47:01 UTC

[32/47] flink git commit: [FLINK-4704] [table] Refactor package structure of flink-table.

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala
deleted file mode 100644
index 5c1fb53..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.dataSet
-
-import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.{LogicalJoin, LogicalSort}
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSort}
-
-class DataSetSortRule
-  extends ConverterRule(
-    classOf[LogicalSort],
-    Convention.NONE,
-    DataSetConvention.INSTANCE,
-    "DataSetSortRule") {
-
-  override def convert(rel: RelNode): RelNode = {
-
-    val sort: LogicalSort = rel.asInstanceOf[LogicalSort]
-    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
-    val convInput: RelNode = RelOptRule.convert(sort.getInput, DataSetConvention.INSTANCE)
-
-    new DataSetSort(
-      rel.getCluster,
-      traitSet,
-      convInput,
-      sort.getCollation,
-      rel.getRowType,
-      sort.offset,
-      sort.fetch
-    )
-  }
-}
-
-object DataSetSortRule {
-  val INSTANCE: RelOptRule = new DataSetSortRule
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala
deleted file mode 100644
index ea35637..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.dataSet
-
-import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.LogicalUnion
-import org.apache.calcite.rel.rules.UnionToDistinctRule
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetUnion}
-
-class DataSetUnionRule
-  extends ConverterRule(
-      classOf[LogicalUnion],
-      Convention.NONE,
-      DataSetConvention.INSTANCE,
-      "DataSetUnionRule")
-  {
-
-  /**
-   * Only translate UNION ALL.
-   * Note: A distinct Union are translated into
-   * an Aggregate on top of a UNION ALL by [[UnionToDistinctRule]]
-   */
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val union: LogicalUnion = call.rel(0).asInstanceOf[LogicalUnion]
-    union.all
-  }
-
-  def convert(rel: RelNode): RelNode = {
-
-    val union: LogicalUnion = rel.asInstanceOf[LogicalUnion]
-    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
-    val convLeft: RelNode = RelOptRule.convert(union.getInput(0), DataSetConvention.INSTANCE)
-    val convRight: RelNode = RelOptRule.convert(union.getInput(1), DataSetConvention.INSTANCE)
-
-    new DataSetUnion(
-      rel.getCluster,
-      traitSet,
-      convLeft,
-      convRight,
-      rel.getRowType)
-  }
-}
-
-object DataSetUnionRule {
-  val INSTANCE: RelOptRule = new DataSetUnionRule
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetValuesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetValuesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetValuesRule.scala
deleted file mode 100644
index 3d6c0de..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetValuesRule.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.dataSet
-
-import org.apache.calcite.plan.{RelOptRule, RelTraitSet, Convention}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.LogicalValues
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetValues, DataSetConvention}
-
-class DataSetValuesRule
-  extends ConverterRule(
-    classOf[LogicalValues],
-    Convention.NONE,
-    DataSetConvention.INSTANCE,
-    "DataSetValuesRule")
-{
-
-  def convert(rel: RelNode): RelNode = {
-
-    val values: LogicalValues = rel.asInstanceOf[LogicalValues]
-    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
-
-    new DataSetValues(
-      rel.getCluster,
-      traitSet,
-      rel.getRowType,
-      values.getTuples,
-      description)
-  }
-}
-
-object DataSetValuesRule {
-  val INSTANCE: RelOptRule = new DataSetValuesRule
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
deleted file mode 100644
index 301a45b..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.dataSet
-
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.calcite.plan.RelOptRule.{none, operand}
-import org.apache.flink.api.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc}
-import org.apache.flink.api.table.plan.rules.util.RexProgramProjectExtractor._
-import org.apache.flink.api.table.sources.{BatchTableSource, ProjectableTableSource}
-
-/**
-  * This rule tries to push projections into a BatchTableSourceScan.
-  */
-class PushProjectIntoBatchTableSourceScanRule extends RelOptRule(
-  operand(classOf[DataSetCalc],
-          operand(classOf[BatchTableSourceScan], none)),
-  "PushProjectIntoBatchTableSourceScanRule") {
-
-  override def matches(call: RelOptRuleCall) = {
-    val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
-    scan.tableSource match {
-      case _: ProjectableTableSource[_] => true
-      case _ => false
-    }
-  }
-
-  override def onMatch(call: RelOptRuleCall) {
-    val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
-    val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
-
-    val usedFields: Array[Int] = extractRefInputFields(calc.calcProgram)
-
-    // if no fields can be projected, there is no need to transform subtree
-    if (scan.tableSource.getNumberOfFields != usedFields.length) {
-      val originTableSource = scan.tableSource.asInstanceOf[ProjectableTableSource[_]]
-      val newTableSource = originTableSource.projectFields(usedFields)
-      val newScan = new BatchTableSourceScan(
-        scan.getCluster,
-        scan.getTraitSet,
-        scan.getTable,
-        newTableSource.asInstanceOf[BatchTableSource[_]])
-
-      val newCalcProgram = rewriteRexProgram(
-        calc.calcProgram,
-        newScan.getRowType,
-        usedFields,
-        calc.getCluster.getRexBuilder)
-
-      // if project merely returns its input and doesn't exist filter, remove datasetCalc nodes
-      if (newCalcProgram.isTrivial) {
-        call.transformTo(newScan)
-      } else {
-        val newCalc = new DataSetCalc(
-          calc.getCluster,
-          calc.getTraitSet,
-          newScan,
-          calc.getRowType,
-          newCalcProgram,
-          description)
-        call.transformTo(newCalc)
-      }
-    }
-  }
-}
-
-object PushProjectIntoBatchTableSourceScanRule {
-  val INSTANCE: RelOptRule = new PushProjectIntoBatchTableSourceScanRule
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamAggregateRule.scala
deleted file mode 100644
index dff2adc..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamAggregateRule.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.datastream
-
-import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.flink.api.table.TableException
-import org.apache.flink.api.table.expressions.Alias
-import org.apache.flink.api.table.plan.logical.rel.LogicalWindowAggregate
-import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamAggregate, DataStreamConvention}
-
-import scala.collection.JavaConversions._
-
-class DataStreamAggregateRule
-  extends ConverterRule(
-      classOf[LogicalWindowAggregate],
-      Convention.NONE,
-      DataStreamConvention.INSTANCE,
-      "DataStreamAggregateRule")
-  {
-
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val agg: LogicalWindowAggregate = call.rel(0).asInstanceOf[LogicalWindowAggregate]
-
-    // check if we have distinct aggregates
-    val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
-    if (distinctAggs) {
-      throw TableException("DISTINCT aggregates are currently not supported.")
-    }
-
-    // check if we have grouping sets
-    val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet
-    if (groupSets || agg.indicator) {
-      throw TableException("GROUPING SETS are currently not supported.")
-    }
-
-    !distinctAggs && !groupSets && !agg.indicator
-  }
-
-  override def convert(rel: RelNode): RelNode = {
-    val agg: LogicalWindowAggregate = rel.asInstanceOf[LogicalWindowAggregate]
-    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
-    val convInput: RelNode = RelOptRule.convert(agg.getInput, DataStreamConvention.INSTANCE)
-
-    new DataStreamAggregate(
-      agg.getWindow,
-      agg.getNamedProperties,
-      rel.getCluster,
-      traitSet,
-      convInput,
-      agg.getNamedAggCalls,
-      rel.getRowType,
-      agg.getInput.getRowType,
-      agg.getGroupSet.toArray)
-    }
-  }
-
-object DataStreamAggregateRule {
-  val INSTANCE: RelOptRule = new DataStreamAggregateRule
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCalcRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCalcRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCalcRule.scala
deleted file mode 100644
index b62967a..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCalcRule.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.datastream
-
-import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.LogicalCalc
-import org.apache.flink.api.table.plan.nodes.datastream.DataStreamCalc
-import org.apache.flink.api.table.plan.nodes.datastream.DataStreamConvention
-
-class DataStreamCalcRule
-  extends ConverterRule(
-    classOf[LogicalCalc],
-    Convention.NONE,
-    DataStreamConvention.INSTANCE,
-    "DataStreamCalcRule")
-{
-
-  def convert(rel: RelNode): RelNode = {
-    val calc: LogicalCalc = rel.asInstanceOf[LogicalCalc]
-    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
-    val convInput: RelNode = RelOptRule.convert(calc.getInput, DataStreamConvention.INSTANCE)
-
-    new DataStreamCalc(
-      rel.getCluster,
-      traitSet,
-      convInput,
-      rel.getRowType,
-      calc.getProgram,
-      description)
-  }
-}
-
-object DataStreamCalcRule {
-  val INSTANCE: RelOptRule = new DataStreamCalcRule
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCorrelateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCorrelateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCorrelateRule.scala
deleted file mode 100644
index 554c6c1..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCorrelateRule.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.plan.rules.datastream
-
-import org.apache.calcite.plan.volcano.RelSubset
-import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.{LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan}
-import org.apache.calcite.rex.RexNode
-import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamCorrelate, DataStreamConvention}
-
-/**
-  * Rule to convert a LogicalCorrelate into a DataStreamCorrelate.
-  */
-class DataStreamCorrelateRule
-  extends ConverterRule(
-    classOf[LogicalCorrelate],
-    Convention.NONE,
-    DataStreamConvention.INSTANCE,
-    "DataStreamCorrelateRule") {
-
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val join: LogicalCorrelate = call.rel(0).asInstanceOf[LogicalCorrelate]
-    val right = join.getRight.asInstanceOf[RelSubset].getOriginal
-
-    right match {
-      // right node is a table function
-      case scan: LogicalTableFunctionScan => true
-      // a filter is pushed above the table function
-      case filter: LogicalFilter =>
-        filter
-          .getInput.asInstanceOf[RelSubset]
-          .getOriginal
-          .isInstanceOf[LogicalTableFunctionScan]
-      case _ => false
-    }
-  }
-
-  override def convert(rel: RelNode): RelNode = {
-    val join: LogicalCorrelate = rel.asInstanceOf[LogicalCorrelate]
-    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
-    val convInput: RelNode = RelOptRule.convert(join.getInput(0), DataStreamConvention.INSTANCE)
-    val right: RelNode = join.getInput(1)
-
-    def convertToCorrelate(relNode: RelNode, condition: Option[RexNode]): DataStreamCorrelate = {
-      relNode match {
-        case rel: RelSubset =>
-          convertToCorrelate(rel.getRelList.get(0), condition)
-
-        case filter: LogicalFilter =>
-          convertToCorrelate(
-            filter.getInput.asInstanceOf[RelSubset].getOriginal,
-            Some(filter.getCondition))
-
-        case scan: LogicalTableFunctionScan =>
-          new DataStreamCorrelate(
-            rel.getCluster,
-            traitSet,
-            convInput,
-            scan,
-            condition,
-            rel.getRowType,
-            join.getRowType,
-            join.getJoinType,
-            description)
-      }
-    }
-    convertToCorrelate(right, None)
-  }
-
-}
-
-object DataStreamCorrelateRule {
-  val INSTANCE: RelOptRule = new DataStreamCorrelateRule
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamScanRule.scala
deleted file mode 100644
index 62638bc..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamScanRule.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.datastream
-
-import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.core.TableScan
-import org.apache.calcite.rel.logical.LogicalTableScan
-import org.apache.flink.api.table.plan.nodes.datastream.DataStreamConvention
-import org.apache.flink.api.table.plan.nodes.datastream.DataStreamScan
-import org.apache.flink.api.table.plan.schema.DataStreamTable
-
-class DataStreamScanRule
-  extends ConverterRule(
-    classOf[LogicalTableScan],
-    Convention.NONE,
-    DataStreamConvention.INSTANCE,
-    "DataStreamScanRule")
-{
-
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val scan: TableScan = call.rel(0).asInstanceOf[TableScan]
-    val dataSetTable = scan.getTable.unwrap(classOf[DataStreamTable[Any]])
-    dataSetTable match {
-      case _: DataStreamTable[Any] =>
-        true
-      case _ =>
-        false
-    }
-  }
-
-  def convert(rel: RelNode): RelNode = {
-    val scan: LogicalTableScan = rel.asInstanceOf[LogicalTableScan]
-    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
-
-    new DataStreamScan(
-      rel.getCluster,
-      traitSet,
-      scan.getTable,
-      rel.getRowType
-    )
-  }
-}
-
-object DataStreamScanRule {
-  val INSTANCE: RelOptRule = new DataStreamScanRule
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamUnionRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamUnionRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamUnionRule.scala
deleted file mode 100644
index 78a5486..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamUnionRule.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.datastream
-
-import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.LogicalUnion
-import org.apache.flink.api.table.plan.nodes.datastream.DataStreamConvention
-import org.apache.flink.api.table.plan.nodes.datastream.DataStreamUnion
-
-class DataStreamUnionRule
-  extends ConverterRule(
-    classOf[LogicalUnion],
-    Convention.NONE,
-    DataStreamConvention.INSTANCE,
-    "DataStreamUnionRule")
-{
-
-  def convert(rel: RelNode): RelNode = {
-    val union: LogicalUnion = rel.asInstanceOf[LogicalUnion]
-    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
-    val convLeft: RelNode = RelOptRule.convert(union.getInput(0), DataStreamConvention.INSTANCE)
-    val convRight: RelNode = RelOptRule.convert(union.getInput(1), DataStreamConvention.INSTANCE)
-
-    new DataStreamUnion(
-      rel.getCluster,
-      traitSet,
-      convLeft,
-      convRight,
-      rel.getRowType)
-  }
-}
-
-object DataStreamUnionRule {
-  val INSTANCE: RelOptRule = new DataStreamUnionRule
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamValuesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamValuesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamValuesRule.scala
deleted file mode 100644
index 738642d..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamValuesRule.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.datastream
-
-import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.LogicalValues
-import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamValues, DataStreamConvention}
-
-class DataStreamValuesRule
-  extends ConverterRule(
-    classOf[LogicalValues],
-    Convention.NONE,
-    DataStreamConvention.INSTANCE,
-    "DataStreamValuesRule")
-{
-
-  def convert(rel: RelNode): RelNode = {
-
-    val values: LogicalValues = rel.asInstanceOf[LogicalValues]
-    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
-
-    new DataStreamValues(
-      rel.getCluster,
-      traitSet,
-      rel.getRowType,
-      values.getTuples,
-      description)
-  }
-}
-
-object DataStreamValuesRule {
-  val INSTANCE: RelOptRule = new DataStreamValuesRule
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala
deleted file mode 100644
index 91dd255..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.datastream
-
-import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.core.TableScan
-import org.apache.calcite.rel.logical.LogicalTableScan
-import org.apache.flink.api.table.plan.nodes.datastream.
-  {StreamTableSourceScan, DataStreamConvention}
-import org.apache.flink.api.table.plan.schema.TableSourceTable
-import org.apache.flink.api.table.sources.StreamTableSource
-
-/** Rule to convert a [[LogicalTableScan]] into a [[StreamTableSourceScan]]. */
-class StreamTableSourceScanRule
-  extends ConverterRule(
-    classOf[LogicalTableScan],
-    Convention.NONE,
-    DataStreamConvention.INSTANCE,
-    "StreamTableSourceScanRule")
-{
-
-  /** Rule must only match if TableScan targets a [[StreamTableSource]] */
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val scan: TableScan = call.rel(0).asInstanceOf[TableScan]
-    val dataSetTable = scan.getTable.unwrap(classOf[TableSourceTable])
-    dataSetTable match {
-      case tst: TableSourceTable =>
-        tst.tableSource match {
-          case _: StreamTableSource[_] =>
-            true
-          case _ =>
-            false
-        }
-      case _ =>
-        false
-    }
-  }
-
-  def convert(rel: RelNode): RelNode = {
-    val scan: LogicalTableScan = rel.asInstanceOf[LogicalTableScan]
-    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
-
-    // The original registered table source
-    val table: TableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable])
-    val tableSource: StreamTableSource[_] = table.tableSource.asInstanceOf[StreamTableSource[_]]
-
-    new StreamTableSourceScan(
-      rel.getCluster,
-      traitSet,
-      scan.getTable,
-      tableSource
-    )
-  }
-}
-
-object StreamTableSourceScanRule {
-  val INSTANCE: RelOptRule = new StreamTableSourceScanRule
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/util/RexProgramProjectExtractor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/util/RexProgramProjectExtractor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/util/RexProgramProjectExtractor.scala
deleted file mode 100644
index d78e07f..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/util/RexProgramProjectExtractor.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.util
-
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rex._
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable
-import scala.collection.JavaConverters._
-
-object RexProgramProjectExtractor {
-
-  /**
-    * Extracts the indexes of input fields accessed by the RexProgram.
-    *
-    * @param rexProgram The RexProgram to analyze
-    * @return The indexes of accessed input fields
-    */
-  def extractRefInputFields(rexProgram: RexProgram): Array[Int] = {
-    val visitor = new RefFieldsVisitor
-    // extract input fields from project expressions
-    rexProgram.getProjectList.foreach(exp => rexProgram.expandLocalRef(exp).accept(visitor))
-    val condition = rexProgram.getCondition
-    // extract input fields from condition expression
-    if (condition != null) {
-      rexProgram.expandLocalRef(condition).accept(visitor)
-    }
-    visitor.getFields
-  }
-
-  /**
-    * Generates a new RexProgram based on mapped input fields.
-    *
-    * @param rexProgram      original RexProgram
-    * @param inputRowType    input row type
-    * @param usedInputFields indexes of used input fields
-    * @param rexBuilder      builder for Rex expressions
-    *
-    * @return A RexProgram with mapped input field expressions.
-    */
-  def rewriteRexProgram(
-      rexProgram: RexProgram,
-      inputRowType: RelDataType,
-      usedInputFields: Array[Int],
-      rexBuilder: RexBuilder): RexProgram = {
-
-    val inputRewriter = new InputRewriter(usedInputFields)
-    val newProjectExpressions = rexProgram.getProjectList.map(
-      exp => rexProgram.expandLocalRef(exp).accept(inputRewriter)
-    ).toList.asJava
-
-    val oldCondition = rexProgram.getCondition
-    val newConditionExpression = {
-      oldCondition match {
-        case ref: RexLocalRef => rexProgram.expandLocalRef(ref).accept(inputRewriter)
-        case _ => null // null does not match any type
-      }
-    }
-    RexProgram.create(
-      inputRowType,
-      newProjectExpressions,
-      newConditionExpression,
-      rexProgram.getOutputRowType,
-      rexBuilder
-    )
-  }
-}
-
-/**
-  * A RexVisitor to extract used input fields
-  */
-class RefFieldsVisitor extends RexVisitorImpl[Unit](true) {
-  private var fields = mutable.LinkedHashSet[Int]()
-
-  def getFields: Array[Int] = fields.toArray
-
-  override def visitInputRef(inputRef: RexInputRef): Unit = fields += inputRef.getIndex
-
-  override def visitCall(call: RexCall): Unit =
-    call.operands.foreach(operand => operand.accept(this))
-}
-
-/**
-  * A RexShuttle to rewrite field accesses of a RexProgram.
-  *
-  * @param fields fields mapping
-  */
-class InputRewriter(fields: Array[Int]) extends RexShuttle {
-
-  /** old input fields ref index -> new input fields ref index mappings */
-  private val fieldMap: Map[Int, Int] =
-    fields.zipWithIndex.toMap
-
-  override def visitInputRef(inputRef: RexInputRef): RexNode =
-    new RexInputRef(relNodeIndex(inputRef), inputRef.getType)
-
-  override def visitLocalRef(localRef: RexLocalRef): RexNode =
-    new RexInputRef(relNodeIndex(localRef), localRef.getType)
-
-  private def relNodeIndex(ref: RexSlot): Int =
-    fieldMap.getOrElse(ref.getIndex,
-      throw new IllegalArgumentException("input field contains invalid index"))
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/ArrayRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/ArrayRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/ArrayRelDataType.scala
deleted file mode 100644
index 92fcb83..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/ArrayRelDataType.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.schema
-
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.sql.`type`.ArraySqlType
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-/**
-  * Flink distinguishes between primitive arrays (int[], double[], ...) and
-  * object arrays (Integer[], MyPojo[], ...). This custom type supports both cases.
-  */
-class ArrayRelDataType(
-    val typeInfo: TypeInformation[_],
-    elementType: RelDataType,
-    isNullable: Boolean)
-  extends ArraySqlType(
-    elementType,
-    isNullable) {
-
-  override def toString = s"ARRAY($typeInfo)"
-
-  def canEqual(other: Any): Boolean = other.isInstanceOf[ArrayRelDataType]
-
-  override def equals(other: Any): Boolean = other match {
-    case that: ArrayRelDataType =>
-      super.equals(that) &&
-        (that canEqual this) &&
-        typeInfo == that.typeInfo
-    case _ => false
-  }
-
-  override def hashCode(): Int = {
-    typeInfo.hashCode()
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/CompositeRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/CompositeRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/CompositeRelDataType.scala
deleted file mode 100644
index b9ceff0..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/CompositeRelDataType.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.schema
-
-import java.util
-
-import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFieldImpl, RelRecordType}
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.table.FlinkTypeFactory
-import org.apache.flink.api.table.plan.schema.CompositeRelDataType.createFieldList
-
-import scala.collection.JavaConverters._
-
-/**
-  * Composite type for encapsulating Flink's [[CompositeType]].
-  *
-  * @param compositeType CompositeType to encapsulate
-  * @param typeFactory Flink's type factory
-  */
-class CompositeRelDataType(
-    val compositeType: CompositeType[_],
-    typeFactory: FlinkTypeFactory)
-  extends RelRecordType(createFieldList(compositeType, typeFactory)) {
-
-  override def toString = s"COMPOSITE($compositeType)"
-
-  def canEqual(other: Any): Boolean = other.isInstanceOf[CompositeRelDataType]
-
-  override def equals(other: Any): Boolean = other match {
-    case that: CompositeRelDataType =>
-      super.equals(that) &&
-        (that canEqual this) &&
-        compositeType == that.compositeType
-    case _ => false
-  }
-
-  override def hashCode(): Int = {
-    compositeType.hashCode()
-  }
-
-}
-
-object CompositeRelDataType {
-
-  /**
-    * Converts the fields of a composite type to list of [[RelDataTypeField]].
-    */
-  private def createFieldList(
-      compositeType: CompositeType[_],
-      typeFactory: FlinkTypeFactory)
-    : util.List[RelDataTypeField] = {
-
-    compositeType
-      .getFieldNames
-      .zipWithIndex
-      .map { case (name, index) =>
-        new RelDataTypeFieldImpl(
-          name,
-          index,
-          typeFactory.createTypeFromTypeInfo(compositeType.getTypeAt(index)))
-            .asInstanceOf[RelDataTypeField]
-      }
-      .toList
-      .asJava
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala
deleted file mode 100644
index bbcba13..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.schema
-
-import java.lang.Double
-import java.util
-import java.util.Collections
-
-import org.apache.calcite.rel.{RelCollation, RelDistribution}
-import org.apache.calcite.schema.Statistic
-import org.apache.calcite.util.ImmutableBitSet
-import org.apache.flink.api.java.DataSet
-
-class DataSetTable[T](
-    val dataSet: DataSet[T],
-    override val fieldIndexes: Array[Int],
-    override val fieldNames: Array[String])
-  extends FlinkTable[T](dataSet.getType, fieldIndexes, fieldNames) {
-
-  override def getStatistic: Statistic = {
-    new DefaultDataSetStatistic
-  }
-
-}
-
-class DefaultDataSetStatistic extends Statistic {
-
-  override def getRowCount: Double = 1000d
-
-  override def getCollations: util.List[RelCollation] = Collections.emptyList()
-
-  override def isKey(columns: ImmutableBitSet): Boolean = false
-
-  override def getDistribution: RelDistribution = null
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala
deleted file mode 100644
index 570d723..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.schema
-
-import org.apache.flink.streaming.api.datastream.DataStream
-
-class DataStreamTable[T](
-    val dataStream: DataStream[T],
-    override val fieldIndexes: Array[Int],
-    override val fieldNames: Array[String])
-  extends FlinkTable[T](dataStream.getType, fieldIndexes, fieldNames) {
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala
deleted file mode 100644
index 84d6d7e..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.schema
-
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
-import org.apache.calcite.schema.impl.AbstractTable
-import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.table.{FlinkTypeFactory, TableException}
-
-abstract class FlinkTable[T](
-    val typeInfo: TypeInformation[T],
-    val fieldIndexes: Array[Int],
-    val fieldNames: Array[String])
-  extends AbstractTable {
-
-  if (fieldIndexes.length != fieldNames.length) {
-    throw new TableException(
-      "Number of field indexes and field names must be equal.")
-  }
-
-  // check uniqueness of field names
-  if (fieldNames.length != fieldNames.toSet.size) {
-    throw new TableException(
-      "Table field names must be unique.")
-  }
-
-  val fieldTypes: Array[TypeInformation[_]] =
-    typeInfo match {
-      case cType: CompositeType[T] =>
-        if (fieldNames.length != cType.getArity) {
-          throw new TableException(
-          s"Arity of type (" + cType.getFieldNames.deep + ") " +
-            "not equal to number of field names " + fieldNames.deep + ".")
-        }
-        fieldIndexes.map(cType.getTypeAt(_).asInstanceOf[TypeInformation[_]])
-      case aType: AtomicType[T] =>
-        if (fieldIndexes.length != 1 || fieldIndexes(0) != 0) {
-          throw new TableException(
-            "Non-composite input type may have only a single field and its index must be 0.")
-        }
-        Array(aType)
-    }
-
-  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
-    val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
-    flinkTypeFactory.buildRowDataType(fieldNames, fieldTypes)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTableFunctionImpl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTableFunctionImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTableFunctionImpl.scala
deleted file mode 100644
index 540a5c8..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTableFunctionImpl.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.plan.schema
-
-import java.lang.reflect.{Method, Type}
-import java.util
-
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
-import org.apache.calcite.schema.TableFunction
-import org.apache.calcite.schema.impl.ReflectiveFunctionBase
-import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.table.{FlinkTypeFactory, TableException}
-
-/**
-  * This is heavily inspired by Calcite's [[org.apache.calcite.schema.impl.TableFunctionImpl]].
-  * We need it in order to create a [[org.apache.flink.api.table.functions.utils.TableSqlFunction]].
-  * The main difference is that we override the [[getRowType()]] and [[getElementType()]].
-  */
-class FlinkTableFunctionImpl[T](
-    val typeInfo: TypeInformation[T],
-    val fieldIndexes: Array[Int],
-    val fieldNames: Array[String],
-    val evalMethod: Method)
-  extends ReflectiveFunctionBase(evalMethod)
-  with TableFunction {
-
-  if (fieldIndexes.length != fieldNames.length) {
-    throw new TableException(
-      "Number of field indexes and field names must be equal.")
-  }
-
-  // check uniqueness of field names
-  if (fieldNames.length != fieldNames.toSet.size) {
-    throw new TableException(
-      "Table field names must be unique.")
-  }
-
-  val fieldTypes: Array[TypeInformation[_]] =
-    typeInfo match {
-      case cType: CompositeType[T] =>
-        if (fieldNames.length != cType.getArity) {
-          throw new TableException(
-            s"Arity of type (" + cType.getFieldNames.deep + ") " +
-              "not equal to number of field names " + fieldNames.deep + ".")
-        }
-        fieldIndexes.map(cType.getTypeAt(_).asInstanceOf[TypeInformation[_]])
-      case aType: AtomicType[T] =>
-        if (fieldIndexes.length != 1 || fieldIndexes(0) != 0) {
-          throw new TableException(
-            "Non-composite input type may have only a single field and its index must be 0.")
-        }
-        Array(aType)
-    }
-
-  override def getElementType(arguments: util.List[AnyRef]): Type = classOf[Array[Object]]
-
-  override def getRowType(typeFactory: RelDataTypeFactory,
-                          arguments: util.List[AnyRef]): RelDataType = {
-    val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
-    val builder = flinkTypeFactory.builder
-    fieldNames
-      .zip(fieldTypes)
-      .foreach { f =>
-        builder.add(f._1, flinkTypeFactory.createTypeFromTypeInfo(f._2)).nullable(true)
-      }
-    builder.build
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/GenericRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/GenericRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/GenericRelDataType.scala
deleted file mode 100644
index a3012d1..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/GenericRelDataType.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.schema
-
-import org.apache.calcite.sql.`type`.{BasicSqlType, SqlTypeName}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.FlinkTypeSystem
-
-/**
-  * Generic type for encapsulating Flink's [[TypeInformation]].
-  *
-  * @param typeInfo TypeInformation to encapsulate
-  * @param typeSystem Flink's type system
-  */
-class GenericRelDataType(
-    val typeInfo: TypeInformation[_],
-    typeSystem: FlinkTypeSystem)
-  extends BasicSqlType(
-    typeSystem,
-    SqlTypeName.ANY) {
-
-  override def toString = s"ANY($typeInfo)"
-
-  def canEqual(other: Any): Boolean = other.isInstanceOf[GenericRelDataType]
-
-  override def equals(other: Any): Boolean = other match {
-    case that: GenericRelDataType =>
-      super.equals(that) &&
-        (that canEqual this) &&
-        typeInfo == that.typeInfo
-    case _ => false
-  }
-
-  override def hashCode(): Int = {
-    typeInfo.hashCode()
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/RelTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/RelTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/RelTable.scala
deleted file mode 100644
index f952d83..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/RelTable.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.schema
-
-import org.apache.calcite.plan.RelOptTable
-import org.apache.calcite.plan.RelOptTable.ToRelContext
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
-import org.apache.calcite.schema.Schema.TableType
-import org.apache.calcite.schema.impl.AbstractTable
-import org.apache.calcite.schema.TranslatableTable
-
-/**
- * A [[org.apache.calcite.schema.Table]] implementation for registering
- * Table API Tables in the Calcite schema to be used by Flink SQL.
- * It implements [[TranslatableTable]] so that its logical scan
- * can be converted to a relational expression.
- *
- * @see [[DataSetTable]]
- */
-class RelTable(relNode: RelNode) extends AbstractTable with TranslatableTable {
-
-  override def getJdbcTableType: TableType = ???
-
-  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = relNode.getRowType
-
-  override def toRel(context: ToRelContext, relOptTable: RelOptTable): RelNode = {
-    relNode
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala
deleted file mode 100644
index 72be00c..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.schema
-
-import org.apache.flink.types.Row
-import org.apache.flink.api.table.sources.TableSource
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-
-/** Table which defines an external table via a [[TableSource]] */
-class TableSourceTable(val tableSource: TableSource[_])
-  extends FlinkTable[Row](
-    typeInfo = new RowTypeInfo(tableSource.getFieldTypes: _*),
-    fieldIndexes = 0.until(tableSource.getNumberOfFields).toArray,
-    fieldNames = tableSource.getFieldsNames)

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala
deleted file mode 100644
index 5896f4c..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime
-
-import java.lang.Iterable
-
-import org.apache.flink.api.common.functions.RichMapPartitionFunction
-import org.apache.flink.util.Collector
-
-class CountPartitionFunction[IN] extends RichMapPartitionFunction[IN, (Int, Long)] {
-
-  override def mapPartition(value: Iterable[IN], out: Collector[(Int, Long)]): Unit = {
-    val partitionIndex = getRuntimeContext.getIndexOfThisSubtask
-    var elementCount = 0L
-    val iterator = value.iterator()
-    while (iterator.hasNext) {
-      if (elementCount != Long.MaxValue) { // prevent overflow
-        elementCount += 1L
-      }
-      iterator.next()
-    }
-    out.collect(partitionIndex, elementCount)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala
deleted file mode 100644
index 2e57a0f..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime
-
-import org.apache.flink.api.common.functions.{FlatJoinFunction, RichFlatJoinFunction}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.api.table.codegen.Compiler
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.util.Collector
-import org.slf4j.LoggerFactory
-
-class FlatJoinRunner[IN1, IN2, OUT](
-    name: String,
-    code: String,
-    @transient returnType: TypeInformation[OUT])
-  extends RichFlatJoinFunction[IN1, IN2, OUT]
-  with ResultTypeQueryable[OUT]
-  with Compiler[FlatJoinFunction[IN1, IN2, OUT]] {
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
-
-  private var function: FlatJoinFunction[IN1, IN2, OUT] = null
-
-  override def open(parameters: Configuration): Unit = {
-    LOG.debug(s"Compiling FlatJoinFunction: $name \n\n Code:\n$code")
-    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
-    LOG.debug("Instantiating FlatJoinFunction.")
-    function = clazz.newInstance()
-  }
-
-  override def join(first: IN1, second: IN2, out: Collector[OUT]): Unit =
-    function.join(first, second, out)
-
-  override def getProducedType: TypeInformation[OUT] = returnType
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala
deleted file mode 100644
index e228e2b..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime
-
-import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.api.table.codegen.Compiler
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.util.Collector
-import org.slf4j.LoggerFactory
-
-class FlatMapRunner[IN, OUT](
-    name: String,
-    code: String,
-    @transient returnType: TypeInformation[OUT])
-  extends RichFlatMapFunction[IN, OUT]
-  with ResultTypeQueryable[OUT]
-  with Compiler[FlatMapFunction[IN, OUT]] {
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
-
-  private var function: FlatMapFunction[IN, OUT] = _
-
-  override def open(parameters: Configuration): Unit = {
-    LOG.debug(s"Compiling FlatMapFunction: $name \n\n Code:\n$code")
-    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
-    LOG.debug("Instantiating FlatMapFunction.")
-    function = clazz.newInstance()
-  }
-
-  override def flatMap(in: IN, out: Collector[OUT]): Unit =
-    function.flatMap(in, out)
-
-  override def getProducedType: TypeInformation[OUT] = returnType
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/IntersectCoGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/IntersectCoGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/IntersectCoGroupFunction.scala
deleted file mode 100644
index 9930811..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/IntersectCoGroupFunction.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime
-
-import java.lang.{Iterable => JIterable}
-
-import org.apache.flink.api.common.functions.CoGroupFunction
-import org.apache.flink.util.Collector
-
-class IntersectCoGroupFunction[T](all: Boolean) extends CoGroupFunction[T, T, T]{
-  override def coGroup(first: JIterable[T], second: JIterable[T], out: Collector[T]): Unit = {
-    if (first == null || second == null) return
-    val leftIter = first.iterator()
-    val rightIter = second.iterator()
-    if (all) {
-      while (leftIter.hasNext && rightIter.hasNext) {
-        out.collect(leftIter.next)
-        rightIter.next
-      }
-    } else {
-      if (leftIter.hasNext && rightIter.hasNext) {
-        out.collect(leftIter.next)
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala
deleted file mode 100644
index 5ec9035..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime
-
-import org.apache.flink.api.common.functions.RichFilterFunction
-import org.apache.flink.configuration.Configuration
-
-import scala.collection.JavaConverters._
-
-
-class LimitFilterFunction[T](
-    limitStart: Long,
-    limitEnd: Long,
-    broadcastName: String)
-  extends RichFilterFunction[T] {
-
-  var partitionIndex: Int = _
-  var elementCount: Long = _
-  var countList: Array[Long] = _
-
-  override def open(config: Configuration) {
-    partitionIndex = getRuntimeContext.getIndexOfThisSubtask
-
-    val countPartitionResult = getRuntimeContext
-      .getBroadcastVariable[(Int, Long)](broadcastName)
-      .asScala
-
-    // sort by partition index, extract number per partition, sum with intermediate results
-    countList = countPartitionResult.sortWith(_._1 < _._1).map(_._2).scanLeft(0L) { case (a, b) =>
-        val sum = a + b
-        if (sum < 0L) { // prevent overflow
-          Long.MaxValue
-        }
-        sum
-    }.toArray
-
-    elementCount = 0
-  }
-
-  override def filter(value: T): Boolean = {
-    if (elementCount != Long.MaxValue) { // prevent overflow
-      elementCount += 1L
-    }
-    // we filter out records that are not within the limit (Long.MaxValue is unlimited)
-    limitStart - countList(partitionIndex) < elementCount &&
-      (limitEnd == Long.MaxValue || limitEnd - countList(partitionIndex) >= elementCount)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapJoinLeftRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapJoinLeftRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapJoinLeftRunner.scala
deleted file mode 100644
index 76650c2..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapJoinLeftRunner.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.util.Collector
-
-class MapJoinLeftRunner[IN1, IN2, OUT](
-    name: String,
-    code: String,
-    @transient returnType: TypeInformation[OUT],
-    broadcastSetName: String)
-  extends MapSideJoinRunner[IN1, IN2, IN2, IN1, OUT](name, code, returnType, broadcastSetName) {
-
-  override def flatMap(multiInput: IN1, out: Collector[OUT]): Unit =
-    function.join(multiInput, singleInput, out)
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapJoinRightRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapJoinRightRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapJoinRightRunner.scala
deleted file mode 100644
index 52b01cf..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapJoinRightRunner.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.util.Collector
-
-class MapJoinRightRunner[IN1, IN2, OUT](
-    name: String,
-    code: String,
-    @transient returnType: TypeInformation[OUT],
-    broadcastSetName: String)
-  extends MapSideJoinRunner[IN1, IN2, IN1, IN2, OUT](name, code, returnType, broadcastSetName) {
-
-  override def flatMap(multiInput: IN2, out: Collector[OUT]): Unit =
-    function.join(singleInput, multiInput, out)
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala
deleted file mode 100644
index 9fd1876..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime
-
-import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.api.table.codegen.Compiler
-import org.apache.flink.configuration.Configuration
-import org.slf4j.LoggerFactory
-
-class MapRunner[IN, OUT](
-    name: String,
-    code: String,
-    @transient returnType: TypeInformation[OUT])
-  extends RichMapFunction[IN, OUT]
-  with ResultTypeQueryable[OUT]
-  with Compiler[MapFunction[IN, OUT]] {
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
-
-  private var function: MapFunction[IN, OUT] = null
-
-  override def open(parameters: Configuration): Unit = {
-    LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
-    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
-    LOG.debug("Instantiating MapFunction.")
-    function = clazz.newInstance()
-  }
-
-  override def map(in: IN): OUT =
-    function.map(in)
-
-  override def getProducedType: TypeInformation[OUT] = returnType
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapSideJoinRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapSideJoinRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapSideJoinRunner.scala
deleted file mode 100644
index b355d49..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapSideJoinRunner.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime
-
-import org.apache.flink.api.common.functions.{FlatJoinFunction, RichFlatMapFunction}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.api.table.codegen.Compiler
-import org.apache.flink.configuration.Configuration
-import org.slf4j.LoggerFactory
-
-abstract class MapSideJoinRunner[IN1, IN2, SINGLE_IN, MULTI_IN, OUT](
-    name: String,
-    code: String,
-    @transient returnType: TypeInformation[OUT],
-    broadcastSetName: String)
-  extends RichFlatMapFunction[MULTI_IN, OUT]
-    with ResultTypeQueryable[OUT]
-    with Compiler[FlatJoinFunction[IN1, IN2, OUT]] {
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
-
-  protected var function: FlatJoinFunction[IN1, IN2, OUT] = _
-  protected var singleInput: SINGLE_IN = _
-
-  override def open(parameters: Configuration): Unit = {
-    LOG.debug(s"Compiling FlatJoinFunction: $name \n\n Code:\n$code")
-    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
-    LOG.debug("Instantiating FlatJoinFunction.")
-    function = clazz.newInstance()
-    singleInput = getRuntimeContext.getBroadcastVariable(broadcastSetName).get(0)
-  }
-
-  override def getProducedType: TypeInformation[OUT] = returnType
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MinusCoGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MinusCoGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MinusCoGroupFunction.scala
deleted file mode 100644
index cac4fe6..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MinusCoGroupFunction.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime
-
-import java.lang.Iterable
-
-import org.apache.flink.api.common.functions.CoGroupFunction
-import org.apache.flink.util.Collector
-
-class MinusCoGroupFunction[T](all: Boolean) extends CoGroupFunction[T, T, T] {
-  override def coGroup(first: Iterable[T], second: Iterable[T], out: Collector[T]): Unit = {
-    if (first == null || second == null) return
-    val leftIter = first.iterator
-    val rightIter = second.iterator
-
-    if (all) {
-      while (rightIter.hasNext && leftIter.hasNext) {
-        leftIter.next()
-        rightIter.next()
-      }
-
-      while (leftIter.hasNext) {
-        out.collect(leftIter.next())
-      }
-    } else {
-      if (!rightIter.hasNext && leftIter.hasNext) {
-        out.collect(leftIter.next())
-      }
-    }
-  }
-}
-