Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:46:47 UTC

[18/47] flink git commit: [FLINK-4704] [table] Refactor package structure of flink-table.

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSortRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSortRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSortRule.scala
new file mode 100644
index 0000000..14b9459
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSortRule.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, DataSetSort}
+
+class DataSetSortRule
+  extends ConverterRule(
+    classOf[LogicalSort],
+    Convention.NONE,
+    DataSetConvention.INSTANCE,
+    "DataSetSortRule") {
+
+  override def convert(rel: RelNode): RelNode = {
+
+    val sort: LogicalSort = rel.asInstanceOf[LogicalSort]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+    val convInput: RelNode = RelOptRule.convert(sort.getInput, DataSetConvention.INSTANCE)
+
+    new DataSetSort(
+      rel.getCluster,
+      traitSet,
+      convInput,
+      sort.getCollation,
+      rel.getRowType,
+      sort.offset,
+      sort.fetch
+    )
+  }
+}
+
+object DataSetSortRule {
+  val INSTANCE: RelOptRule = new DataSetSortRule
+}
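
For context: a converter rule like this only takes effect once its INSTANCE is registered with the planner. A minimal sketch of how such singletons are typically bundled with Calcite's RuleSets utility (the bundle object below is illustrative; in flink-table the actual grouping lives in FlinkRuleSets):

    import org.apache.calcite.tools.{RuleSet, RuleSets}

    // Illustrative bundle of the converter rules from this commit.
    object DataSetRuleBundle {
      val rules: RuleSet = RuleSets.ofList(
        DataSetSortRule.INSTANCE,
        DataSetUnionRule.INSTANCE,
        DataSetValuesRule.INSTANCE)
    }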

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetUnionRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetUnionRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetUnionRule.scala
new file mode 100644
index 0000000..e24f477
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetUnionRule.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalUnion
+import org.apache.calcite.rel.rules.UnionToDistinctRule
+import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, DataSetUnion}
+
+class DataSetUnionRule
+  extends ConverterRule(
+      classOf[LogicalUnion],
+      Convention.NONE,
+      DataSetConvention.INSTANCE,
+      "DataSetUnionRule")
+  {
+
+  /**
+   * Only translate UNION ALL.
+   * Note: a distinct UNION is translated into an Aggregate
+   * on top of a UNION ALL by [[UnionToDistinctRule]].
+   */
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val union: LogicalUnion = call.rel(0).asInstanceOf[LogicalUnion]
+    union.all
+  }
+
+  def convert(rel: RelNode): RelNode = {
+
+    val union: LogicalUnion = rel.asInstanceOf[LogicalUnion]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+    val convLeft: RelNode = RelOptRule.convert(union.getInput(0), DataSetConvention.INSTANCE)
+    val convRight: RelNode = RelOptRule.convert(union.getInput(1), DataSetConvention.INSTANCE)
+
+    new DataSetUnion(
+      rel.getCluster,
+      traitSet,
+      convLeft,
+      convRight,
+      rel.getRowType)
+  }
+}
+
+object DataSetUnionRule {
+  val INSTANCE: RelOptRule = new DataSetUnionRule
+}
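
For example, SELECT * FROM t1 UNION ALL SELECT * FROM t2 yields a LogicalUnion with all = true and is converted by this rule directly. A plain (distinct) UNION is first rewritten by Calcite's UnionToDistinctRule into an Aggregate on top of a UNION ALL; the UNION ALL part then matches this rule, while the Aggregate is handled by the aggregate rules.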

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetValuesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetValuesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetValuesRule.scala
new file mode 100644
index 0000000..8ecdc74
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetValuesRule.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{RelOptRule, RelTraitSet, Convention}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalValues
+import org.apache.flink.table.plan.nodes.dataset.{DataSetValues, DataSetConvention}
+
+class DataSetValuesRule
+  extends ConverterRule(
+    classOf[LogicalValues],
+    Convention.NONE,
+    DataSetConvention.INSTANCE,
+    "DataSetValuesRule")
+{
+
+  def convert(rel: RelNode): RelNode = {
+
+    val values: LogicalValues = rel.asInstanceOf[LogicalValues]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+
+    new DataSetValues(
+      rel.getCluster,
+      traitSet,
+      rel.getRowType,
+      values.getTuples,
+      description)
+  }
+}
+
+object DataSetValuesRule {
+  val INSTANCE: RelOptRule = new DataSetValuesRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
new file mode 100644
index 0000000..5d91c62
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.flink.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc}
+import org.apache.flink.table.plan.rules.util.RexProgramProjectExtractor._
+import org.apache.flink.table.sources.{BatchTableSource, ProjectableTableSource}
+
+/**
+  * This rule tries to push projections into a BatchTableSourceScan.
+  */
+class PushProjectIntoBatchTableSourceScanRule extends RelOptRule(
+  operand(classOf[DataSetCalc],
+          operand(classOf[BatchTableSourceScan], none)),
+  "PushProjectIntoBatchTableSourceScanRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
+    scan.tableSource match {
+      case _: ProjectableTableSource[_] => true
+      case _ => false
+    }
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
+    val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
+
+    val usedFields: Array[Int] = extractRefInputFields(calc.calcProgram)
+
+    // if no fields can be projected away, there is no need to transform the subtree
+    if (scan.tableSource.getNumberOfFields != usedFields.length) {
+      val originTableSource = scan.tableSource.asInstanceOf[ProjectableTableSource[_]]
+      val newTableSource = originTableSource.projectFields(usedFields)
+      val newScan = new BatchTableSourceScan(
+        scan.getCluster,
+        scan.getTraitSet,
+        scan.getTable,
+        newTableSource.asInstanceOf[BatchTableSource[_]])
+
+      val newCalcProgram = rewriteRexProgram(
+        calc.calcProgram,
+        newScan.getRowType,
+        usedFields,
+        calc.getCluster.getRexBuilder)
+
+      // if the projection merely forwards its input and no filter exists, remove the DataSetCalc node
+      if (newCalcProgram.isTrivial) {
+        call.transformTo(newScan)
+      } else {
+        val newCalc = new DataSetCalc(
+          calc.getCluster,
+          calc.getTraitSet,
+          newScan,
+          calc.getRowType,
+          newCalcProgram,
+          description)
+        call.transformTo(newCalc)
+      }
+    }
+  }
+}
+
+object PushProjectIntoBatchTableSourceScanRule {
+  val INSTANCE: RelOptRule = new PushProjectIntoBatchTableSourceScanRule
+}
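
As a worked example: for a projectable source with fields (a, b, c, d) and a Calc program equivalent to SELECT c WHERE a > 10, extractRefInputFields returns [0, 2]. Since 2 != 4, the rule asks the source to projectFields(Array(0, 2)), builds a new scan over the two remaining columns, and rewrites the program against them, so a is referenced as index 0 and c as index 1. If the rewritten program is trivial, i.e. it merely forwards the scan without a condition, the new scan alone replaces the Calc.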

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
new file mode 100644
index 0000000..bf8a18e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.expressions.Alias
+import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
+import org.apache.flink.table.plan.nodes.datastream.{DataStreamAggregate, DataStreamConvention}
+
+import scala.collection.JavaConversions._
+
+class DataStreamAggregateRule
+  extends ConverterRule(
+      classOf[LogicalWindowAggregate],
+      Convention.NONE,
+      DataStreamConvention.INSTANCE,
+      "DataStreamAggregateRule")
+  {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val agg: LogicalWindowAggregate = call.rel(0).asInstanceOf[LogicalWindowAggregate]
+
+    // check if we have distinct aggregates
+    val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
+    if (distinctAggs) {
+      throw TableException("DISTINCT aggregates are currently not supported.")
+    }
+
+    // check if we have grouping sets
+    val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet
+    if (groupSets || agg.indicator) {
+      throw TableException("GROUPING SETS are currently not supported.")
+    }
+
+    !distinctAggs && !groupSets && !agg.indicator
+  }
+
+  override def convert(rel: RelNode): RelNode = {
+    val agg: LogicalWindowAggregate = rel.asInstanceOf[LogicalWindowAggregate]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+    val convInput: RelNode = RelOptRule.convert(agg.getInput, DataStreamConvention.INSTANCE)
+
+    new DataStreamAggregate(
+      agg.getWindow,
+      agg.getNamedProperties,
+      rel.getCluster,
+      traitSet,
+      convInput,
+      agg.getNamedAggCalls,
+      rel.getRowType,
+      agg.getInput.getRowType,
+      agg.getGroupSet.toArray)
+  }
+}
+
+object DataStreamAggregateRule {
+  val INSTANCE: RelOptRule = new DataStreamAggregateRule
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala
new file mode 100644
index 0000000..4e620c9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalCalc
+import org.apache.flink.table.plan.nodes.datastream.DataStreamCalc
+import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention
+
+class DataStreamCalcRule
+  extends ConverterRule(
+    classOf[LogicalCalc],
+    Convention.NONE,
+    DataStreamConvention.INSTANCE,
+    "DataStreamCalcRule")
+{
+
+  def convert(rel: RelNode): RelNode = {
+    val calc: LogicalCalc = rel.asInstanceOf[LogicalCalc]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+    val convInput: RelNode = RelOptRule.convert(calc.getInput, DataStreamConvention.INSTANCE)
+
+    new DataStreamCalc(
+      rel.getCluster,
+      traitSet,
+      convInput,
+      rel.getRowType,
+      calc.getProgram,
+      description)
+  }
+}
+
+object DataStreamCalcRule {
+  val INSTANCE: RelOptRule = new DataStreamCalcRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala
new file mode 100644
index 0000000..adce9f4
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.{LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan}
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention
+import org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate
+
+/**
+  * Rule to convert a LogicalCorrelate into a DataStreamCorrelate.
+  */
+class DataStreamCorrelateRule
+  extends ConverterRule(
+    classOf[LogicalCorrelate],
+    Convention.NONE,
+    DataStreamConvention.INSTANCE,
+    "DataStreamCorrelateRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val join: LogicalCorrelate = call.rel(0).asInstanceOf[LogicalCorrelate]
+    val right = join.getRight.asInstanceOf[RelSubset].getOriginal
+
+    right match {
+      // right node is a table function
+      case _: LogicalTableFunctionScan => true
+      // a filter is pushed above the table function
+      case filter: LogicalFilter =>
+        filter
+          .getInput.asInstanceOf[RelSubset]
+          .getOriginal
+          .isInstanceOf[LogicalTableFunctionScan]
+      case _ => false
+    }
+  }
+
+  override def convert(rel: RelNode): RelNode = {
+    val join: LogicalCorrelate = rel.asInstanceOf[LogicalCorrelate]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+    val convInput: RelNode = RelOptRule.convert(join.getInput(0), DataStreamConvention.INSTANCE)
+    val right: RelNode = join.getInput(1)
+
+    def convertToCorrelate(relNode: RelNode, condition: Option[RexNode]): DataStreamCorrelate = {
+      relNode match {
+        case rel: RelSubset =>
+          convertToCorrelate(rel.getRelList.get(0), condition)
+
+        case filter: LogicalFilter =>
+          convertToCorrelate(
+            filter.getInput.asInstanceOf[RelSubset].getOriginal,
+            Some(filter.getCondition))
+
+        case scan: LogicalTableFunctionScan =>
+          new DataStreamCorrelate(
+            rel.getCluster,
+            traitSet,
+            convInput,
+            scan,
+            condition,
+            rel.getRowType,
+            join.getRowType,
+            join.getJoinType,
+            description)
+      }
+    }
+    convertToCorrelate(right, None)
+  }
+
+}
+
+object DataStreamCorrelateRule {
+  val INSTANCE: RelOptRule = new DataStreamCorrelateRule
+}
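
To illustrate the shapes this rule handles: for a query along the lines of SELECT * FROM t, LATERAL TABLE(split(c)) AS T(word) WHERE word LIKE '%flink%', the right input of the LogicalCorrelate is a LogicalFilter on top of the LogicalTableFunctionScan. convertToCorrelate recurses through the RelSubset and the filter and carries the filter condition into the resulting DataStreamCorrelate; without a predicate, the scan is reached directly and the condition stays None.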

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamScanRule.scala
new file mode 100644
index 0000000..91fd6e2
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamScanRule.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention
+import org.apache.flink.table.plan.nodes.datastream.DataStreamScan
+import org.apache.flink.table.plan.schema.DataStreamTable
+
+class DataStreamScanRule
+  extends ConverterRule(
+    classOf[LogicalTableScan],
+    Convention.NONE,
+    DataStreamConvention.INSTANCE,
+    "DataStreamScanRule")
+{
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val scan: TableScan = call.rel(0).asInstanceOf[TableScan]
+    val dataStreamTable = scan.getTable.unwrap(classOf[DataStreamTable[Any]])
+    dataStreamTable match {
+      case _: DataStreamTable[Any] =>
+        true
+      case _ =>
+        false
+    }
+  }
+
+  def convert(rel: RelNode): RelNode = {
+    val scan: LogicalTableScan = rel.asInstanceOf[LogicalTableScan]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+
+    new DataStreamScan(
+      rel.getCluster,
+      traitSet,
+      scan.getTable,
+      rel.getRowType
+    )
+  }
+}
+
+object DataStreamScanRule {
+  val INSTANCE: RelOptRule = new DataStreamScanRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala
new file mode 100644
index 0000000..475c050
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalUnion
+import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention
+import org.apache.flink.table.plan.nodes.datastream.DataStreamUnion
+
+class DataStreamUnionRule
+  extends ConverterRule(
+    classOf[LogicalUnion],
+    Convention.NONE,
+    DataStreamConvention.INSTANCE,
+    "DataStreamUnionRule")
+{
+
+  def convert(rel: RelNode): RelNode = {
+    val union: LogicalUnion = rel.asInstanceOf[LogicalUnion]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+    val convLeft: RelNode = RelOptRule.convert(union.getInput(0), DataStreamConvention.INSTANCE)
+    val convRight: RelNode = RelOptRule.convert(union.getInput(1), DataStreamConvention.INSTANCE)
+
+    new DataStreamUnion(
+      rel.getCluster,
+      traitSet,
+      convLeft,
+      convRight,
+      rel.getRowType)
+  }
+}
+
+object DataStreamUnionRule {
+  val INSTANCE: RelOptRule = new DataStreamUnionRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamValuesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamValuesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamValuesRule.scala
new file mode 100644
index 0000000..db33842
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamValuesRule.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalValues
+import org.apache.flink.table.plan.nodes.datastream.{DataStreamValues, DataStreamConvention}
+
+class DataStreamValuesRule
+  extends ConverterRule(
+    classOf[LogicalValues],
+    Convention.NONE,
+    DataStreamConvention.INSTANCE,
+    "DataStreamValuesRule")
+{
+
+  def convert(rel: RelNode): RelNode = {
+
+    val values: LogicalValues = rel.asInstanceOf[LogicalValues]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+
+    new DataStreamValues(
+      rel.getCluster,
+      traitSet,
+      rel.getRowType,
+      values.getTuples,
+      description)
+  }
+}
+
+object DataStreamValuesRule {
+  val INSTANCE: RelOptRule = new DataStreamValuesRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
new file mode 100644
index 0000000..296c86b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.flink.table.plan.nodes.datastream.{DataStreamConvention, StreamTableSourceScan}
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.sources.StreamTableSource
+
+/** Rule to convert a [[LogicalTableScan]] into a [[StreamTableSourceScan]]. */
+class StreamTableSourceScanRule
+  extends ConverterRule(
+    classOf[LogicalTableScan],
+    Convention.NONE,
+    DataStreamConvention.INSTANCE,
+    "StreamTableSourceScanRule")
+{
+
+  /** The rule matches only if the TableScan targets a [[StreamTableSource]]. */
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val scan: TableScan = call.rel(0).asInstanceOf[TableScan]
+    val tableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable])
+    tableSourceTable match {
+      case tst: TableSourceTable =>
+        tst.tableSource match {
+          case _: StreamTableSource[_] =>
+            true
+          case _ =>
+            false
+        }
+      case _ =>
+        false
+    }
+  }
+
+  def convert(rel: RelNode): RelNode = {
+    val scan: LogicalTableScan = rel.asInstanceOf[LogicalTableScan]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+
+    // The original registered table source
+    val table: TableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable])
+    val tableSource: StreamTableSource[_] = table.tableSource.asInstanceOf[StreamTableSource[_]]
+
+    new StreamTableSourceScan(
+      rel.getCluster,
+      traitSet,
+      scan.getTable,
+      tableSource
+    )
+  }
+}
+
+object StreamTableSourceScanRule {
+  val INSTANCE: RelOptRule = new StreamTableSourceScanRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/util/RexProgramProjectExtractor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/util/RexProgramProjectExtractor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/util/RexProgramProjectExtractor.scala
new file mode 100644
index 0000000..129cfd1
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/util/RexProgramProjectExtractor.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.util
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex._
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+
+object RexProgramProjectExtractor {
+
+  /**
+    * Extracts the indexes of input fields accessed by the RexProgram.
+    *
+    * @param rexProgram The RexProgram to analyze
+    * @return The indexes of accessed input fields
+    */
+  def extractRefInputFields(rexProgram: RexProgram): Array[Int] = {
+    val visitor = new RefFieldsVisitor
+    // extract input fields from project expressions
+    rexProgram.getProjectList.foreach(exp => rexProgram.expandLocalRef(exp).accept(visitor))
+    val condition = rexProgram.getCondition
+    // extract input fields from condition expression
+    if (condition != null) {
+      rexProgram.expandLocalRef(condition).accept(visitor)
+    }
+    visitor.getFields
+  }
+
+  /**
+    * Generates a new RexProgram based on mapped input fields.
+    *
+    * @param rexProgram      original RexProgram
+    * @param inputRowType    input row type
+    * @param usedInputFields indexes of used input fields
+    * @param rexBuilder      builder for Rex expressions
+    *
+    * @return A RexProgram with mapped input field expressions.
+    */
+  def rewriteRexProgram(
+      rexProgram: RexProgram,
+      inputRowType: RelDataType,
+      usedInputFields: Array[Int],
+      rexBuilder: RexBuilder): RexProgram = {
+
+    val inputRewriter = new InputRewriter(usedInputFields)
+    val newProjectExpressions = rexProgram.getProjectList.map(
+      exp => rexProgram.expandLocalRef(exp).accept(inputRewriter)
+    ).toList.asJava
+
+    val oldCondition = rexProgram.getCondition
+    val newConditionExpression = {
+      oldCondition match {
+        case ref: RexLocalRef => rexProgram.expandLocalRef(ref).accept(inputRewriter)
+        case _ => null // a null condition does not match the typed pattern above
+      }
+    }
+    RexProgram.create(
+      inputRowType,
+      newProjectExpressions,
+      newConditionExpression,
+      rexProgram.getOutputRowType,
+      rexBuilder
+    )
+  }
+}
+
+/**
+  * A RexVisitor to extract used input fields
+  */
+class RefFieldsVisitor extends RexVisitorImpl[Unit](true) {
+  private val fields = mutable.LinkedHashSet[Int]()
+
+  def getFields: Array[Int] = fields.toArray
+
+  override def visitInputRef(inputRef: RexInputRef): Unit = fields += inputRef.getIndex
+
+  override def visitCall(call: RexCall): Unit =
+    call.operands.foreach(operand => operand.accept(this))
+}
+
+/**
+  * A RexShuttle to rewrite field accesses of a RexProgram.
+  *
+  * @param fields fields mapping
+  */
+class InputRewriter(fields: Array[Int]) extends RexShuttle {
+
+  /** Mapping from old input field indexes to new input field indexes. */
+  private val fieldMap: Map[Int, Int] =
+    fields.zipWithIndex.toMap
+
+  override def visitInputRef(inputRef: RexInputRef): RexNode =
+    new RexInputRef(relNodeIndex(inputRef), inputRef.getType)
+
+  override def visitLocalRef(localRef: RexLocalRef): RexNode =
+    new RexInputRef(relNodeIndex(localRef), localRef.getType)
+
+  private def relNodeIndex(ref: RexSlot): Int =
+    fieldMap.getOrElse(ref.getIndex,
+      throw new IllegalArgumentException("input field contains invalid index"))
+}
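
The index remapping performed by InputRewriter can be illustrated without any Calcite types. A self-contained sketch in plain Scala (the field values are hypothetical):

    // usedFields as extractRefInputFields would produce it for a query
    // touching fields 0 and 2 of an input with fields (a, b, c, d)
    object InputRewriteExample extends App {
      val usedFields = Array(0, 2)
      // old input field index -> new input field index, as in InputRewriter
      val fieldMap: Map[Int, Int] = usedFields.zipWithIndex.toMap
      assert(fieldMap(0) == 0) // field a keeps its position
      assert(fieldMap(2) == 1) // field c moves up after b is projected away
      assert(!fieldMap.contains(1)) // a reference to b would be invalid now
    }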

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/ArrayRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/ArrayRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/ArrayRelDataType.scala
new file mode 100644
index 0000000..f7d9e1d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/ArrayRelDataType.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.sql.`type`.ArraySqlType
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+/**
+  * Flink distinguishes between primitive arrays (int[], double[], ...) and
+  * object arrays (Integer[], MyPojo[], ...). This custom type supports both cases.
+  */
+class ArrayRelDataType(
+    val typeInfo: TypeInformation[_],
+    elementType: RelDataType,
+    isNullable: Boolean)
+  extends ArraySqlType(
+    elementType,
+    isNullable) {
+
+  override def toString = s"ARRAY($typeInfo)"
+
+  def canEqual(other: Any): Boolean = other.isInstanceOf[ArrayRelDataType]
+
+  override def equals(other: Any): Boolean = other match {
+    case that: ArrayRelDataType =>
+      super.equals(that) &&
+        (that canEqual this) &&
+        typeInfo == that.typeInfo
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    typeInfo.hashCode()
+  }
+
+}
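
Both array flavors mentioned in the class comment already have TypeInformation counterparts in flink-core; a short sketch of the two cases this type can wrap:

    import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, PrimitiveArrayTypeInfo}

    object ArrayTypeInfoExamples {
      // int[] -- a primitive array
      val primitiveInts = PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO
      // Integer[] -- an object array
      val boxedInts = BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO
    }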

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
new file mode 100644
index 0000000..92f9199
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import java.util
+
+import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFieldImpl, RelRecordType}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.schema.CompositeRelDataType.createFieldList
+
+import scala.collection.JavaConverters._
+
+/**
+  * Composite type for encapsulating Flink's [[CompositeType]].
+  *
+  * @param compositeType CompositeType to encapsulate
+  * @param typeFactory Flink's type factory
+  */
+class CompositeRelDataType(
+    val compositeType: CompositeType[_],
+    typeFactory: FlinkTypeFactory)
+  extends RelRecordType(createFieldList(compositeType, typeFactory)) {
+
+  override def toString = s"COMPOSITE($compositeType)"
+
+  def canEqual(other: Any): Boolean = other.isInstanceOf[CompositeRelDataType]
+
+  override def equals(other: Any): Boolean = other match {
+    case that: CompositeRelDataType =>
+      super.equals(that) &&
+        (that canEqual this) &&
+        compositeType == that.compositeType
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    compositeType.hashCode()
+  }
+
+}
+
+object CompositeRelDataType {
+
+  /**
+    * Converts the fields of a composite type to list of [[RelDataTypeField]].
+    */
+  private def createFieldList(
+      compositeType: CompositeType[_],
+      typeFactory: FlinkTypeFactory)
+    : util.List[RelDataTypeField] = {
+
+    compositeType
+      .getFieldNames
+      .zipWithIndex
+      .map { case (name, index) =>
+        new RelDataTypeFieldImpl(
+          name,
+          index,
+          typeFactory.createTypeFromTypeInfo(compositeType.getTypeAt(index)))
+            .asInstanceOf[RelDataTypeField]
+      }
+      .toList
+      .asJava
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataSetTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataSetTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataSetTable.scala
new file mode 100644
index 0000000..f8c6835
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataSetTable.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import java.lang.Double
+import java.util
+import java.util.Collections
+
+import org.apache.calcite.rel.{RelCollation, RelDistribution}
+import org.apache.calcite.schema.Statistic
+import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.api.java.DataSet
+
+class DataSetTable[T](
+    val dataSet: DataSet[T],
+    override val fieldIndexes: Array[Int],
+    override val fieldNames: Array[String])
+  extends FlinkTable[T](dataSet.getType, fieldIndexes, fieldNames) {
+
+  override def getStatistic: Statistic = {
+    new DefaultDataSetStatistic
+  }
+
+}
+
+class DefaultDataSetStatistic extends Statistic {
+
+  override def getRowCount: Double = 1000d
+
+  override def getCollations: util.List[RelCollation] = Collections.emptyList()
+
+  override def isKey(columns: ImmutableBitSet): Boolean = false
+
+  override def getDistribution: RelDistribution = null
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala
new file mode 100644
index 0000000..0355fac
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.flink.streaming.api.datastream.DataStream
+
+class DataStreamTable[T](
+    val dataStream: DataStream[T],
+    override val fieldIndexes: Array[Int],
+    override val fieldNames: Array[String])
+  extends FlinkTable[T](dataStream.getType, fieldIndexes, fieldNames) {
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
new file mode 100644
index 0000000..8bb5c81
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.schema.impl.AbstractTable
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.calcite.FlinkTypeFactory
+
+abstract class FlinkTable[T](
+    val typeInfo: TypeInformation[T],
+    val fieldIndexes: Array[Int],
+    val fieldNames: Array[String])
+  extends AbstractTable {
+
+  if (fieldIndexes.length != fieldNames.length) {
+    throw new TableException(
+      "Number of field indexes and field names must be equal.")
+  }
+
+  // check uniqueness of field names
+  if (fieldNames.length != fieldNames.toSet.size) {
+    throw new TableException(
+      "Table field names must be unique.")
+  }
+
+  val fieldTypes: Array[TypeInformation[_]] =
+    typeInfo match {
+      case cType: CompositeType[T] =>
+        if (fieldNames.length != cType.getArity) {
+          throw new TableException(
+          s"Arity of type (" + cType.getFieldNames.deep + ") " +
+            "not equal to number of field names " + fieldNames.deep + ".")
+        }
+        fieldIndexes.map(cType.getTypeAt(_).asInstanceOf[TypeInformation[_]])
+      case aType: AtomicType[T] =>
+        if (fieldIndexes.length != 1 || fieldIndexes(0) != 0) {
+          throw new TableException(
+            "Non-composite input type may have only a single field and its index must be 0.")
+        }
+        Array(aType)
+    }
+
+  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
+    val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
+    flinkTypeFactory.buildRowDataType(fieldNames, fieldTypes)
+  }
+
+}
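
For example, a DataSet[String] has an AtomicType and can therefore only be registered as a table with a single field mapped to index 0, whereas a DataSet of a case class or of Row has a CompositeType and exposes one field per attribute, validated against the arity check above.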

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
new file mode 100644
index 0000000..1c05883
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.schema
+
+import java.lang.reflect.{Method, Type}
+import java.util
+
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.schema.TableFunction
+import org.apache.calcite.schema.impl.ReflectiveFunctionBase
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.calcite.FlinkTypeFactory
+
+/**
+  * This is heavily inspired by Calcite's [[org.apache.calcite.schema.impl.TableFunctionImpl]].
+  * We need it in order to create a [[org.apache.flink.table.functions.utils.TableSqlFunction]].
+  * The main difference is that we override [[getRowType()]] and [[getElementType()]].
+  */
+class FlinkTableFunctionImpl[T](
+    val typeInfo: TypeInformation[T],
+    val fieldIndexes: Array[Int],
+    val fieldNames: Array[String],
+    val evalMethod: Method)
+  extends ReflectiveFunctionBase(evalMethod)
+  with TableFunction {
+
+  if (fieldIndexes.length != fieldNames.length) {
+    throw new TableException(
+      "Number of field indexes and field names must be equal.")
+  }
+
+  // check uniqueness of field names
+  if (fieldNames.length != fieldNames.toSet.size) {
+    throw new TableException(
+      "Table field names must be unique.")
+  }
+
+  val fieldTypes: Array[TypeInformation[_]] =
+    typeInfo match {
+      case cType: CompositeType[T] =>
+        if (fieldNames.length != cType.getArity) {
+          throw new TableException(
+            s"Arity of type (" + cType.getFieldNames.deep + ") " +
+              "not equal to number of field names " + fieldNames.deep + ".")
+        }
+        fieldIndexes.map(cType.getTypeAt(_).asInstanceOf[TypeInformation[_]])
+      case aType: AtomicType[T] =>
+        if (fieldIndexes.length != 1 || fieldIndexes(0) != 0) {
+          throw new TableException(
+            "Non-composite input type may have only a single field and its index must be 0.")
+        }
+        Array(aType)
+    }
+
+  override def getElementType(arguments: util.List[AnyRef]): Type = classOf[Array[Object]]
+
+  override def getRowType(typeFactory: RelDataTypeFactory,
+                          arguments: util.List[AnyRef]): RelDataType = {
+    val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
+    val builder = flinkTypeFactory.builder
+    fieldNames
+      .zip(fieldTypes)
+      .foreach { case (fieldName, fieldType) =>
+        builder.add(fieldName, flinkTypeFactory.createTypeFromTypeInfo(fieldType)).nullable(true)
+      }
+    builder.build
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/GenericRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/GenericRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/GenericRelDataType.scala
new file mode 100644
index 0000000..d93908b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/GenericRelDataType.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.sql.`type`.{BasicSqlType, SqlTypeName}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.calcite.FlinkTypeSystem
+
+/**
+  * Generic type for encapsulating Flink's [[TypeInformation]].
+  *
+  * @param typeInfo TypeInformation to encapsulate
+  * @param typeSystem Flink's type system
+  */
+class GenericRelDataType(
+    val typeInfo: TypeInformation[_],
+    typeSystem: FlinkTypeSystem)
+  extends BasicSqlType(
+    typeSystem,
+    SqlTypeName.ANY) {
+
+  override def toString = s"ANY($typeInfo)"
+
+  def canEqual(other: Any): Boolean = other.isInstanceOf[GenericRelDataType]
+
+  override def equals(other: Any): Boolean = other match {
+    case that: GenericRelDataType =>
+      super.equals(that) &&
+        (that canEqual this) &&
+        typeInfo == that.typeInfo
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    typeInfo.hashCode()
+  }
+}
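
A hedged equality sketch (assuming FlinkTypeSystem can be instantiated
directly in this context): two wrappers around the same TypeInformation
compare equal, which lets the planner treat them as the same SQL type.

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo

    val typeSystem = new FlinkTypeSystem
    val a = new GenericRelDataType(BasicTypeInfo.STRING_TYPE_INFO, typeSystem)
    val b = new GenericRelDataType(BasicTypeInfo.STRING_TYPE_INFO, typeSystem)
    assert(a == b && a.hashCode == b.hashCode)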

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RelTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RelTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RelTable.scala
new file mode 100644
index 0000000..30052a8
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RelTable.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.plan.RelOptTable
+import org.apache.calcite.plan.RelOptTable.ToRelContext
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.schema.Schema.TableType
+import org.apache.calcite.schema.impl.AbstractTable
+import org.apache.calcite.schema.TranslatableTable
+
+/**
+ * A [[org.apache.calcite.schema.Table]] implementation for registering
+ * Table API Tables in the Calcite schema to be used by Flink SQL.
+ * It implements [[TranslatableTable]] so that its logical scan
+ * can be converted to a relational expression.
+ *
+ * @see [[DataSetTable]]
+ */
+class RelTable(relNode: RelNode) extends AbstractTable with TranslatableTable {
+
+  // not expected to be queried by Flink's planner; report a plain table
+  // instead of failing with NotImplementedError (previously ???)
+  override def getJdbcTableType: TableType = TableType.TABLE
+
+  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = relNode.getRowType
+
+  override def toRel(context: ToRelContext, relOptTable: RelOptTable): RelNode = {
+    relNode
+  }
+}
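
A minimal registration sketch (the schema handle and table name are
illustrative; in flink-table the registration happens inside the
TableEnvironment):

    import org.apache.calcite.schema.SchemaPlus

    def register(schema: SchemaPlus, relNode: RelNode): Unit =
      schema.add("myTable", new RelTable(relNode))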

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
new file mode 100644
index 0000000..0f55daf
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.sources.TableSource
+import org.apache.flink.types.Row
+
+/** Table which defines an external table via a [[TableSource]] */
+class TableSourceTable(val tableSource: TableSource[_])
+  extends FlinkTable[Row](
+    typeInfo = new RowTypeInfo(tableSource.getFieldTypes: _*),
+    fieldIndexes = 0.until(tableSource.getNumberOfFields).toArray,
+    fieldNames = tableSource.getFieldsNames)
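
For example, a hypothetical TableSource exposing the fields (name: String,
amount: Int) would be wrapped with:

    // typeInfo     = new RowTypeInfo(STRING_TYPE_INFO, INT_TYPE_INFO)
    // fieldIndexes = Array(0, 1)
    // fieldNames   = Array("name", "amount")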

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CountPartitionFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CountPartitionFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CountPartitionFunction.scala
new file mode 100644
index 0000000..9fe4ec3
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CountPartitionFunction.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.RichMapPartitionFunction
+import org.apache.flink.util.Collector
+
+class CountPartitionFunction[IN] extends RichMapPartitionFunction[IN, (Int, Long)] {
+
+  override def mapPartition(value: Iterable[IN], out: Collector[(Int, Long)]): Unit = {
+    val partitionIndex = getRuntimeContext.getIndexOfThisSubtask
+    var elementCount = 0L
+    val iterator = value.iterator()
+    while (iterator.hasNext) {
+      if (elementCount != Long.MaxValue) { // prevent overflow
+        elementCount += 1L
+      }
+      iterator.next()
+    }
+    out.collect((partitionIndex, elementCount))
+  }
+}
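
For example, with parallelism 3 and partition sizes 2, 5, and 3, the function
emits the pairs (0, 2), (1, 5), and (2, 3); LimitFilterFunction below turns
these counts into prefix sums to locate a global offset.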

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatJoinRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatJoinRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatJoinRunner.scala
new file mode 100644
index 0000000..715848d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatJoinRunner.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.{FlatJoinFunction, RichFlatJoinFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+class FlatJoinRunner[IN1, IN2, OUT](
+    name: String,
+    code: String,
+    @transient returnType: TypeInformation[OUT])
+  extends RichFlatJoinFunction[IN1, IN2, OUT]
+  with ResultTypeQueryable[OUT]
+  with Compiler[FlatJoinFunction[IN1, IN2, OUT]] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  private var function: FlatJoinFunction[IN1, IN2, OUT] = _
+
+  override def open(parameters: Configuration): Unit = {
+    LOG.debug(s"Compiling FlatJoinFunction: $name \n\n Code:\n$code")
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+    LOG.debug("Instantiating FlatJoinFunction.")
+    function = clazz.newInstance()
+  }
+
+  override def join(first: IN1, second: IN2, out: Collector[OUT]): Unit =
+    function.join(first, second, out)
+
+  override def getProducedType: TypeInformation[OUT] = returnType
+}
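
The code string passed in here is the source of a generated FlatJoinFunction
class; the Compiler mix-in (org.apache.flink.table.codegen.Compiler) compiles
it against the user-code class loader when the task opens, so each parallel
instance compiles and instantiates the generated function once.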

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala
new file mode 100644
index 0000000..a7bd980
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+class FlatMapRunner[IN, OUT](
+    name: String,
+    code: String,
+    @transient returnType: TypeInformation[OUT])
+  extends RichFlatMapFunction[IN, OUT]
+  with ResultTypeQueryable[OUT]
+  with Compiler[FlatMapFunction[IN, OUT]] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  private var function: FlatMapFunction[IN, OUT] = _
+
+  override def open(parameters: Configuration): Unit = {
+    LOG.debug(s"Compiling FlatMapFunction: $name \n\n Code:\n$code")
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+    LOG.debug("Instantiating FlatMapFunction.")
+    function = clazz.newInstance()
+  }
+
+  override def flatMap(in: IN, out: Collector[OUT]): Unit =
+    function.flatMap(in, out)
+
+  override def getProducedType: TypeInformation[OUT] = returnType
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/IntersectCoGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/IntersectCoGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/IntersectCoGroupFunction.scala
new file mode 100644
index 0000000..39d2914
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/IntersectCoGroupFunction.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.functions.CoGroupFunction
+import org.apache.flink.util.Collector
+
+class IntersectCoGroupFunction[T](all: Boolean) extends CoGroupFunction[T, T, T] {
+  override def coGroup(first: JIterable[T], second: JIterable[T], out: Collector[T]): Unit = {
+    if (first == null || second == null) return
+    val leftIter = first.iterator()
+    val rightIter = second.iterator()
+    if (all) {
+      // INTERSECT ALL: emit the record min(leftCount, rightCount) times
+      while (leftIter.hasNext && rightIter.hasNext) {
+        out.collect(leftIter.next())
+        rightIter.next()
+      }
+    } else {
+      // INTERSECT: emit the record once if it occurs on both sides
+      if (leftIter.hasNext && rightIter.hasNext) {
+        out.collect(leftIter.next())
+      }
+    }
+  }
+}
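
Each invocation sees all occurrences of one distinct row from both sides. For
example, with two occurrences of a row on the left and one on the right,
INTERSECT ALL emits the row once (the minimum of the two counts), while
INTERSECT emits it once whenever both sides are non-empty.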

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/LimitFilterFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/LimitFilterFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/LimitFilterFunction.scala
new file mode 100644
index 0000000..8441245
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/LimitFilterFunction.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.configuration.Configuration
+
+import scala.collection.JavaConverters._
+
+
+class LimitFilterFunction[T](
+    limitStart: Long,
+    limitEnd: Long,
+    broadcastName: String)
+  extends RichFilterFunction[T] {
+
+  var partitionIndex: Int = _
+  var elementCount: Long = _
+  var countList: Array[Long] = _
+
+  override def open(config: Configuration): Unit = {
+    partitionIndex = getRuntimeContext.getIndexOfThisSubtask
+
+    val countPartitionResult = getRuntimeContext
+      .getBroadcastVariable[(Int, Long)](broadcastName)
+      .asScala
+
+    // sort by partition index, extract the count per partition, build prefix sums
+    countList = countPartitionResult.sortWith(_._1 < _._1).map(_._2).scanLeft(0L) { case (a, b) =>
+        val sum = a + b
+        if (sum < 0L) { // prevent overflow
+          Long.MaxValue
+        } else {
+          sum
+        }
+    }.toArray
+
+    elementCount = 0
+  }
+
+  override def filter(value: T): Boolean = {
+    if (elementCount != Long.MaxValue) { // prevent overflow
+      elementCount += 1L
+    }
+    // we filter out records that are not within the limit (Long.MaxValue is unlimited)
+    limitStart - countList(partitionIndex) < elementCount &&
+      (limitEnd == Long.MaxValue || limitEnd - countList(partitionIndex) >= elementCount)
+  }
+}
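
A hedged wiring sketch (variable names and the broadcast set name are
illustrative): the per-partition counts computed by CountPartitionFunction are
broadcast to every filter instance, which then keeps only the rows whose
global position falls into the requested range.

    import org.apache.flink.api.scala._
    import org.apache.flink.types.Row

    val counts = input.mapPartition(new CountPartitionFunction[Row])
    val limited = input
      .filter(new LimitFilterFunction[Row](limitStart, limitEnd, "limit-counts"))
      .withBroadcastSet(counts, "limit-counts")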

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinLeftRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinLeftRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinLeftRunner.scala
new file mode 100644
index 0000000..cf32404
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinLeftRunner.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.util.Collector
+
+class MapJoinLeftRunner[IN1, IN2, OUT](
+    name: String,
+    code: String,
+    @transient returnType: TypeInformation[OUT],
+    broadcastSetName: String)
+  extends MapSideJoinRunner[IN1, IN2, IN2, IN1, OUT](name, code, returnType, broadcastSetName) {
+
+  override def flatMap(multiInput: IN1, out: Collector[OUT]): Unit =
+    function.join(multiInput, singleInput, out)
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinRightRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinRightRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinRightRunner.scala
new file mode 100644
index 0000000..c4bc0d1
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinRightRunner.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.util.Collector
+
+class MapJoinRightRunner[IN1, IN2, OUT](
+    name: String,
+    code: String,
+    @transient returnType: TypeInformation[OUT],
+    broadcastSetName: String)
+  extends MapSideJoinRunner[IN1, IN2, IN1, IN2, OUT](name, code, returnType, broadcastSetName) {
+
+  override def flatMap(multiInput: IN2, out: Collector[OUT]): Unit =
+    function.join(singleInput, multiInput, out)
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala
new file mode 100644
index 0000000..51e2fc5
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.configuration.Configuration
+import org.slf4j.LoggerFactory
+
+class MapRunner[IN, OUT](
+    name: String,
+    code: String,
+    @transient returnType: TypeInformation[OUT])
+  extends RichMapFunction[IN, OUT]
+  with ResultTypeQueryable[OUT]
+  with Compiler[MapFunction[IN, OUT]] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  private var function: MapFunction[IN, OUT] = _
+
+  override def open(parameters: Configuration): Unit = {
+    LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+    LOG.debug("Instantiating MapFunction.")
+    function = clazz.newInstance()
+  }
+
+  override def map(in: IN): OUT =
+    function.map(in)
+
+  override def getProducedType: TypeInformation[OUT] = returnType
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapSideJoinRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapSideJoinRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapSideJoinRunner.scala
new file mode 100644
index 0000000..f12590f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapSideJoinRunner.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.{FlatJoinFunction, RichFlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.configuration.Configuration
+import org.slf4j.LoggerFactory
+
+abstract class MapSideJoinRunner[IN1, IN2, SINGLE_IN, MULTI_IN, OUT](
+    name: String,
+    code: String,
+    @transient returnType: TypeInformation[OUT],
+    broadcastSetName: String)
+  extends RichFlatMapFunction[MULTI_IN, OUT]
+    with ResultTypeQueryable[OUT]
+    with Compiler[FlatJoinFunction[IN1, IN2, OUT]] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  protected var function: FlatJoinFunction[IN1, IN2, OUT] = _
+  protected var singleInput: SINGLE_IN = _
+
+  override def open(parameters: Configuration): Unit = {
+    LOG.debug(s"Compiling FlatJoinFunction: $name \n\n Code:\n$code")
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+    LOG.debug("Instantiating FlatJoinFunction.")
+    function = clazz.newInstance()
+    // the broadcast side is expected to contain exactly one record (single-row join input)
+    singleInput = getRuntimeContext.getBroadcastVariable(broadcastSetName).get(0)
+  }
+
+  override def getProducedType: TypeInformation[OUT] = returnType
+}
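
A hedged usage sketch (the type parameters Left/Right/Out, the generated code
string, and all names are illustrative): the single-row side is broadcast, the
multi-row side is streamed through flatMap.

    import org.apache.flink.api.scala._

    val joined = leftInput  // hypothetical DataSet for the multi-row side
      .flatMap(new MapJoinLeftRunner[Left, Right, Out](name, code, outType, "single-row"))
      .withBroadcastSet(rightSingleRow, "single-row")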

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MinusCoGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MinusCoGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MinusCoGroupFunction.scala
new file mode 100644
index 0000000..41679ee
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MinusCoGroupFunction.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.CoGroupFunction
+import org.apache.flink.util.Collector
+
+class MinusCoGroupFunction[T](all: Boolean) extends CoGroupFunction[T, T, T] {
+  override def coGroup(first: Iterable[T], second: Iterable[T], out: Collector[T]): Unit = {
+    if (first == null || second == null) return
+    val leftIter = first.iterator()
+    val rightIter = second.iterator()
+
+    if (all) {
+      // MINUS ALL: discard one left record per right record, emit the surplus
+      while (rightIter.hasNext && leftIter.hasNext) {
+        leftIter.next()
+        rightIter.next()
+      }
+
+      while (leftIter.hasNext) {
+        out.collect(leftIter.next())
+      }
+    } else {
+      // MINUS: emit the record once if it does not occur on the right side
+      if (!rightIter.hasNext && leftIter.hasNext) {
+        out.collect(leftIter.next())
+      }
+    }
+  }
+}
+
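
For example, with three occurrences of a row on the left and one on the right,
MINUS ALL emits the row twice (left count minus right count), while MINUS
emits nothing because the row still occurs on the right side.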