Posted to commits@flink.apache.org by ku...@apache.org on 2017/03/17 10:03:06 UTC

[4/4] flink git commit: [FLINK-3849] [table] Add FilterableTableSource interface and rules for pushing it (2)

[FLINK-3849] [table] Add FilterableTableSource interface and rules for pushing it (2)

This closes #3520.

Fix compilation failure.

Fix compilation failure again.

1. Deep-copy the TableSource when copying a TableSourceScan.
2. Unify the push-project-into-scan rule for both batch and stream.

Address review comments.

Expand the project list before creating a new RexProgram.

Update tests.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/78f22aae
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/78f22aae
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/78f22aae

Branch: refs/heads/master
Commit: 78f22aaec9bd7d39fdec8477335e5c9247d42030
Parents: 9f6cd2e
Author: Kurt Young <yk...@gmail.com>
Authored: Mon Mar 13 15:30:13 2017 +0800
Committer: Kurt Young <ku...@apache.org>
Committed: Fri Mar 17 18:01:50 2017 +0800

----------------------------------------------------------------------
 .../connectors/kafka/KafkaTableSource.java      |   5 +
 .../flink/addons/hbase/HBaseTableSource.java    |   7 +-
 .../flink/table/api/BatchTableEnvironment.scala |   2 +-
 .../table/api/StreamTableEnvironment.scala      |   2 +-
 .../flink/table/api/TableEnvironment.scala      |  12 -
 .../flink/table/calcite/RexNodeWrapper.scala    | 106 ------
 .../flink/table/plan/nodes/CommonCalc.scala     |   3 +-
 .../table/plan/nodes/TableSourceScan.scala      |  63 ++++
 .../table/plan/nodes/dataset/BatchScan.scala    |  21 +-
 .../nodes/dataset/BatchTableSourceScan.scala    |  47 +--
 .../table/plan/nodes/dataset/DataSetCalc.scala  |  23 +-
 .../table/plan/nodes/dataset/DataSetScan.scala  |  14 +-
 .../plan/nodes/datastream/DataStreamCalc.scala  |  24 +-
 .../plan/nodes/datastream/DataStreamScan.scala  |   8 +-
 .../plan/nodes/datastream/StreamScan.scala      |  10 +-
 .../datastream/StreamTableSourceScan.scala      |  52 +--
 .../PushFilterIntoTableSourceScanRuleBase.scala | 104 ++++++
 ...PushProjectIntoTableSourceScanRuleBase.scala |  57 +++
 ...PushFilterIntoBatchTableSourceScanRule.scala |  58 +---
 ...ushProjectIntoBatchTableSourceScanRule.scala |  48 +--
 ...ushFilterIntoStreamTableSourceScanRule.scala |  58 +---
 ...shProjectIntoStreamTableSourceScanRule.scala |  40 +--
 .../table/plan/schema/TableSourceTable.scala    |   1 -
 .../util/RexProgramExpressionExtractor.scala    | 163 ---------
 .../table/plan/util/RexProgramExtractor.scala   | 183 ++++++++++
 .../plan/util/RexProgramProjectExtractor.scala  | 120 -------
 .../table/plan/util/RexProgramRewriter.scala    |  91 +++++
 .../table/sources/FilterableTableSource.scala   |  38 +-
 .../table/sources/ProjectableTableSource.scala  |   9 +-
 .../flink/table/sources/TableSource.scala       |   2 +
 .../flink/table/validate/FunctionCatalog.scala  |   5 +-
 .../apache/flink/table/TableSourceTest.scala    | 170 ++++++---
 .../api/scala/batch/TableSourceITCase.scala     |   4 +-
 .../api/scala/stream/TableSourceITCase.scala    |   4 +-
 .../expressions/utils/ExpressionTestBase.scala  |   4 +-
 .../RexProgramExpressionExtractorTest.scala     | 182 ----------
 .../plan/util/RexProgramExtractorTest.scala     | 346 +++++++++++++++++++
 .../util/RexProgramProjectExtractorTest.scala   | 121 -------
 .../plan/util/RexProgramRewriterTest.scala      |  62 ++++
 .../table/plan/util/RexProgramTestBase.scala    |  80 +++++
 .../flink/table/utils/CommonTestData.scala      | 122 +------
 .../table/utils/MockTableEnvironment.scala      |  39 +++
 .../table/utils/TestFilterableTableSource.scala | 134 +++++++
 43 files changed, 1452 insertions(+), 1192 deletions(-)
----------------------------------------------------------------------
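
For context, a minimal sketch of what the new contract asks of a source implementation. It is an illustration only: `MyFilterableSource` and its `(name, amount)` schema are hypothetical, and the commit's actual reference implementation is TestFilterableTableSource.scala below. As the rule-base code in this diff shows, `applyPredicate` receives a mutable list of predicates, removes the ones the source will evaluate itself, and returns a copy of the source, while `isFilterPushedDown` keeps the rule from firing twice. Predicates left in the list (and any RexNodes that could not be converted to Expressions) remain in a Calc on top of the scan.

    import java.util.{List => JList}

    import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
    import org.apache.flink.table.expressions.{Expression, GreaterThan, Literal, ResolvedFieldReference}
    import org.apache.flink.table.sources.{BatchTableSource, FilterableTableSource, TableSource}
    import org.apache.flink.types.Row

    class MyFilterableSource(filterPushedDown: Boolean = false)
      extends BatchTableSource[Row]
      with FilterableTableSource[Row] {

      // Claim only predicates of the shape `amount > <literal>`; everything else
      // is left in the list and evaluated by the remaining Calc.
      override def applyPredicate(predicates: JList[Expression]): TableSource[Row] = {
        val it = predicates.iterator()
        while (it.hasNext) {
          it.next() match {
            case GreaterThan(ResolvedFieldReference("amount", _), Literal(_, _)) =>
              it.remove() // consumed: this source filters these rows itself
            case _ => // unsupported: keep it for the framework
          }
        }
        new MyFilterableSource(filterPushedDown = true)
      }

      override def isFilterPushedDown: Boolean = filterPushedDown

      override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] =
        execEnv.fromElements(Row.of("a", Int.box(10)), Row.of("b", Int.box(20)))

      override def getReturnType: TypeInformation[Row] =
        new RowTypeInfo(
          Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
          Array("name", "amount"))

      override def explainSource(): String =
        if (filterPushedDown) "filter=[amount > ...]" else ""
    }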


http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
index dd32bdd..506358d 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
@@ -137,4 +137,9 @@ public abstract class KafkaTableSource implements StreamTableSource<Row> {
 	protected DeserializationSchema<Row> getDeserializationSchema() {
 		return deserializationSchema;
 	}
+
+	@Override
+	public String explainSource() {
+		return "";
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
index a1be23f..f709212 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
@@ -108,7 +108,7 @@ public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTable
 	}
 
 	@Override
-	public ProjectableTableSource<Row> projectFields(int[] fields) {
+	public HBaseTableSource projectFields(int[] fields) {
 		String[] famNames = schema.getFamilyNames();
 		HBaseTableSource newTableSource = new HBaseTableSource(this.conf, tableName);
 		// Extract the family from the given fields
@@ -122,4 +122,9 @@ public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTable
 		}
 		return newTableSource;
 	}
+
+	@Override
+	public String explainSource() {
+		return "";
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 7f27357..b48e9f9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -95,7 +95,7 @@ abstract class BatchTableEnvironment(
 
     tableSource match {
       case batchTableSource: BatchTableSource[_] =>
-        registerTableInternal(name, new TableSourceTable(batchTableSource, this))
+        registerTableInternal(name, new TableSourceTable(batchTableSource))
       case _ =>
         throw new TableException("Only BatchTableSource can be registered in " +
             "BatchTableEnvironment")

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 7e9f38f..d927c3a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -136,7 +136,7 @@ abstract class StreamTableEnvironment(
 
     tableSource match {
       case streamTableSource: StreamTableSource[_] =>
-        registerTableInternal(name, new TableSourceTable(streamTableSource, this))
+        registerTableInternal(name, new TableSourceTable(streamTableSource))
       case _ =>
         throw new TableException("Only StreamTableSource can be registered in " +
             "StreamTableEnvironment")

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 291f49f..1dda3a8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -842,18 +842,6 @@ object TableEnvironment {
   }
 
   /**
-    * Returns field types for a given [[TableSource]].
-    *
-    * @param tableSource The TableSource to extract field types from.
-    * @tparam A The type of the TableSource.
-    * @return An array holding the field types.
-    */
-  def getFieldTypes[A](tableSource: TableSource[A]): Array[TypeInformation[_]] = {
-    val returnType = tableSource.getReturnType
-    TableEnvironment.getFieldTypes(returnType)
-  }
-
-  /**
     * Returns field names for a given [[TableSource]].
     *
     * @param tableSource The TableSource to extract field names from.

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RexNodeWrapper.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RexNodeWrapper.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RexNodeWrapper.scala
deleted file mode 100644
index 1926a67..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RexNodeWrapper.scala
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.calcite
-
-import org.apache.calcite.rex._
-import org.apache.calcite.sql._
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.expressions.{Expression, Literal, ResolvedFieldReference}
-import org.apache.flink.table.validate.FunctionCatalog
-import org.apache.flink.table.calcite.RexNodeWrapper._
-
-abstract class RexNodeWrapper(rex: RexNode) {
-  def get: RexNode = rex
-  def toExpression(names: Map[RexInputRef, String]): Expression
-}
-
-case class RexLiteralWrapper(literal: RexLiteral) extends RexNodeWrapper(literal) {
-  override def toExpression(names: Map[RexInputRef, String]): Expression = {
-    val typeInfo = FlinkTypeFactory.toTypeInfo(literal.getType)
-    Literal(literal.getValue, typeInfo)
-  }
-}
-
-case class RexInputWrapper(input: RexInputRef) extends RexNodeWrapper(input) {
-  override def toExpression(names: Map[RexInputRef, String]): Expression = {
-    val typeInfo = FlinkTypeFactory.toTypeInfo(input.getType)
-    ResolvedFieldReference(names(input), typeInfo)
-  }
-}
-
-case class RexCallWrapper(
-    call: RexCall,
-    operands: Seq[RexNodeWrapper]) extends RexNodeWrapper(call) {
-
-  override def toExpression(names: Map[RexInputRef, String]): Expression = {
-    val ops = operands.map(_.toExpression(names))
-    call.op match {
-      case function: SqlFunction =>
-        lookupFunction(replace(function.getName), ops)
-      case postfix: SqlPostfixOperator =>
-        lookupFunction(replace(postfix.getName), ops)
-      case operator@_ =>
-        val name = replace(s"${operator.kind}")
-        lookupFunction(name, ops)
-    }
-  }
-
-  def replace(str: String): String = {
-    str.replaceAll("\\s|_", "")
-  }
-}
-
-object RexNodeWrapper {
-
-  private var catalog: Option[FunctionCatalog] = None
-
-  def wrap(rex: RexNode, functionCatalog: FunctionCatalog): RexNodeWrapper = {
-    catalog = Option(functionCatalog)
-    rex.accept(new WrapperVisitor)
-  }
-
-  private[table] def lookupFunction(name: String, operands: Seq[Expression]): Expression = {
-    catalog.getOrElse(throw TableException("FunctionCatalog was not defined"))
-      .lookupFunction(name, operands)
-  }
-}
-
-class WrapperVisitor extends RexVisitorImpl[RexNodeWrapper](true) {
-
-  override def visitInputRef(inputRef: RexInputRef): RexNodeWrapper = {
-    RexInputWrapper(inputRef)
-  }
-
-  override def visitLiteral(literal: RexLiteral): RexNodeWrapper = {
-    RexLiteralWrapper(literal)
-  }
-
-  override def visitLocalRef(localRef: RexLocalRef): RexNodeWrapper = {
-    localRef.accept(this)
-  }
-
-  override def visitCall(call: RexCall): RexNodeWrapper = {
-    val operands = for {
-      x <- 0 until call.operands.size()
-    } yield {
-      call.operands.get(x).accept(this)
-    }
-    RexCallWrapper(call, operands)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
index 8b07aac..bc25140 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
@@ -167,7 +167,8 @@ trait CommonCalc {
       case _ => true
     }
 
-    planner.getCostFactory.makeCost(rowCnt, rowCnt * compCnt, 0)
+    val newRowCnt = estimateRowCount(calcProgram, rowCnt)
+    planner.getCostFactory.makeCost(newRowCnt, newRowCnt * compCnt, 0)
   }
 
   private[flink] def estimateRowCount(

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/TableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/TableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/TableSourceScan.scala
new file mode 100644
index 0000000..e0f7786
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/TableSourceScan.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptTable, RelTraitSet}
+import org.apache.calcite.rel.RelWriter
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.TableScan
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.sources.TableSource
+
+import scala.collection.JavaConverters._
+
+abstract class TableSourceScan(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    table: RelOptTable,
+    val tableSource: TableSource[_])
+  extends TableScan(cluster, traitSet, table) {
+
+  override def deriveRowType(): RelDataType = {
+    val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+    flinkTypeFactory.buildRowDataType(
+      TableEnvironment.getFieldNames(tableSource),
+      TableEnvironment.getFieldTypes(tableSource.getReturnType))
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    val terms = super.explainTerms(pw)
+        .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", "))
+
+    val sourceDesc = tableSource.explainSource()
+    if (sourceDesc.nonEmpty) {
+      terms.item("source", sourceDesc)
+    } else {
+      terms
+    }
+  }
+
+  override def toString: String = {
+    s"Source(from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))"
+  }
+
+  def copy(traitSet: RelTraitSet, tableSource: TableSource[_]): TableSourceScan
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
index 09262a6..b39b8ed 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
@@ -18,9 +18,6 @@
 
 package org.apache.flink.table.plan.nodes.dataset
 
-import org.apache.calcite.plan._
-import org.apache.calcite.rel.core.TableScan
-import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.calcite.FlinkTypeFactory
@@ -31,23 +28,7 @@ import org.apache.flink.types.Row
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 
-abstract class BatchScan(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    table: RelOptTable)
-  extends TableScan(cluster, traitSet, table)
-  with CommonScan
-  with DataSetRel {
-
-  override def toString: String = {
-    s"Source(from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))"
-  }
-
-  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
-
-    val rowCnt = metadata.getRowCount(this)
-    planner.getCostFactory.makeCost(rowCnt, rowCnt, 0)
-  }
+trait BatchScan extends CommonScan with DataSetRel {
 
   protected def convertToInternalRow(
       input: DataSet[Any],

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
index 11f595c..a9784e2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
@@ -19,33 +19,25 @@
 package org.apache.flink.table.plan.nodes.dataset
 
 import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.calcite.rel.{RelNode, RelWriter}
-import org.apache.calcite.rex.RexNode
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.table.api.{BatchTableEnvironment, TableEnvironment}
-import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.plan.nodes.TableSourceScan
 import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.sources.{BatchTableSource, TableSource}
 import org.apache.flink.types.Row
-import org.apache.flink.table.sources.BatchTableSource
 
 /** Flink RelNode to read data from an external source defined by a [[BatchTableSource]]. */
 class BatchTableSourceScan(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     table: RelOptTable,
-    val tableSource: BatchTableSource[_],
-    filterCondition: RexNode = null)
-  extends BatchScan(cluster, traitSet, table) {
+    tableSource: BatchTableSource[_])
+  extends TableSourceScan(cluster, traitSet, table, tableSource)
+  with BatchScan {
 
-  override def deriveRowType() = {
-    val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-    flinkTypeFactory.buildRowDataType(
-      TableEnvironment.getFieldNames(tableSource),
-      TableEnvironment.getFieldTypes(tableSource.getReturnType))
-  }
-
-  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
     val rowCnt = metadata.getRowCount(this)
     planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * estimateRowSize(getRowType))
   }
@@ -55,27 +47,22 @@ class BatchTableSourceScan(
       cluster,
       traitSet,
       getTable,
-      tableSource,
-      filterCondition
+      tableSource
     )
   }
 
-  override def explainTerms(pw: RelWriter): RelWriter = {
-    val terms = super.explainTerms(pw)
-      .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", "))
-      if (filterCondition != null) {
-        import scala.collection.JavaConverters._
-        val fieldNames = getTable.getRowType.getFieldNames.asScala.toList
-        terms.item("filter", getExpressionString(filterCondition, fieldNames, None))
-      }
-    terms
+  override def copy(traitSet: RelTraitSet, newTableSource: TableSource[_]): TableSourceScan = {
+    new BatchTableSourceScan(
+      cluster,
+      traitSet,
+      getTable,
+      newTableSource.asInstanceOf[BatchTableSource[_]]
+    )
   }
 
   override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
-
     val config = tableEnv.getConfig
     val inputDataSet = tableSource.getDataSet(tableEnv.execEnv).asInstanceOf[DataSet[Any]]
-
-    convertToInternalRow(inputDataSet, new TableSourceTable(tableSource, tableEnv), config)
+    convertToInternalRow(inputDataSet, new TableSourceTable(tableSource), config)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
index 972e45b..e05b5a8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
@@ -20,8 +20,9 @@ package org.apache.flink.table.plan.nodes.dataset
 
 import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.Calc
 import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.calcite.rex._
 import org.apache.flink.api.common.functions.FlatMapFunction
 import org.apache.flink.api.java.DataSet
@@ -40,35 +41,29 @@ class DataSetCalc(
     traitSet: RelTraitSet,
     input: RelNode,
     rowRelDataType: RelDataType,
-    private[flink] val calcProgram: RexProgram, // for tests
+    calcProgram: RexProgram,
     ruleDescription: String)
-  extends SingleRel(cluster, traitSet, input)
+  extends Calc(cluster, traitSet, input, calcProgram)
   with CommonCalc
   with DataSetRel {
 
-  override def deriveRowType() = rowRelDataType
+  override def deriveRowType(): RelDataType = rowRelDataType
 
-  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
-    new DataSetCalc(
-      cluster,
-      traitSet,
-      inputs.get(0),
-      getRowType,
-      calcProgram,
-      ruleDescription)
+  override def copy(traitSet: RelTraitSet, child: RelNode, program: RexProgram): Calc = {
+    new DataSetCalc(cluster, traitSet, child, getRowType, program, ruleDescription)
   }
 
   override def toString: String = calcToString(calcProgram, getExpressionString)
 
   override def explainTerms(pw: RelWriter): RelWriter = {
-    super.explainTerms(pw)
+    pw.input("input", getInput)
       .item("select", selectionToString(calcProgram, getExpressionString))
       .itemIf("where",
         conditionToString(calcProgram, getExpressionString),
         calcProgram.getCondition != null)
   }
 
-  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
     val child = this.getInput
     val rowCnt = metadata.getRowCount(child)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala
index 44d2d00..b1cf106 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala
@@ -21,6 +21,8 @@ package org.apache.flink.table.plan.nodes.dataset
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.table.api.BatchTableEnvironment
 import org.apache.flink.table.plan.schema.DataSetTable
@@ -36,11 +38,17 @@ class DataSetScan(
     traitSet: RelTraitSet,
     table: RelOptTable,
     rowRelDataType: RelDataType)
-  extends BatchScan(cluster, traitSet, table) {
+  extends TableScan(cluster, traitSet, table)
+  with BatchScan {
 
   val dataSetTable: DataSetTable[Any] = getTable.unwrap(classOf[DataSetTable[Any]])
 
-  override def deriveRowType() = rowRelDataType
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+    val rowCnt = metadata.getRowCount(this)
+    planner.getCostFactory.makeCost(rowCnt, rowCnt, 0)
+  }
 
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataSetScan(
@@ -52,10 +60,8 @@ class DataSetScan(
   }
 
   override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
-
     val config = tableEnv.getConfig
     val inputDataSet: DataSet[Any] = dataSetTable.dataSet
-
     convertToInternalRow(inputDataSet, dataSetTable, config)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
index 26778d7..b015a1d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
@@ -20,8 +20,9 @@ package org.apache.flink.table.plan.nodes.datastream
 
 import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.Calc
 import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.calcite.rex.RexProgram
 import org.apache.flink.api.common.functions.FlatMapFunction
 import org.apache.flink.streaming.api.datastream.DataStream
@@ -40,36 +41,29 @@ class DataStreamCalc(
     traitSet: RelTraitSet,
     input: RelNode,
     rowRelDataType: RelDataType,
-    private[flink] val calcProgram: RexProgram,
+    calcProgram: RexProgram,
     ruleDescription: String)
-  extends SingleRel(cluster, traitSet, input)
+  extends Calc(cluster, traitSet, input, calcProgram)
   with CommonCalc
   with DataStreamRel {
 
-  override def deriveRowType() = rowRelDataType
+  override def deriveRowType(): RelDataType = rowRelDataType
 
-  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
-    new DataStreamCalc(
-      cluster,
-      traitSet,
-      inputs.get(0),
-      getRowType,
-      calcProgram,
-      ruleDescription
-    )
+  override def copy(traitSet: RelTraitSet, child: RelNode, program: RexProgram): Calc = {
+    new DataStreamCalc(cluster, traitSet, child, getRowType, program, ruleDescription)
   }
 
   override def toString: String = calcToString(calcProgram, getExpressionString)
 
   override def explainTerms(pw: RelWriter): RelWriter = {
-    super.explainTerms(pw)
+    pw.input("input", getInput)
       .item("select", selectionToString(calcProgram, getExpressionString))
       .itemIf("where",
         conditionToString(calcProgram, getExpressionString),
         calcProgram.getCondition != null)
   }
 
-  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
     val child = this.getInput
     val rowCnt = metadata.getRowCount(child)
     computeSelfCost(calcProgram, planner, rowCnt)

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
index e8d218e..c187ae8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table.plan.nodes.datastream
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.TableScan
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.StreamTableEnvironment
 import org.apache.flink.table.plan.schema.DataStreamTable
@@ -36,11 +37,12 @@ class DataStreamScan(
     traitSet: RelTraitSet,
     table: RelOptTable,
     rowRelDataType: RelDataType)
-  extends StreamScan(cluster, traitSet, table) {
+  extends TableScan(cluster, traitSet, table)
+  with StreamScan {
 
   val dataStreamTable: DataStreamTable[Any] = getTable.unwrap(classOf[DataStreamTable[Any]])
 
-  override def deriveRowType() = rowRelDataType
+  override def deriveRowType(): RelDataType = rowRelDataType
 
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataStreamScan(
@@ -52,10 +54,8 @@ class DataStreamScan(
   }
 
   override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
-
     val config = tableEnv.getConfig
     val inputDataStream: DataStream[Any] = dataStreamTable.dataStream
-
     convertToInternalRow(inputDataStream, dataStreamTable, config)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
index 56f7f27..6d08302 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.plan.nodes.datastream
 
-import org.apache.calcite.plan._
-import org.apache.calcite.rel.core.TableScan
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.calcite.FlinkTypeFactory
@@ -30,13 +28,7 @@ import org.apache.flink.types.Row
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 
-abstract class StreamScan(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    table: RelOptTable)
-  extends TableScan(cluster, traitSet, table)
-  with CommonScan
-  with DataStreamRel {
+trait StreamScan extends CommonScan with DataStreamRel {
 
   protected def convertToInternalRow(
       input: DataStream[Any],

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index b808d8d..013c55f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -19,33 +19,25 @@
 package org.apache.flink.table.plan.nodes.datastream
 
 import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.calcite.rel.{RelNode, RelWriter}
-import org.apache.calcite.rex.RexNode
-import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.plan.nodes.TableSourceScan
 import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.sources.{StreamTableSource, TableSource}
 import org.apache.flink.types.Row
-import org.apache.flink.table.sources.StreamTableSource
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.{StreamTableEnvironment, TableEnvironment}
 
 /** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */
 class StreamTableSourceScan(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     table: RelOptTable,
-    val tableSource: StreamTableSource[_],
-  filterCondition: RexNode = null)
-  extends StreamScan(cluster, traitSet, table) {
-
-  override def deriveRowType() = {
-    val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-    flinkTypeFactory.buildRowDataType(
-      TableEnvironment.getFieldNames(tableSource),
-      TableEnvironment.getFieldTypes(tableSource.getReturnType))
-  }
+    tableSource: StreamTableSource[_])
+  extends TableSourceScan(cluster, traitSet, table, tableSource)
+  with StreamScan {
 
-  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
     val rowCnt = metadata.getRowCount(this)
     planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * estimateRowSize(getRowType))
   }
@@ -55,28 +47,22 @@ class StreamTableSourceScan(
       cluster,
       traitSet,
       getTable,
-      tableSource,
-      filterCondition
+      tableSource
     )
   }
 
-  override def explainTerms(pw: RelWriter): RelWriter = {
-    val terms = super.explainTerms(pw)
-      .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", "))
-    if (filterCondition != null) {
-      import scala.collection.JavaConverters._
-      val fieldNames = getTable.getRowType.getFieldNames.asScala.toList
-      terms.item("filter", getExpressionString(filterCondition, fieldNames, None))
-    }
-    terms
+  override def copy(traitSet: RelTraitSet, newTableSource: TableSource[_]): TableSourceScan = {
+    new StreamTableSourceScan(
+      cluster,
+      traitSet,
+      getTable,
+      newTableSource.asInstanceOf[StreamTableSource[_]]
+    )
   }
 
   override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
-
     val config = tableEnv.getConfig
-    val inputDataStream: DataStream[Any] = tableSource
-      .getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
-
-    convertToInternalRow(inputDataStream, new TableSourceTable(tableSource, tableEnv), config)
+    val inputDataStream = tableSource.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
+    convertToInternalRow(inputDataStream, new TableSourceTable(tableSource), config)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala
new file mode 100644
index 0000000..b07f78e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.common
+
+import java.util
+
+import org.apache.calcite.plan.RelOptRuleCall
+import org.apache.calcite.rel.core.Calc
+import org.apache.calcite.rex.RexProgram
+import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.plan.nodes.TableSourceScan
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.plan.util.RexProgramExtractor
+import org.apache.flink.table.sources.FilterableTableSource
+import org.apache.flink.table.validate.FunctionCatalog
+import org.apache.flink.util.Preconditions
+
+import scala.collection.JavaConverters._
+
+trait PushFilterIntoTableSourceScanRuleBase {
+
+  private[flink] def pushFilterIntoScan(
+      call: RelOptRuleCall,
+      calc: Calc,
+      scan: TableSourceScan,
+      tableSourceTable: TableSourceTable[_],
+      filterableSource: FilterableTableSource[_],
+      description: String): Unit = {
+
+    Preconditions.checkArgument(!filterableSource.isFilterPushedDown)
+
+    val program = calc.getProgram
+    val functionCatalog = FunctionCatalog.withBuiltIns
+    val (predicates, unconvertedRexNodes) =
+      RexProgramExtractor.extractConjunctiveConditions(
+        program,
+        call.builder().getRexBuilder,
+        functionCatalog)
+    if (predicates.isEmpty) {
+      // no condition could be translated to an expression
+      return
+    }
+
+    val remainingPredicates = new util.LinkedList[Expression]()
+    predicates.foreach(e => remainingPredicates.add(e))
+
+    val newTableSource = filterableSource.applyPredicate(remainingPredicates)
+
+    // check whether the framework still needs to apply a filter
+    val relBuilder = call.builder()
+    val remainingCondition = {
+      if (!remainingPredicates.isEmpty || unconvertedRexNodes.nonEmpty) {
+        relBuilder.push(scan)
+        val remainingConditions =
+          (remainingPredicates.asScala.map(expr => expr.toRexNode(relBuilder))
+              ++ unconvertedRexNodes)
+        remainingConditions.reduce((l, r) => relBuilder.and(l, r))
+      } else {
+        null
+      }
+    }
+
+    // check whether we still need a RexProgram. A RexProgram is needed when either
+    // a projection or a filter exists.
+    val newScan = scan.copy(scan.getTraitSet, newTableSource)
+    val newRexProgram = {
+      if (remainingCondition != null || !program.projectsOnlyIdentity) {
+        val expandedProjectList = program.getProjectList.asScala
+            .map(ref => program.expandLocalRef(ref)).asJava
+        RexProgram.create(
+          program.getInputRowType,
+          expandedProjectList,
+          remainingCondition,
+          program.getOutputRowType,
+          relBuilder.getRexBuilder)
+      } else {
+        null
+      }
+    }
+
+    if (newRexProgram != null) {
+      val newCalc = calc.copy(calc.getTraitSet, newScan, newRexProgram)
+      call.transformTo(newCalc)
+    } else {
+      call.transformTo(newScan)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushProjectIntoTableSourceScanRuleBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushProjectIntoTableSourceScanRuleBase.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushProjectIntoTableSourceScanRuleBase.scala
new file mode 100644
index 0000000..9f9c805
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushProjectIntoTableSourceScanRuleBase.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.common
+
+import org.apache.calcite.plan.RelOptRuleCall
+import org.apache.calcite.rel.core.Calc
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.plan.nodes.TableSourceScan
+import org.apache.flink.table.plan.util.{RexProgramExtractor, RexProgramRewriter}
+import org.apache.flink.table.sources.ProjectableTableSource
+
+trait PushProjectIntoTableSourceScanRuleBase {
+
+  private[flink] def pushProjectIntoScan(
+      call: RelOptRuleCall,
+      calc: Calc,
+      scan: TableSourceScan): Unit = {
+
+    val usedFields = RexProgramExtractor.extractRefInputFields(calc.getProgram)
+
+    // if no fields can be projected, we keep the original plan.
+    if (TableEnvironment.getFieldNames(scan.tableSource).length != usedFields.length) {
+      val originTableSource = scan.tableSource.asInstanceOf[ProjectableTableSource[_]]
+      val newTableSource = originTableSource.projectFields(usedFields)
+      val newScan = scan.copy(scan.getTraitSet, newTableSource)
+      val newCalcProgram = RexProgramRewriter.rewriteWithFieldProjection(
+        calc.getProgram,
+        newScan.getRowType,
+        calc.getCluster.getRexBuilder,
+        usedFields)
+
+      if (newCalcProgram.isTrivial) {
+        // drop the Calc if the transformed program merely returns its input and has no filter
+        call.transformTo(newScan)
+      } else {
+        val newCalc = calc.copy(calc.getTraitSet, newScan, newCalcProgram)
+        call.transformTo(newCalc)
+      }
+    }
+  }
+}
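
And the projection side, under the same caveat: `MyProjectableSource` is hypothetical, and the sketch assumes `projectFields` returns a TableSource copy, as the HBaseTableSource change above suggests. The rule hands `projectFields` the indexes of the fields the query actually reads and expects back a source that emits only those fields, after which the RexProgram is rewritten against the narrowed row type.

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
    import org.apache.flink.table.sources.{BatchTableSource, ProjectableTableSource, TableSource}
    import org.apache.flink.types.Row

    class MyProjectableSource(
        fieldNames: Array[String],
        fieldTypes: Array[TypeInformation[_]])
      extends BatchTableSource[Row]
      with ProjectableTableSource[Row] {

      // Restrict the source to the projected field indexes; the rewritten
      // program on top of the new scan references only these fields.
      override def projectFields(fields: Array[Int]): TableSource[Row] =
        new MyProjectableSource(fields.map(fieldNames(_)), fields.map(fieldTypes(_)))

      override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] =
        execEnv.fromElements(new Row(fieldNames.length)) // dummy data for the sketch

      override def getReturnType: TypeInformation[Row] =
        new RowTypeInfo(fieldTypes, fieldNames)

      override def explainSource(): String = s"fields=[${fieldNames.mkString(", ")}]"
    }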

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala
index f95e34e..8cfd748 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala
@@ -20,23 +20,23 @@ package org.apache.flink.table.plan.rules.dataSet
 
 import org.apache.calcite.plan.RelOptRule._
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.calcite.rex.RexProgram
 import org.apache.flink.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc}
-import org.apache.flink.table.plan.util.RexProgramExpressionExtractor._
+import org.apache.flink.table.plan.rules.common.PushFilterIntoTableSourceScanRuleBase
 import org.apache.flink.table.plan.schema.TableSourceTable
 import org.apache.flink.table.sources.FilterableTableSource
 
 class PushFilterIntoBatchTableSourceScanRule extends RelOptRule(
   operand(classOf[DataSetCalc],
     operand(classOf[BatchTableSourceScan], none)),
-  "PushFilterIntoBatchTableSourceScanRule") {
+  "PushFilterIntoBatchTableSourceScanRule")
+  with PushFilterIntoTableSourceScanRuleBase {
 
-  override def matches(call: RelOptRuleCall) = {
+  override def matches(call: RelOptRuleCall): Boolean = {
     val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
     val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
     scan.tableSource match {
-      case _: FilterableTableSource =>
-        calc.calcProgram.getCondition != null
+      case source: FilterableTableSource[_] =>
+        calc.getProgram.getCondition != null && !source.isFilterPushedDown
       case _ => false
     }
   }
@@ -44,49 +44,9 @@ class PushFilterIntoBatchTableSourceScanRule extends RelOptRule(
   override def onMatch(call: RelOptRuleCall): Unit = {
     val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
     val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
-
-    val filterableSource = scan.tableSource.asInstanceOf[FilterableTableSource]
-
-    val program: RexProgram = calc.calcProgram
-    val tst = scan.getTable.unwrap(classOf[TableSourceTable[_]])
-    val predicate = extractPredicateExpressions(
-      program,
-      call.builder().getRexBuilder,
-      tst.tableEnv.getFunctionCatalog)
-
-    if (predicate.length != 0) {
-      val remainingPredicate = filterableSource.setPredicate(predicate)
-
-      if (verifyExpressions(predicate, remainingPredicate)) {
-
-        val filterRexNode = getFilterExpressionAsRexNode(
-          program.getInputRowType,
-          scan,
-          predicate.diff(remainingPredicate))(call.builder())
-
-        val newScan = new BatchTableSourceScan(
-          scan.getCluster,
-          scan.getTraitSet,
-          scan.getTable,
-          scan.tableSource,
-          filterRexNode)
-
-        val newCalcProgram = rewriteRexProgram(
-          program,
-          newScan,
-          remainingPredicate)(call.builder())
-
-        val newCalc = new DataSetCalc(
-          calc.getCluster,
-          calc.getTraitSet,
-          newScan,
-          calc.getRowType,
-          newCalcProgram,
-          description)
-
-        call.transformTo(newCalc)
-      }
-    }
+    val tableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable[_]])
+    val filterableSource = scan.tableSource.asInstanceOf[FilterableTableSource[_]]
+    pushFilterIntoScan(call, calc, scan, tableSourceTable, filterableSource, description)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
index 53f5fff..8c83047 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
@@ -18,22 +18,22 @@
 
 package org.apache.flink.table.plan.rules.dataSet
 
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.plan.RelOptRule.{none, operand}
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.flink.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc}
-import org.apache.flink.table.plan.util.RexProgramProjectExtractor._
-import org.apache.flink.table.sources.{BatchTableSource, ProjectableTableSource}
+import org.apache.flink.table.plan.rules.common.PushProjectIntoTableSourceScanRuleBase
+import org.apache.flink.table.sources.ProjectableTableSource
 
 /**
   * This rule tries to push projections into a BatchTableSourceScan.
   */
 class PushProjectIntoBatchTableSourceScanRule extends RelOptRule(
   operand(classOf[DataSetCalc],
-          operand(classOf[BatchTableSourceScan], none)),
-  "PushProjectIntoBatchTableSourceScanRule") {
+    operand(classOf[BatchTableSourceScan], none)),
+  "PushProjectIntoBatchTableSourceScanRule")
+  with PushProjectIntoTableSourceScanRuleBase {
 
-  override def matches(call: RelOptRuleCall) = {
+  override def matches(call: RelOptRuleCall): Boolean = {
     val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
     scan.tableSource match {
       case _: ProjectableTableSource[_] => true
@@ -44,39 +44,7 @@ class PushProjectIntoBatchTableSourceScanRule extends RelOptRule(
   override def onMatch(call: RelOptRuleCall) {
     val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
     val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
-
-    val usedFields: Array[Int] = extractRefInputFields(calc.calcProgram)
-
-    // if no fields can be projected, we keep the original plan.
-    if (TableEnvironment.getFieldNames(scan.tableSource).length != usedFields.length) {
-      val originTableSource = scan.tableSource.asInstanceOf[ProjectableTableSource[_]]
-      val newTableSource = originTableSource.projectFields(usedFields)
-      val newScan = new BatchTableSourceScan(
-        scan.getCluster,
-        scan.getTraitSet,
-        scan.getTable,
-        newTableSource.asInstanceOf[BatchTableSource[_]])
-
-      val newCalcProgram = rewriteRexProgram(
-        calc.calcProgram,
-        newScan.getRowType,
-        usedFields,
-        calc.getCluster.getRexBuilder)
-
-      if (newCalcProgram.isTrivial) {
-        // drop calc if the transformed program merely returns its input and doesn't exist filter
-        call.transformTo(newScan)
-      } else {
-        val newCalc = new DataSetCalc(
-          calc.getCluster,
-          calc.getTraitSet,
-          newScan,
-          calc.getRowType,
-          newCalcProgram,
-          description)
-        call.transformTo(newCalc)
-      }
-    }
+    pushProjectIntoScan(call, calc, scan)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala
index 9c02dd7..53a3bcd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala
@@ -21,21 +21,22 @@ package org.apache.flink.table.plan.rules.datastream
 import org.apache.calcite.plan.RelOptRule._
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, StreamTableSourceScan}
-import org.apache.flink.table.plan.util.RexProgramExpressionExtractor._
+import org.apache.flink.table.plan.rules.common.PushFilterIntoTableSourceScanRuleBase
 import org.apache.flink.table.plan.schema.TableSourceTable
 import org.apache.flink.table.sources.FilterableTableSource
 
 class PushFilterIntoStreamTableSourceScanRule extends RelOptRule(
   operand(classOf[DataStreamCalc],
     operand(classOf[StreamTableSourceScan], none)),
-  "PushFilterIntoStreamTableSourceScanRule") {
+  "PushFilterIntoStreamTableSourceScanRule")
+  with PushFilterIntoTableSourceScanRuleBase {
 
-  override def matches(call: RelOptRuleCall) = {
+  override def matches(call: RelOptRuleCall): Boolean = {
     val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc]
     val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan]
     scan.tableSource match {
-      case _: FilterableTableSource =>
-        calc.calcProgram.getCondition != null
+      case source: FilterableTableSource[_] =>
+        calc.getProgram.getCondition != null && !source.isFilterPushedDown
       case _ => false
     }
   }
@@ -43,51 +44,10 @@ class PushFilterIntoStreamTableSourceScanRule extends RelOptRule(
   override def onMatch(call: RelOptRuleCall): Unit = {
     val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc]
     val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan]
-
-    val filterableSource = scan.tableSource.asInstanceOf[FilterableTableSource]
-
-    val program = calc.calcProgram
-    val tst = scan.getTable.unwrap(classOf[TableSourceTable[_]])
-    val predicates = extractPredicateExpressions(
-      program,
-      call.builder().getRexBuilder,
-      tst.tableEnv.getFunctionCatalog)
-
-    if (predicates.length != 0) {
-      val remainingPredicate = filterableSource.setPredicate(predicates)
-
-      if (verifyExpressions(predicates, remainingPredicate)) {
-
-        val filterRexNode = getFilterExpressionAsRexNode(
-          program.getInputRowType,
-          scan,
-          predicates.diff(remainingPredicate))(call.builder())
-
-        val newScan = new StreamTableSourceScan(
-          scan.getCluster,
-          scan.getTraitSet,
-          scan.getTable,
-          scan.tableSource,
-          filterRexNode)
-
-        val newCalcProgram = rewriteRexProgram(
-          program,
-          newScan,
-          remainingPredicate)(call.builder())
-
-        val newCalc = new DataStreamCalc(
-          calc.getCluster,
-          calc.getTraitSet,
-          newScan,
-          calc.getRowType,
-          newCalcProgram,
-          description)
-
-        call.transformTo(newCalc)
-      }
-    }
+    val tableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable[_]])
+    val filterableSource = scan.tableSource.asInstanceOf[FilterableTableSource[_]]
+    pushFilterIntoScan(call, calc, scan, tableSourceTable, filterableSource, description)
   }
-
 }
 
 object PushFilterIntoStreamTableSourceScanRule {

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala
index 0c20f2a..903162e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala
@@ -20,9 +20,8 @@ package org.apache.flink.table.plan.rules.datastream
 
 import org.apache.calcite.plan.RelOptRule._
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, StreamTableSourceScan}
-import org.apache.flink.table.plan.util.RexProgramProjectExtractor._
+import org.apache.flink.table.plan.rules.common.PushProjectIntoTableSourceScanRuleBase
 import org.apache.flink.table.sources.{ProjectableTableSource, StreamTableSource}
 
 /**
@@ -31,7 +30,8 @@ import org.apache.flink.table.sources.{ProjectableTableSource, StreamTableSource
 class PushProjectIntoStreamTableSourceScanRule extends RelOptRule(
   operand(classOf[DataStreamCalc],
     operand(classOf[StreamTableSourceScan], none())),
-  "PushProjectIntoStreamTableSourceScanRule") {
+  "PushProjectIntoStreamTableSourceScanRule")
+  with PushProjectIntoTableSourceScanRuleBase {
 
   /** Rule must only match if [[StreamTableSource]] targets a [[ProjectableTableSource]] */
   override def matches(call: RelOptRuleCall): Boolean = {
@@ -45,39 +45,7 @@ class PushProjectIntoStreamTableSourceScanRule extends RelOptRule(
   override def onMatch(call: RelOptRuleCall): Unit = {
     val calc = call.rel(0).asInstanceOf[DataStreamCalc]
     val scan = call.rel(1).asInstanceOf[StreamTableSourceScan]
-
-    val usedFields = extractRefInputFields(calc.calcProgram)
-
-    // if no fields can be projected, we keep the original plan
-    if (TableEnvironment.getFieldNames(scan.tableSource).length != usedFields.length) {
-      val originTableSource = scan.tableSource.asInstanceOf[ProjectableTableSource[_]]
-      val newTableSource = originTableSource.projectFields(usedFields)
-      val newScan = new StreamTableSourceScan(
-        scan.getCluster,
-        scan.getTraitSet,
-        scan.getTable,
-        newTableSource.asInstanceOf[StreamTableSource[_]])
-
-      val newProgram = rewriteRexProgram(
-        calc.calcProgram,
-        newScan.getRowType,
-        usedFields,
-        calc.getCluster.getRexBuilder)
-
-      if (newProgram.isTrivial) {
-        // drop calc if the transformed program merely returns its input and doesn't exist filter
-        call.transformTo(newScan)
-      } else {
-        val newCalc = new DataStreamCalc(
-          calc.getCluster,
-          calc.getTraitSet,
-          newScan,
-          calc.getRowType,
-          newProgram,
-          description)
-        call.transformTo(newCalc)
-      }
-    }
+    pushProjectIntoScan(call, calc, scan)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
index faf5efc..a3851e3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
@@ -25,7 +25,6 @@ import org.apache.flink.table.sources.TableSource
 /** Table which defines an external table via a [[TableSource]] */
 class TableSourceTable[T](
     val tableSource: TableSource[T],
-    val tableEnv: TableEnvironment,
     override val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN)
   extends FlinkTable[T](
     typeInfo = tableSource.getReturnType,

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractor.scala
deleted file mode 100644
index 337b3de..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractor.scala
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.util
-
-import org.apache.calcite.rel.core.TableScan
-import org.apache.calcite.rex._
-import org.apache.calcite.plan.RelOptUtil
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.calcite.{FlinkTypeFactory, RexNodeWrapper}
-import org.apache.flink.table.expressions._
-import org.apache.flink.table.validate.FunctionCatalog
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-import scala.collection.immutable.IndexedSeq
-
-object RexProgramExpressionExtractor {
-
-  /**
-    * converts a rexProgram condition into independent CNF expressions
-    *
-    * @param rexProgram The RexProgram to analyze
-    * @return converted expression
-    */
-  private[flink] def extractPredicateExpressions(
-      rexProgram: RexProgram,
-      rexBuilder: RexBuilder,
-      catalog: FunctionCatalog): Array[Expression] = {
-
-    val fieldNames = getInputsWithNames(rexProgram)
-
-    val condition = rexProgram.getCondition
-    if (condition == null) {
-      return Array.empty
-    }
-    val call = rexProgram.expandLocalRef(condition)
-    val cnf = RexUtil.toCnf(rexBuilder, call)
-    val conjunctions = RelOptUtil.conjunctions(cnf)
-    val expressions = conjunctions.asScala.map(
-      RexNodeWrapper.wrap(_, catalog).toExpression(fieldNames)
-    )
-    expressions.toArray
-  }
-
-  /**
-    * verify should we apply remained expressions on
-    *
-    * @param original initial expression
-    * @param remained remained part of original expression
-    * @return whether or not to decouple parts of the origin expression
-    */
-  private[flink] def verifyExpressions(
-      original: Array[Expression],
-      remained: Array[Expression]): Boolean =
-    remained forall (original contains)
-
-  /**
-    * Generates a new RexProgram based on new expression.
-    *
-    * @param rexProgram original RexProgram
-    * @param scan input source
-    * @param predicate filter condition (fields must be resolved)
-    * @param relBuilder builder for converting expression to Rex
-    */
-  private[flink] def rewriteRexProgram(
-      rexProgram: RexProgram,
-      scan: TableScan,
-      predicate: Array[Expression])(implicit relBuilder: RelBuilder): RexProgram = {
-
-    relBuilder.push(scan)
-
-    val inType = rexProgram.getInputRowType
-    val resolvedExps = resolveFields(predicate, inType)
-    val projs = rexProgram.getProjectList.map(rexProgram.expandLocalRef)
-
-    RexProgram.create(
-      inType,
-      projs,
-      conjunct(resolvedExps).get.toRexNode,
-      rexProgram.getOutputRowType,
-      relBuilder.getRexBuilder)
-  }
-
-  private[flink] def getFilterExpressionAsRexNode(
-      inputTpe: RelDataType,
-      scan: TableScan,
-      exps: Array[Expression])(implicit relBuilder: RelBuilder): RexNode = {
-    relBuilder.push(scan)
-    val resolvedExps = resolveFields(exps, inputTpe)
-    val fullExp = conjunct(resolvedExps)
-    if (fullExp.isDefined) {
-      fullExp.get.toRexNode
-    } else {
-      null
-    }
-  }
-
-  private def resolveFields(
-      predicate: Array[Expression],
-      inType: RelDataType): Array[Expression] = {
-    val fieldTypes: Map[String, TypeInformation[_]] = inType.getFieldList
-      .map(f => f.getName -> FlinkTypeFactory.toTypeInfo(f.getType))
-      .toMap
-    val rule: PartialFunction[Expression, Expression] = {
-      case u@UnresolvedFieldReference(name) =>
-        ResolvedFieldReference(name, fieldTypes(name))
-    }
-    predicate.map(_.postOrderTransform(rule))
-  }
-
-  private def conjunct(exps: Array[Expression]): Option[Expression] = {
-    def overIndexes(): IndexedSeq[Expression] = {
-      for {
-        i <- exps.indices by 2
-      } yield {
-        if (i + 1 < exps.length) {
-          And(exps(i), exps(i + 1))
-        } else {
-          exps(i)
-        }
-      }
-    }
-    exps.length match {
-      case 0 =>
-        None
-      case 1 =>
-        Option(exps(0))
-      case _ =>
-        conjunct(overIndexes().toArray)
-    }
-  }
-
-  private def getInputsWithNames(rexProgram: RexProgram): Map[RexInputRef, String] = {
-    val names = rexProgram.getInputRowType.getFieldNames
-
-    val buffer = for {
-      exp <- rexProgram.getExprList.asScala
-      if exp.isInstanceOf[RexInputRef]
-      ref = exp.asInstanceOf[RexInputRef]
-    } yield {
-      ref -> names(ref.getIndex)
-    }
-    buffer.toMap
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
new file mode 100644
index 0000000..433a35b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.util
+
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.{SqlFunction, SqlPostfixOperator}
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions.{Expression, Literal, ResolvedFieldReference}
+import org.apache.flink.table.validate.FunctionCatalog
+import org.apache.flink.util.Preconditions
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
+
+object RexProgramExtractor {
+
+  /**
+    * Extracts the indices of the input fields accessed by the RexProgram.
+    *
+    * @param rexProgram The RexProgram to analyze
+    * @return The indices of accessed input fields
+    */
+  def extractRefInputFields(rexProgram: RexProgram): Array[Int] = {
+    val visitor = new InputRefVisitor
+
+    // extract referenced input fields from projections
+    rexProgram.getProjectList.foreach(
+      exp => rexProgram.expandLocalRef(exp).accept(visitor))
+
+    // extract referenced input fields from condition
+    val condition = rexProgram.getCondition
+    if (condition != null) {
+      rexProgram.expandLocalRef(condition).accept(visitor)
+    }
+
+    visitor.getFields
+  }
+
+  /**
+    * Extracts the condition from a RexProgram and converts it into independent CNF expressions.
+    *
+    * @param rexProgram The RexProgram to analyze
+    * @return the converted expressions, together with the RexNodes that could not be translated
+    */
+  def extractConjunctiveConditions(
+      rexProgram: RexProgram,
+      rexBuilder: RexBuilder,
+      catalog: FunctionCatalog): (Array[Expression], Array[RexNode]) = {
+
+    rexProgram.getCondition match {
+      case condition: RexLocalRef =>
+        val expanded = rexProgram.expandLocalRef(condition)
+        // converts the expanded expression to conjunctive normal form,
+        // like "(a AND b) OR c" will be converted to "(a OR c) AND (b OR c)"
+        val cnf = RexUtil.toCnf(rexBuilder, expanded)
+        // converts the cnf condition to a list of AND conditions
+        val conjunctions = RelOptUtil.conjunctions(cnf)
+
+        val convertedExpressions = new mutable.ArrayBuffer[Expression]
+        val unconvertedRexNodes = new mutable.ArrayBuffer[RexNode]
+        val inputNames = rexProgram.getInputRowType.getFieldNames.asScala.toArray
+        val converter = new RexNodeToExpressionConverter(inputNames, catalog)
+
+        conjunctions.asScala.foreach(rex => {
+          rex.accept(converter) match {
+            case Some(expression) => convertedExpressions += expression
+            case None => unconvertedRexNodes += rex
+          }
+        })
+        (convertedExpressions.toArray, unconvertedRexNodes.toArray)
+
+      case _ => (Array.empty, Array.empty)
+    }
+  }
+}
+
+/**
+  * A RexVisitor that extracts all referenced input fields.
+  */
+class InputRefVisitor extends RexVisitorImpl[Unit](true) {
+
+  private val fields = mutable.LinkedHashSet[Int]()
+
+  def getFields: Array[Int] = fields.toArray
+
+  override def visitInputRef(inputRef: RexInputRef): Unit =
+    fields += inputRef.getIndex
+
+  override def visitCall(call: RexCall): Unit =
+    call.operands.foreach(operand => operand.accept(this))
+}
+
+/**
+  * A RexVisitor that converts a RexNode into an Expression.
+  *
+  * @param inputNames      The field names of the input relation node
+  * @param functionCatalog The function catalog
+  */
+class RexNodeToExpressionConverter(
+    inputNames: Array[String],
+    functionCatalog: FunctionCatalog)
+    extends RexVisitor[Option[Expression]] {
+
+  override def visitInputRef(inputRef: RexInputRef): Option[Expression] = {
+    Preconditions.checkArgument(inputRef.getIndex < inputNames.length)
+    Some(ResolvedFieldReference(
+      inputNames(inputRef.getIndex),
+      FlinkTypeFactory.toTypeInfo(inputRef.getType)
+    ))
+  }
+
+  override def visitLocalRef(localRef: RexLocalRef): Option[Expression] = {
+    throw new TableException("Bug: RexLocalRef should have been expanded")
+  }
+
+  override def visitLiteral(literal: RexLiteral): Option[Expression] = {
+    Some(Literal(literal.getValue, FlinkTypeFactory.toTypeInfo(literal.getType)))
+  }
+
+  override def visitCall(call: RexCall): Option[Expression] = {
+    val operands = call.getOperands.map(
+      operand => operand.accept(this).orNull
+    )
+
+    // return None if we cannot translate all operands of the call
+    if (operands.contains(null)) {
+      None
+    } else {
+        call.getOperator match {
+          case function: SqlFunction =>
+            lookupFunction(replace(function.getName), operands)
+          case postfix: SqlPostfixOperator =>
+            lookupFunction(replace(postfix.getName), operands)
+          case operator@_ =>
+            lookupFunction(replace(s"${operator.getKind}"), operands)
+      }
+    }
+  }
+
+  override def visitFieldAccess(fieldAccess: RexFieldAccess): Option[Expression] = None
+
+  override def visitCorrelVariable(correlVariable: RexCorrelVariable): Option[Expression] = None
+
+  override def visitRangeRef(rangeRef: RexRangeRef): Option[Expression] = None
+
+  override def visitSubQuery(subQuery: RexSubQuery): Option[Expression] = None
+
+  override def visitDynamicParam(dynamicParam: RexDynamicParam): Option[Expression] = None
+
+  override def visitOver(over: RexOver): Option[Expression] = None
+
+  private def lookupFunction(name: String, operands: Seq[Expression]): Option[Expression] = {
+    Try(functionCatalog.lookupFunction(name, operands)) match {
+      case Success(expr) => Some(expr)
+      case Failure(_) => None
+    }
+  }
+
+  private def replace(str: String): String = {
+    str.replaceAll("\\s|_", "")
+  }
+
+}
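
A usage sketch of the two extractor entry points, as a filter push-down rule
might call them (program, rexBuilder, and functionCatalog are assumed to be in
scope; this is not verbatim rule code):

    // Indices of all input fields referenced by the projections or the condition.
    val usedFields: Array[Int] =
      RexProgramExtractor.extractRefInputFields(program)

    // Split the condition into Expressions that can be offered to a
    // FilterableTableSource and RexNodes that could not be translated;
    // the latter must stay in the Calc above the scan.
    val (predicates, unconvertedRexNodes) =
      RexProgramExtractor.extractConjunctiveConditions(
        program, rexBuilder, functionCatalog)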

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramProjectExtractor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramProjectExtractor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramProjectExtractor.scala
deleted file mode 100644
index 1198167..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramProjectExtractor.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.util
-
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rex._
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable
-import scala.collection.JavaConverters._
-
-object RexProgramProjectExtractor {
-
-  /**
-    * Extracts the indexes of input fields accessed by the RexProgram.
-    *
-    * @param rexProgram The RexProgram to analyze
-    * @return The indexes of accessed input fields
-    */
-  def extractRefInputFields(rexProgram: RexProgram): Array[Int] = {
-    val visitor = new RefFieldsVisitor
-    // extract input fields from project expressions
-    rexProgram.getProjectList.foreach(exp => rexProgram.expandLocalRef(exp).accept(visitor))
-    val condition = rexProgram.getCondition
-    // extract input fields from condition expression
-    if (condition != null) {
-      rexProgram.expandLocalRef(condition).accept(visitor)
-    }
-    visitor.getFields
-  }
-
-  /**
-    * Generates a new RexProgram based on mapped input fields.
-    *
-    * @param rexProgram      original RexProgram
-    * @param inputRowType    input row type
-    * @param usedInputFields indexes of used input fields
-    * @param rexBuilder      builder for Rex expressions
-    *
-    * @return A RexProgram with mapped input field expressions.
-    */
-  def rewriteRexProgram(
-      rexProgram: RexProgram,
-      inputRowType: RelDataType,
-      usedInputFields: Array[Int],
-      rexBuilder: RexBuilder): RexProgram = {
-
-    val inputRewriter = new InputRewriter(usedInputFields)
-    val newProjectExpressions = rexProgram.getProjectList.map(
-      exp => rexProgram.expandLocalRef(exp).accept(inputRewriter)
-    ).toList.asJava
-
-    val oldCondition = rexProgram.getCondition
-    val newConditionExpression = {
-      oldCondition match {
-        case ref: RexLocalRef => rexProgram.expandLocalRef(ref).accept(inputRewriter)
-        case _ => null // null does not match any type
-      }
-    }
-    RexProgram.create(
-      inputRowType,
-      newProjectExpressions,
-      newConditionExpression,
-      rexProgram.getOutputRowType,
-      rexBuilder
-    )
-  }
-}
-
-/**
-  * A RexVisitor to extract used input fields
-  */
-class RefFieldsVisitor extends RexVisitorImpl[Unit](true) {
-  private var fields = mutable.LinkedHashSet[Int]()
-
-  def getFields: Array[Int] = fields.toArray
-
-  override def visitInputRef(inputRef: RexInputRef): Unit = fields += inputRef.getIndex
-
-  override def visitCall(call: RexCall): Unit =
-    call.operands.foreach(operand => operand.accept(this))
-}
-
-/**
-  * A RexShuttle to rewrite field accesses of a RexProgram.
-  *
-  * @param fields fields mapping
-  */
-class InputRewriter(fields: Array[Int]) extends RexShuttle {
-
-  /** old input fields ref index -> new input fields ref index mappings */
-  private val fieldMap: Map[Int, Int] =
-    fields.zipWithIndex.toMap
-
-  override def visitInputRef(inputRef: RexInputRef): RexNode =
-    new RexInputRef(relNodeIndex(inputRef), inputRef.getType)
-
-  override def visitLocalRef(localRef: RexLocalRef): RexNode =
-    new RexInputRef(relNodeIndex(localRef), localRef.getType)
-
-  private def relNodeIndex(ref: RexSlot): Int =
-    fieldMap.getOrElse(ref.getIndex,
-      throw new IllegalArgumentException("input field contains invalid index"))
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramRewriter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramRewriter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramRewriter.scala
new file mode 100644
index 0000000..c8bbf2d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramRewriter.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.util
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex._
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+object RexProgramRewriter {
+
+  /**
+    * Generates a new RexProgram that only references the used input fields. The used
+    * fields may be a subset of all input fields, so field indices in the new
+    * RexProgram are remapped based on the given fields.
+    *
+    * @param rexProgram   original RexProgram
+    * @param inputRowType input row type
+    * @param rexBuilder   builder for Rex expressions
+    * @param usedFields   indices of used input fields
+    * @return A new RexProgram with only used input fields
+    */
+  def rewriteWithFieldProjection(
+      rexProgram: RexProgram,
+      inputRowType: RelDataType,
+      rexBuilder: RexBuilder,
+      usedFields: Array[Int]): RexProgram = {
+
+    val inputRewriter = new InputRewriter(usedFields)
+
+    // rewrite input field in projections
+    val newProjectExpressions = rexProgram.getProjectList.map(
+      exp => rexProgram.expandLocalRef(exp).accept(inputRewriter)
+    ).toList.asJava
+
+    // rewrite input field in condition
+    val newConditionExpression = {
+      rexProgram.getCondition match {
+        case ref: RexLocalRef => rexProgram.expandLocalRef(ref).accept(inputRewriter)
+        case _ => null // a null getCondition matches no type, so it falls through here
+      }
+    }
+
+    RexProgram.create(
+      inputRowType,
+      newProjectExpressions,
+      newConditionExpression,
+      rexProgram.getOutputRowType,
+      rexBuilder
+    )
+  }
+}
+
+/**
+  * A RexShuttle to rewrite field accesses of a RexProgram.
+  *
+  * @param fields used input fields
+  */
+class InputRewriter(fields: Array[Int]) extends RexShuttle {
+
+  /** old input fields ref index -> new input fields ref index mappings */
+  private val fieldMap: Map[Int, Int] =
+    fields.zipWithIndex.toMap
+
+  override def visitInputRef(inputRef: RexInputRef): RexNode =
+    new RexInputRef(refNewIndex(inputRef), inputRef.getType)
+
+  override def visitLocalRef(localRef: RexLocalRef): RexNode =
+    new RexInputRef(refNewIndex(localRef), localRef.getType)
+
+  private def refNewIndex(ref: RexSlot): Int =
+    fieldMap.getOrElse(ref.getIndex,
+      throw new IllegalArgumentException("input field contains invalid index"))
+}
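
To make the remapping concrete, a small usage sketch (program and rexBuilder
are assumed to be in scope): if only input fields 2 and 5 survive the
projection, InputRewriter maps the old indices as 2 -> 0 and 5 -> 1 before the
program is rebuilt:

    val usedFields = Array(2, 5)

    // All RexInputRef/RexLocalRef accesses inside `program` are rewritten
    // to the new, compacted indices of the projected row type.
    val newProgram = RexProgramRewriter.rewriteWithFieldProjection(
      program,          // original RexProgram
      projectedRowType, // row type of the new scan (assumed in scope)
      rexBuilder,
      usedFields)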

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala
index bbbf862..67529a7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala
@@ -18,21 +18,41 @@
 
 package org.apache.flink.table.sources
 
-import org.apache.flink.table.expressions.Expression
+import java.util.{List => JList}
 
+import org.apache.flink.table.expressions.Expression
 /**
   * Adds support for filtering push-down to a [[TableSource]].
-  * A [[TableSource]] extending this interface is able to filter the fields of the return table.
-  *
+  * A [[TableSource]] extending this interface is able to filter records before they are returned.
   */
-trait FilterableTableSource {
+trait FilterableTableSource[T] {
 
-  /** return an predicate expression that was set. */
-  def getPredicate: Array[Expression]
+  /**
+    * Checks and picks all predicates that this table source can support. The passed-in
+    * predicates have been translated into conjunctive form, and the table source should
+    * only pick those predicates that it supports.
+    * <p>
+    * After trying to push predicates down, this method should return a new [[TableSource]]
+    * instance holding all pushed-down predicates. Even if nothing was actually pushed down,
+    * it is recommended to still return a new [[TableSource]] instance, since the returned
+    * instance will be marked as having had filter push-down applied.
+    * <p>
+    * Note that the form of the passed-in predicates must not be changed. They are organized
+    * in conjunctive form, and implementations should only take or leave each element of the
+    * list. Do not reorganize the predicates unless you are absolutely confident in doing so.
+    *
+    * @param predicates A list of conjunctive predicates. Implementations should pick and
+    *                   remove all expressions that can be pushed down. The remaining
+    *                   elements of this list will be further evaluated by the framework.
+    * @return A new cloned instance of [[TableSource]], with or without any filters
+    *         pushed into it.
+    */
+  def applyPredicate(predicates: JList[Expression]): TableSource[T]
 
   /**
-    * @param predicate a filter expression that will be applied to fields to return.
-    * @return an unsupported predicate expression.
+    * Returns whether filter push-down has been tried. Must return true on an instance
+    * returned by [[applyPredicate]].
     */
-  def setPredicate(predicate: Array[Expression]): Array[Expression]
+  def isFilterPushedDown: Boolean
+
 }
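
To illustrate the pick-and-remove contract of applyPredicate, a minimal sketch
of the predicate-selection step. isSupported stands for a hypothetical
per-source check, and a real implementation would additionally return a new
TableSource instance whose isFilterPushedDown returns true:

    import java.util.{List => JList}
    import scala.collection.mutable
    import org.apache.flink.table.expressions.Expression

    // Removes every supported predicate from the passed-in list (mutating it,
    // per the contract) and returns the picked predicates for the new source.
    def pickSupportedPredicates(
        predicates: JList[Expression],
        isSupported: Expression => Boolean): Seq[Expression] = {
      val picked = mutable.ArrayBuffer[Expression]()
      val it = predicates.iterator()
      while (it.hasNext) {
        val p = it.next()
        if (isSupported(p)) {
          picked += p
          it.remove() // picked predicates must be removed from the list
        }
      }
      picked
    }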