Posted to commits@flink.apache.org by ku...@apache.org on 2017/03/17 10:03:04 UTC

[2/4] flink git commit: [FLINK-3849] [table] Add FilterableTableSource interface and rules for pushing it (1)

[FLINK-3849] [table] Add FilterableTableSource interface and rules for pushing it (1)

Fix filterable test

Rebase and fix RexNode parsing

Create RexNode wrapper and update rules


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9f6cd2e7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9f6cd2e7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9f6cd2e7

Branch: refs/heads/master
Commit: 9f6cd2e76a44f194d96db7a218e5032791c5925c
Parents: ab014ef
Author: tonycox <an...@epam.com>
Authored: Wed Jan 11 13:15:49 2017 +0400
Committer: Kurt Young <ku...@apache.org>
Committed: Fri Mar 17 18:01:27 2017 +0800

----------------------------------------------------------------------
 .../flink/table/api/BatchTableEnvironment.scala |   2 +-
 .../table/api/StreamTableEnvironment.scala      |   2 +-
 .../flink/table/api/TableEnvironment.scala      |  12 +
 .../flink/table/calcite/RexNodeWrapper.scala    | 106 +++++++
 .../table/expressions/stringExpressions.scala   |   2 +-
 .../flink/table/plan/nodes/CommonCalc.scala     |  34 ++-
 .../nodes/dataset/BatchTableSourceScan.scala    |  20 +-
 .../table/plan/nodes/dataset/DataSetCalc.scala  |  24 +-
 .../plan/nodes/datastream/DataStreamCalc.scala  |  15 +-
 .../datastream/StreamTableSourceScan.scala      |  24 +-
 .../flink/table/plan/rules/FlinkRuleSets.scala  |  11 +-
 ...PushFilterIntoBatchTableSourceScanRule.scala |  95 ++++++
 ...ushProjectIntoBatchTableSourceScanRule.scala |   2 +-
 ...ushFilterIntoStreamTableSourceScanRule.scala |  95 ++++++
 ...shProjectIntoStreamTableSourceScanRule.scala |   2 +-
 .../rules/util/RexProgramProjectExtractor.scala | 120 --------
 .../table/plan/schema/TableSourceTable.scala    |   1 +
 .../util/RexProgramExpressionExtractor.scala    | 163 ++++++++++
 .../plan/util/RexProgramProjectExtractor.scala  | 120 ++++++++
 .../table/sources/FilterableTableSource.scala   |  38 +++
 .../flink/table/sources/TableSource.scala       |   1 -
 .../flink/table/validate/FunctionCatalog.scala  |  24 ++
 .../flink/table/TableEnvironmentTest.scala      |  25 +-
 .../apache/flink/table/TableSourceTest.scala    | 300 +++++++++++++++++++
 .../api/scala/batch/TableSourceITCase.scala     |  16 +
 .../table/api/scala/batch/TableSourceTest.scala | 209 -------------
 .../api/scala/stream/TableSourceITCase.scala    |  19 ++
 .../util/RexProgramProjectExtractorTest.scala   | 121 --------
 .../RexProgramExpressionExtractorTest.scala     | 182 +++++++++++
 .../util/RexProgramProjectExtractorTest.scala   | 121 ++++++++
 .../flink/table/utils/CommonTestData.scala      | 128 +++++++-
 31 files changed, 1511 insertions(+), 523 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index b48e9f9..7f27357 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -95,7 +95,7 @@ abstract class BatchTableEnvironment(
 
     tableSource match {
       case batchTableSource: BatchTableSource[_] =>
-        registerTableInternal(name, new TableSourceTable(batchTableSource))
+        registerTableInternal(name, new TableSourceTable(batchTableSource, this))
       case _ =>
         throw new TableException("Only BatchTableSource can be registered in " +
             "BatchTableEnvironment")

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index d927c3a..7e9f38f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -136,7 +136,7 @@ abstract class StreamTableEnvironment(
 
     tableSource match {
       case streamTableSource: StreamTableSource[_] =>
-        registerTableInternal(name, new TableSourceTable(streamTableSource))
+        registerTableInternal(name, new TableSourceTable(streamTableSource, this))
       case _ =>
         throw new TableException("Only StreamTableSource can be registered in " +
             "StreamTableEnvironment")

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 1dda3a8..291f49f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -842,6 +842,18 @@ object TableEnvironment {
   }
 
   /**
+    * Returns field types for a given [[TableSource]].
+    *
+    * @param tableSource The TableSource to extract field types from.
+    * @tparam A The type of the TableSource.
+    * @return An array holding the field types.
+    */
+  def getFieldTypes[A](tableSource: TableSource[A]): Array[TypeInformation[_]] = {
+    val returnType = tableSource.getReturnType
+    TableEnvironment.getFieldTypes(returnType)
+  }
+
+  /**
     * Returns field names for a given [[TableSource]].
     *
     * @param tableSource The TableSource to extract field names from.

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RexNodeWrapper.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RexNodeWrapper.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RexNodeWrapper.scala
new file mode 100644
index 0000000..1926a67
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RexNodeWrapper.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.calcite
+
+import org.apache.calcite.rex._
+import org.apache.calcite.sql._
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.expressions.{Expression, Literal, ResolvedFieldReference}
+import org.apache.flink.table.validate.FunctionCatalog
+import org.apache.flink.table.calcite.RexNodeWrapper._
+
+abstract class RexNodeWrapper(rex: RexNode) {
+  def get: RexNode = rex
+  def toExpression(names: Map[RexInputRef, String]): Expression
+}
+
+case class RexLiteralWrapper(literal: RexLiteral) extends RexNodeWrapper(literal) {
+  override def toExpression(names: Map[RexInputRef, String]): Expression = {
+    val typeInfo = FlinkTypeFactory.toTypeInfo(literal.getType)
+    Literal(literal.getValue, typeInfo)
+  }
+}
+
+case class RexInputWrapper(input: RexInputRef) extends RexNodeWrapper(input) {
+  override def toExpression(names: Map[RexInputRef, String]): Expression = {
+    val typeInfo = FlinkTypeFactory.toTypeInfo(input.getType)
+    ResolvedFieldReference(names(input), typeInfo)
+  }
+}
+
+case class RexCallWrapper(
+    call: RexCall,
+    operands: Seq[RexNodeWrapper]) extends RexNodeWrapper(call) {
+
+  override def toExpression(names: Map[RexInputRef, String]): Expression = {
+    val ops = operands.map(_.toExpression(names))
+    call.op match {
+      case function: SqlFunction =>
+        lookupFunction(replace(function.getName), ops)
+      case postfix: SqlPostfixOperator =>
+        lookupFunction(replace(postfix.getName), ops)
+      case operator@_ =>
+        val name = replace(s"${operator.kind}")
+        lookupFunction(name, ops)
+    }
+  }
+
+  def replace(str: String): String = {
+    str.replaceAll("\\s|_", "")
+  }
+}
+
+object RexNodeWrapper {
+
+  private var catalog: Option[FunctionCatalog] = None
+
+  def wrap(rex: RexNode, functionCatalog: FunctionCatalog): RexNodeWrapper = {
+    catalog = Option(functionCatalog)
+    rex.accept(new WrapperVisitor)
+  }
+
+  private[table] def lookupFunction(name: String, operands: Seq[Expression]): Expression = {
+    catalog.getOrElse(throw TableException("FunctionCatalog was not defined"))
+      .lookupFunction(name, operands)
+  }
+}
+
+class WrapperVisitor extends RexVisitorImpl[RexNodeWrapper](true) {
+
+  override def visitInputRef(inputRef: RexInputRef): RexNodeWrapper = {
+    RexInputWrapper(inputRef)
+  }
+
+  override def visitLiteral(literal: RexLiteral): RexNodeWrapper = {
+    RexLiteralWrapper(literal)
+  }
+
+  override def visitLocalRef(localRef: RexLocalRef): RexNodeWrapper = {
+    localRef.accept(this)
+  }
+
+  override def visitCall(call: RexCall): RexNodeWrapper = {
+    val operands = for {
+      x <- 0 until call.operands.size()
+    } yield {
+      call.operands.get(x).accept(this)
+    }
+    RexCallWrapper(call, operands)
+  }
+}

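A minimal sketch of the intended usage, assuming the caller has already obtained the
condition RexNode and the RexInputRef-to-name mapping from a RexProgram (as the
expression extractor below does); the helper name is hypothetical:

    import org.apache.calcite.rex.{RexInputRef, RexNode}
    import org.apache.flink.table.calcite.RexNodeWrapper
    import org.apache.flink.table.expressions.Expression
    import org.apache.flink.table.validate.FunctionCatalog

    // Wrap a Calcite condition and translate it into a Table API Expression.
    def toTableExpression(
        condition: RexNode,
        catalog: FunctionCatalog,
        names: Map[RexInputRef, String]): Expression =
      RexNodeWrapper.wrap(condition, catalog).toExpression(names)
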
http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
index f4b58cc..e8ae0d8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
@@ -111,7 +111,7 @@ case class Lower(child: Expression) extends UnaryExpression {
     }
   }
 
-  override def toString: String = s"($child).toLowerCase()"
+  override def toString: String = s"($child).lowerCase()"
 
   override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
     relBuilder.call(SqlStdOperatorTable.LOWER, child.toRexNode)

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
index 3f46258..8b07aac 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
@@ -18,8 +18,9 @@
 
 package org.apache.flink.table.plan.nodes
 
+import org.apache.calcite.plan.{RelOptCost, RelOptPlanner}
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rex.{RexNode, RexProgram}
+import org.apache.calcite.rex._
 import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api.TableConfig
@@ -149,4 +150,35 @@ trait CommonCalc {
     val name = calcOpName(calcProgram, expression)
     s"Calc($name)"
   }
+
+  private[flink] def computeSelfCost(
+      calcProgram: RexProgram,
+      planner: RelOptPlanner,
+      rowCnt: Double): RelOptCost = {
+
+    // compute number of expressions that do not access a field or literal, i.e. computations,
+    // conditions, etc. We only want to account for computations, not for simple projections.
+    // CASTs in RexProgram are reduced as far as possible by ReduceExpressionsRule
+    // in normalization stage. So we should ignore CASTs here in optimization stage.
+    val compCnt = calcProgram.getExprList.asScala.toList.count {
+      case i: RexInputRef => false
+      case l: RexLiteral => false
+      case c: RexCall if c.getOperator.getName.equals("CAST") => false
+      case _ => true
+    }
+
+    planner.getCostFactory.makeCost(rowCnt, rowCnt * compCnt, 0)
+  }
+
+  private[flink] def estimateRowCount(
+      calcProgram: RexProgram,
+      rowCnt: Double): Double = {
+
+    if (calcProgram.getCondition != null) {
+      // we reduce the result cardinality to make filter push-down attractive
+      (rowCnt * 0.75).min(1.0)
+    } else {
+      rowCnt
+    }
+  }
 }

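To make the new cost formula concrete, a small worked example (the expression list and
row count are illustrative assumptions, not taken from the commit):

    // RexProgram expressions: $0, $1, 42, +($0, $1), >($0, 42)
    // Input refs and literals are free, so only the addition and the
    // comparison count: compCnt = 2. With 1000 input rows:
    val rowCnt = 1000.0
    val compCnt = 2
    val cpu = rowCnt * compCnt // 2000.0, the CPU component passed to makeCost
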
http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
index 9b8e1ea..11f595c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
@@ -21,20 +21,21 @@ package org.apache.flink.table.plan.nodes.dataset
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter}
-import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.calcite.rex.RexNode
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.table.api.{BatchTableEnvironment, TableEnvironment}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.schema.TableSourceTable
-import org.apache.flink.table.sources.BatchTableSource
 import org.apache.flink.types.Row
+import org.apache.flink.table.sources.BatchTableSource
 
 /** Flink RelNode to read data from an external source defined by a [[BatchTableSource]]. */
 class BatchTableSourceScan(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     table: RelOptTable,
-    val tableSource: BatchTableSource[_])
+    val tableSource: BatchTableSource[_],
+    filterCondition: RexNode = null)
   extends BatchScan(cluster, traitSet, table) {
 
   override def deriveRowType() = {
@@ -54,13 +55,20 @@ class BatchTableSourceScan(
       cluster,
       traitSet,
       getTable,
-      tableSource
+      tableSource,
+      filterCondition
     )
   }
 
   override def explainTerms(pw: RelWriter): RelWriter = {
-    super.explainTerms(pw)
+    val terms = super.explainTerms(pw)
       .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", "))
+    if (filterCondition != null) {
+      import scala.collection.JavaConverters._
+      val fieldNames = getTable.getRowType.getFieldNames.asScala.toList
+      terms.item("filter", getExpressionString(filterCondition, fieldNames, None))
+    }
+    terms
   }
 
   override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
@@ -68,6 +76,6 @@ class BatchTableSourceScan(
     val config = tableEnv.getConfig
     val inputDataSet = tableSource.getDataSet(tableEnv.execEnv).asInstanceOf[DataSet[Any]]
 
-    convertToInternalRow(inputDataSet, new TableSourceTable(tableSource), config)
+    convertToInternalRow(inputDataSet, new TableSourceTable(tableSource, tableEnv), config)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
index 9b3ff63..972e45b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
@@ -31,8 +31,6 @@ import org.apache.flink.table.codegen.CodeGenerator
 import org.apache.flink.table.plan.nodes.CommonCalc
 import org.apache.flink.types.Row
 
-import scala.collection.JavaConverters._
-
 /**
   * Flink RelNode which matches along with LogicalCalc.
   *
@@ -71,34 +69,17 @@ class DataSetCalc(
   }
 
   override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
-
     val child = this.getInput
     val rowCnt = metadata.getRowCount(child)
 
-    // compute number of expressions that do not access a field or literal, i.e. computations,
-    //   conditions, etc. We only want to account for computations, not for simple projections.
-    // CASTs in RexProgram are reduced as far as possible by ReduceExpressionsRule
-    // in normalization stage. So we should ignore CASTs here in optimization stage.
-    val compCnt = calcProgram.getExprList.asScala.toList.count {
-      case i: RexInputRef => false
-      case l: RexLiteral => false
-      case c: RexCall if c.getOperator.getName.equals("CAST") => false
-      case _ => true
-    }
-
-    planner.getCostFactory.makeCost(rowCnt, rowCnt * compCnt, 0)
+    computeSelfCost(calcProgram, planner, rowCnt)
   }
 
   override def estimateRowCount(metadata: RelMetadataQuery): Double = {
     val child = this.getInput
     val rowCnt = metadata.getRowCount(child)
 
-    if (calcProgram.getCondition != null) {
-      // we reduce the result card to push filters down
-      (rowCnt * 0.75).min(1.0)
-    } else {
-      rowCnt
-    }
+    estimateRowCount(calcProgram, rowCnt)
   }
 
   override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
@@ -127,5 +108,4 @@ class DataSetCalc(
     val mapFunc = calcMapFunction(genFunction)
     inputDS.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString))
   }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
index b39ae4a..26778d7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
@@ -18,8 +18,9 @@
 
 package org.apache.flink.table.plan.nodes.datastream
 
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.calcite.rex.RexProgram
 import org.apache.flink.api.common.functions.FlatMapFunction
@@ -68,6 +69,18 @@ class DataStreamCalc(
         calcProgram.getCondition != null)
   }
 
+  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+    val child = this.getInput
+    val rowCnt = metadata.getRowCount(child)
+    computeSelfCost(calcProgram, planner, rowCnt)
+  }
+
+  override def estimateRowCount(metadata: RelMetadataQuery): Double = {
+    val child = this.getInput
+    val rowCnt = metadata.getRowCount(child)
+    estimateRowCount(calcProgram, rowCnt)
+  }
+
   override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
 
     val config = tableEnv.getConfig

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index 73d0291..b808d8d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -21,19 +21,21 @@ package org.apache.flink.table.plan.nodes.datastream
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter}
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.{StreamTableEnvironment, TableEnvironment}
+import org.apache.calcite.rex.RexNode
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.schema.TableSourceTable
-import org.apache.flink.table.sources.StreamTableSource
 import org.apache.flink.types.Row
+import org.apache.flink.table.sources.StreamTableSource
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamTableEnvironment, TableEnvironment}
 
 /** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */
 class StreamTableSourceScan(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     table: RelOptTable,
-    val tableSource: StreamTableSource[_])
+    val tableSource: StreamTableSource[_],
+    filterCondition: RexNode = null)
   extends StreamScan(cluster, traitSet, table) {
 
   override def deriveRowType() = {
@@ -53,13 +55,20 @@ class StreamTableSourceScan(
       cluster,
       traitSet,
       getTable,
-      tableSource
+      tableSource,
+      filterCondition
     )
   }
 
   override def explainTerms(pw: RelWriter): RelWriter = {
-    super.explainTerms(pw)
+    val terms = super.explainTerms(pw)
       .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", "))
+    if (filterCondition != null) {
+      import scala.collection.JavaConverters._
+      val fieldNames = getTable.getRowType.getFieldNames.asScala.toList
+      terms.item("filter", getExpressionString(filterCondition, fieldNames, None))
+    }
+    terms
   }
 
   override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
@@ -68,7 +77,6 @@ class StreamTableSourceScan(
     val inputDataStream: DataStream[Any] = tableSource
       .getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
 
-    convertToInternalRow(inputDataStream, new TableSourceTable(tableSource), config)
+    convertToInternalRow(inputDataStream, new TableSourceTable(tableSource, tableEnv), config)
   }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index 3b20236..952ee34 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -122,8 +122,10 @@ object FlinkRuleSets {
     DataSetValuesRule.INSTANCE,
     DataSetCorrelateRule.INSTANCE,
     BatchTableSourceScanRule.INSTANCE,
-    // project pushdown optimization
-    PushProjectIntoBatchTableSourceScanRule.INSTANCE
+
+    // scan optimization
+    PushProjectIntoBatchTableSourceScanRule.INSTANCE,
+    PushFilterIntoBatchTableSourceScanRule.INSTANCE
   )
 
   /**
@@ -178,7 +180,10 @@ object FlinkRuleSets {
       DataStreamValuesRule.INSTANCE,
       DataStreamCorrelateRule.INSTANCE,
       StreamTableSourceScanRule.INSTANCE,
-      PushProjectIntoStreamTableSourceScanRule.INSTANCE
+
+      // scan optimization
+      PushProjectIntoStreamTableSourceScanRule.INSTANCE,
+      PushFilterIntoStreamTableSourceScanRule.INSTANCE
   )
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala
new file mode 100644
index 0000000..f95e34e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.RexProgram
+import org.apache.flink.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc}
+import org.apache.flink.table.plan.util.RexProgramExpressionExtractor._
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.sources.FilterableTableSource
+
+class PushFilterIntoBatchTableSourceScanRule extends RelOptRule(
+  operand(classOf[DataSetCalc],
+    operand(classOf[BatchTableSourceScan], none)),
+  "PushFilterIntoBatchTableSourceScanRule") {
+
+  override def matches(call: RelOptRuleCall) = {
+    val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
+    val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
+    scan.tableSource match {
+      case _: FilterableTableSource =>
+        calc.calcProgram.getCondition != null
+      case _ => false
+    }
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
+    val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
+
+    val filterableSource = scan.tableSource.asInstanceOf[FilterableTableSource]
+
+    val program: RexProgram = calc.calcProgram
+    val tst = scan.getTable.unwrap(classOf[TableSourceTable[_]])
+    val predicate = extractPredicateExpressions(
+      program,
+      call.builder().getRexBuilder,
+      tst.tableEnv.getFunctionCatalog)
+
+    if (predicate.length != 0) {
+      val remainingPredicate = filterableSource.setPredicate(predicate)
+
+      if (verifyExpressions(predicate, remainingPredicate)) {
+
+        val filterRexNode = getFilterExpressionAsRexNode(
+          program.getInputRowType,
+          scan,
+          predicate.diff(remainingPredicate))(call.builder())
+
+        val newScan = new BatchTableSourceScan(
+          scan.getCluster,
+          scan.getTraitSet,
+          scan.getTable,
+          scan.tableSource,
+          filterRexNode)
+
+        val newCalcProgram = rewriteRexProgram(
+          program,
+          newScan,
+          remainingPredicate)(call.builder())
+
+        val newCalc = new DataSetCalc(
+          calc.getCluster,
+          calc.getTraitSet,
+          newScan,
+          calc.getRowType,
+          newCalcProgram,
+          description)
+
+        call.transformTo(newCalc)
+      }
+    }
+  }
+}
+
+object PushFilterIntoBatchTableSourceScanRule {
+  val INSTANCE: RelOptRule = new PushFilterIntoBatchTableSourceScanRule
+}

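The rule relies on setPredicate returning the unsupported subset; the array bookkeeping,
shown with plain string stand-ins for the extracted conjunctions:

    // Stand-ins for the CNF conjunctions extracted from the Calc condition.
    val predicates = Array("a > 1", "b < 2", "c = 3")
    // Suppose the source accepts the first two and hands back the rest.
    val remaining = Array("c = 3")
    val pushed = predicates.diff(remaining) // Array("a > 1", "b < 2")
    // pushed is converted back to a RexNode and stored on the new scan;
    // remaining becomes the condition of the rewritten Calc program.
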
http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
index 70639b7..53f5fff 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
@@ -22,7 +22,7 @@ import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.plan.RelOptRule.{none, operand}
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc}
-import org.apache.flink.table.plan.rules.util.RexProgramProjectExtractor._
+import org.apache.flink.table.plan.util.RexProgramProjectExtractor._
 import org.apache.flink.table.sources.{BatchTableSource, ProjectableTableSource}
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala
new file mode 100644
index 0000000..9c02dd7
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, StreamTableSourceScan}
+import org.apache.flink.table.plan.util.RexProgramExpressionExtractor._
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.sources.FilterableTableSource
+
+class PushFilterIntoStreamTableSourceScanRule extends RelOptRule(
+  operand(classOf[DataStreamCalc],
+    operand(classOf[StreamTableSourceScan], none)),
+  "PushFilterIntoStreamTableSourceScanRule") {
+
+  override def matches(call: RelOptRuleCall) = {
+    val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc]
+    val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan]
+    scan.tableSource match {
+      case _: FilterableTableSource =>
+        calc.calcProgram.getCondition != null
+      case _ => false
+    }
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc]
+    val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan]
+
+    val filterableSource = scan.tableSource.asInstanceOf[FilterableTableSource]
+
+    val program = calc.calcProgram
+    val tst = scan.getTable.unwrap(classOf[TableSourceTable[_]])
+    val predicates = extractPredicateExpressions(
+      program,
+      call.builder().getRexBuilder,
+      tst.tableEnv.getFunctionCatalog)
+
+    if (predicates.length != 0) {
+      val remainingPredicate = filterableSource.setPredicate(predicates)
+
+      if (verifyExpressions(predicates, remainingPredicate)) {
+
+        val filterRexNode = getFilterExpressionAsRexNode(
+          program.getInputRowType,
+          scan,
+          predicates.diff(remainingPredicate))(call.builder())
+
+        val newScan = new StreamTableSourceScan(
+          scan.getCluster,
+          scan.getTraitSet,
+          scan.getTable,
+          scan.tableSource,
+          filterRexNode)
+
+        val newCalcProgram = rewriteRexProgram(
+          program,
+          newScan,
+          remainingPredicate)(call.builder())
+
+        val newCalc = new DataStreamCalc(
+          calc.getCluster,
+          calc.getTraitSet,
+          newScan,
+          calc.getRowType,
+          newCalcProgram,
+          description)
+
+        call.transformTo(newCalc)
+      }
+    }
+  }
+
+}
+
+object PushFilterIntoStreamTableSourceScanRule {
+  val INSTANCE: RelOptRule = new PushFilterIntoStreamTableSourceScanRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala
index a6d4b82..0c20f2a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala
@@ -22,7 +22,7 @@ import org.apache.calcite.plan.RelOptRule._
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, StreamTableSourceScan}
-import org.apache.flink.table.plan.rules.util.RexProgramProjectExtractor._
+import org.apache.flink.table.plan.util.RexProgramProjectExtractor._
 import org.apache.flink.table.sources.{ProjectableTableSource, StreamTableSource}
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/util/RexProgramProjectExtractor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/util/RexProgramProjectExtractor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/util/RexProgramProjectExtractor.scala
deleted file mode 100644
index 129cfd1..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/util/RexProgramProjectExtractor.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.rules.util
-
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rex._
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable
-import scala.collection.JavaConverters._
-
-object RexProgramProjectExtractor {
-
-  /**
-    * Extracts the indexes of input fields accessed by the RexProgram.
-    *
-    * @param rexProgram The RexProgram to analyze
-    * @return The indexes of accessed input fields
-    */
-  def extractRefInputFields(rexProgram: RexProgram): Array[Int] = {
-    val visitor = new RefFieldsVisitor
-    // extract input fields from project expressions
-    rexProgram.getProjectList.foreach(exp => rexProgram.expandLocalRef(exp).accept(visitor))
-    val condition = rexProgram.getCondition
-    // extract input fields from condition expression
-    if (condition != null) {
-      rexProgram.expandLocalRef(condition).accept(visitor)
-    }
-    visitor.getFields
-  }
-
-  /**
-    * Generates a new RexProgram based on mapped input fields.
-    *
-    * @param rexProgram      original RexProgram
-    * @param inputRowType    input row type
-    * @param usedInputFields indexes of used input fields
-    * @param rexBuilder      builder for Rex expressions
-    *
-    * @return A RexProgram with mapped input field expressions.
-    */
-  def rewriteRexProgram(
-      rexProgram: RexProgram,
-      inputRowType: RelDataType,
-      usedInputFields: Array[Int],
-      rexBuilder: RexBuilder): RexProgram = {
-
-    val inputRewriter = new InputRewriter(usedInputFields)
-    val newProjectExpressions = rexProgram.getProjectList.map(
-      exp => rexProgram.expandLocalRef(exp).accept(inputRewriter)
-    ).toList.asJava
-
-    val oldCondition = rexProgram.getCondition
-    val newConditionExpression = {
-      oldCondition match {
-        case ref: RexLocalRef => rexProgram.expandLocalRef(ref).accept(inputRewriter)
-        case _ => null // null does not match any type
-      }
-    }
-    RexProgram.create(
-      inputRowType,
-      newProjectExpressions,
-      newConditionExpression,
-      rexProgram.getOutputRowType,
-      rexBuilder
-    )
-  }
-}
-
-/**
-  * A RexVisitor to extract used input fields
-  */
-class RefFieldsVisitor extends RexVisitorImpl[Unit](true) {
-  private var fields = mutable.LinkedHashSet[Int]()
-
-  def getFields: Array[Int] = fields.toArray
-
-  override def visitInputRef(inputRef: RexInputRef): Unit = fields += inputRef.getIndex
-
-  override def visitCall(call: RexCall): Unit =
-    call.operands.foreach(operand => operand.accept(this))
-}
-
-/**
-  * A RexShuttle to rewrite field accesses of a RexProgram.
-  *
-  * @param fields fields mapping
-  */
-class InputRewriter(fields: Array[Int]) extends RexShuttle {
-
-  /** old input fields ref index -> new input fields ref index mappings */
-  private val fieldMap: Map[Int, Int] =
-    fields.zipWithIndex.toMap
-
-  override def visitInputRef(inputRef: RexInputRef): RexNode =
-    new RexInputRef(relNodeIndex(inputRef), inputRef.getType)
-
-  override def visitLocalRef(localRef: RexLocalRef): RexNode =
-    new RexInputRef(relNodeIndex(localRef), localRef.getType)
-
-  private def relNodeIndex(ref: RexSlot): Int =
-    fieldMap.getOrElse(ref.getIndex,
-      throw new IllegalArgumentException("input field contains invalid index"))
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
index a3851e3..faf5efc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
@@ -25,6 +25,7 @@ import org.apache.flink.table.sources.TableSource
 /** Table which defines an external table via a [[TableSource]] */
 class TableSourceTable[T](
     val tableSource: TableSource[T],
+    val tableEnv: TableEnvironment,
     override val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN)
   extends FlinkTable[T](
     typeInfo = tableSource.getReturnType,

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractor.scala
new file mode 100644
index 0000000..337b3de
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractor.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.util
+
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rex._
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.calcite.{FlinkTypeFactory, RexNodeWrapper}
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.validate.FunctionCatalog
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.immutable.IndexedSeq
+
+object RexProgramExpressionExtractor {
+
+  /**
+    * Converts the condition of a RexProgram into independent CNF conjunctions.
+    *
+    * @param rexProgram The RexProgram to analyze
+    * @return The conjunctive predicate expressions, or an empty array if there is no condition
+    */
+  private[flink] def extractPredicateExpressions(
+      rexProgram: RexProgram,
+      rexBuilder: RexBuilder,
+      catalog: FunctionCatalog): Array[Expression] = {
+
+    val fieldNames = getInputsWithNames(rexProgram)
+
+    val condition = rexProgram.getCondition
+    if (condition == null) {
+      return Array.empty
+    }
+    val call = rexProgram.expandLocalRef(condition)
+    val cnf = RexUtil.toCnf(rexBuilder, call)
+    val conjunctions = RelOptUtil.conjunctions(cnf)
+    val expressions = conjunctions.asScala.map(
+      RexNodeWrapper.wrap(_, catalog).toExpression(fieldNames)
+    )
+    expressions.toArray
+  }
+
+  /**
+    * Verifies that the predicates left over by the table source are a subset of the
+    * original predicates, i.e. that the source did not invent new expressions.
+    *
+    * @param original the predicate expressions offered for push-down
+    * @param remained the expressions the table source could not apply
+    * @return true if every remaining expression is contained in the original set
+    */
+  private[flink] def verifyExpressions(
+      original: Array[Expression],
+      remained: Array[Expression]): Boolean =
+    remained forall (original contains)
+
+  /**
+    * Generates a new RexProgram that applies the remaining predicates on top of the scan.
+    *
+    * @param rexProgram original RexProgram
+    * @param scan the new input scan
+    * @param predicate remaining filter predicates (field references must be resolved)
+    * @param relBuilder builder for converting expressions to Rex
+    */
+  private[flink] def rewriteRexProgram(
+      rexProgram: RexProgram,
+      scan: TableScan,
+      predicate: Array[Expression])(implicit relBuilder: RelBuilder): RexProgram = {
+
+    relBuilder.push(scan)
+
+    val inType = rexProgram.getInputRowType
+    val resolvedExps = resolveFields(predicate, inType)
+    val projs = rexProgram.getProjectList.map(rexProgram.expandLocalRef)
+
+    RexProgram.create(
+      inType,
+      projs,
+      conjunct(resolvedExps).map(_.toRexNode).orNull, // no condition left if all predicates were pushed
+      rexProgram.getOutputRowType,
+      relBuilder.getRexBuilder)
+  }
+
+  private[flink] def getFilterExpressionAsRexNode(
+      inputTpe: RelDataType,
+      scan: TableScan,
+      exps: Array[Expression])(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.push(scan)
+    val resolvedExps = resolveFields(exps, inputTpe)
+    val fullExp = conjunct(resolvedExps)
+    if (fullExp.isDefined) {
+      fullExp.get.toRexNode
+    } else {
+      null
+    }
+  }
+
+  private def resolveFields(
+      predicate: Array[Expression],
+      inType: RelDataType): Array[Expression] = {
+    val fieldTypes: Map[String, TypeInformation[_]] = inType.getFieldList
+      .map(f => f.getName -> FlinkTypeFactory.toTypeInfo(f.getType))
+      .toMap
+    val rule: PartialFunction[Expression, Expression] = {
+      case u@UnresolvedFieldReference(name) =>
+        ResolvedFieldReference(name, fieldTypes(name))
+    }
+    predicate.map(_.postOrderTransform(rule))
+  }
+
+  private def conjunct(exps: Array[Expression]): Option[Expression] = {
+    def overIndexes(): IndexedSeq[Expression] = {
+      for {
+        i <- exps.indices by 2
+      } yield {
+        if (i + 1 < exps.length) {
+          And(exps(i), exps(i + 1))
+        } else {
+          exps(i)
+        }
+      }
+    }
+    exps.length match {
+      case 0 =>
+        None
+      case 1 =>
+        Option(exps(0))
+      case _ =>
+        conjunct(overIndexes().toArray)
+    }
+  }
+
+  private def getInputsWithNames(rexProgram: RexProgram): Map[RexInputRef, String] = {
+    val names = rexProgram.getInputRowType.getFieldNames
+
+    val buffer = for {
+      exp <- rexProgram.getExprList.asScala
+      if exp.isInstanceOf[RexInputRef]
+      ref = exp.asInstanceOf[RexInputRef]
+    } yield {
+      ref -> names(ref.getIndex)
+    }
+    buffer.toMap
+  }
+}

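Note that conjunct folds the predicate array pairwise, producing a balanced And tree
rather than a left-deep chain; a small illustration using boolean literals as stand-in
predicates:

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.table.expressions.{And, Expression, Literal}

    val p = Array[Expression](
      Literal(true, BasicTypeInfo.BOOLEAN_TYPE_INFO),
      Literal(true, BasicTypeInfo.BOOLEAN_TYPE_INFO),
      Literal(true, BasicTypeInfo.BOOLEAN_TYPE_INFO))

    // pass 1 pairs neighbours: [And(p(0), p(1)), p(2)]
    // pass 2 joins the pair:   And(And(p(0), p(1)), p(2))
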
http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramProjectExtractor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramProjectExtractor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramProjectExtractor.scala
new file mode 100644
index 0000000..1198167
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramProjectExtractor.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.util
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex._
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+
+object RexProgramProjectExtractor {
+
+  /**
+    * Extracts the indexes of input fields accessed by the RexProgram.
+    *
+    * @param rexProgram The RexProgram to analyze
+    * @return The indexes of accessed input fields
+    */
+  def extractRefInputFields(rexProgram: RexProgram): Array[Int] = {
+    val visitor = new RefFieldsVisitor
+    // extract input fields from project expressions
+    rexProgram.getProjectList.foreach(exp => rexProgram.expandLocalRef(exp).accept(visitor))
+    val condition = rexProgram.getCondition
+    // extract input fields from condition expression
+    if (condition != null) {
+      rexProgram.expandLocalRef(condition).accept(visitor)
+    }
+    visitor.getFields
+  }
+
+  /**
+    * Generates a new RexProgram based on mapped input fields.
+    *
+    * @param rexProgram      original RexProgram
+    * @param inputRowType    input row type
+    * @param usedInputFields indexes of used input fields
+    * @param rexBuilder      builder for Rex expressions
+    *
+    * @return A RexProgram with mapped input field expressions.
+    */
+  def rewriteRexProgram(
+      rexProgram: RexProgram,
+      inputRowType: RelDataType,
+      usedInputFields: Array[Int],
+      rexBuilder: RexBuilder): RexProgram = {
+
+    val inputRewriter = new InputRewriter(usedInputFields)
+    val newProjectExpressions = rexProgram.getProjectList.map(
+      exp => rexProgram.expandLocalRef(exp).accept(inputRewriter)
+    ).toList.asJava
+
+    val oldCondition = rexProgram.getCondition
+    val newConditionExpression = {
+      oldCondition match {
+        case ref: RexLocalRef => rexProgram.expandLocalRef(ref).accept(inputRewriter)
+        case _ => null // null does not match any type
+      }
+    }
+    RexProgram.create(
+      inputRowType,
+      newProjectExpressions,
+      newConditionExpression,
+      rexProgram.getOutputRowType,
+      rexBuilder
+    )
+  }
+}
+
+/**
+  * A RexVisitor to extract used input fields
+  */
+class RefFieldsVisitor extends RexVisitorImpl[Unit](true) {
+  private var fields = mutable.LinkedHashSet[Int]()
+
+  def getFields: Array[Int] = fields.toArray
+
+  override def visitInputRef(inputRef: RexInputRef): Unit = fields += inputRef.getIndex
+
+  override def visitCall(call: RexCall): Unit =
+    call.operands.foreach(operand => operand.accept(this))
+}
+
+/**
+  * A RexShuttle to rewrite field accesses of a RexProgram.
+  *
+  * @param fields fields mapping
+  */
+class InputRewriter(fields: Array[Int]) extends RexShuttle {
+
+  /** old input fields ref index -> new input fields ref index mappings */
+  private val fieldMap: Map[Int, Int] =
+    fields.zipWithIndex.toMap
+
+  override def visitInputRef(inputRef: RexInputRef): RexNode =
+    new RexInputRef(relNodeIndex(inputRef), inputRef.getType)
+
+  override def visitLocalRef(localRef: RexLocalRef): RexNode =
+    new RexInputRef(relNodeIndex(localRef), localRef.getType)
+
+  private def relNodeIndex(ref: RexSlot): Int =
+    fieldMap.getOrElse(ref.getIndex,
+      throw new IllegalArgumentException("input field contains invalid index"))
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala
new file mode 100644
index 0000000..bbbf862
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import org.apache.flink.table.expressions.Expression
+
+/**
+  * Adds support for filtering push-down to a [[TableSource]].
+  * A [[TableSource]] extending this trait can evaluate filter predicates close to the data
+  * and thereby reduce the number of records it returns.
+  */
+trait FilterableTableSource {
+
+  /** Returns the predicate expressions that were set via [[setPredicate]]. */
+  def getPredicate: Array[Expression]
+
+  /**
+    * @param predicate the filter predicates offered for push-down, as conjunctive expressions.
+    * @return the predicates that the source cannot apply and that must remain in the plan.
+    */
+  def setPredicate(predicate: Array[Expression]): Array[Expression]
+}

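A sketch of how a source might implement this contract; the class name and the filtered
field are hypothetical, and the accompanying BatchTableSource methods are omitted:

    import org.apache.flink.table.expressions.{Expression, GreaterThan, ResolvedFieldReference}
    import org.apache.flink.table.sources.FilterableTableSource

    class FilterableAmountSource extends FilterableTableSource {

      private var pushed: Array[Expression] = Array.empty

      override def getPredicate: Array[Expression] = pushed

      override def setPredicate(predicate: Array[Expression]): Array[Expression] = {
        // Keep only comparisons this source can evaluate itself (here: a
        // greater-than on the hypothetical "amount" field) and hand every
        // other predicate back so the planner keeps it in a Calc node.
        val (supported, unsupported) = predicate.partition {
          case GreaterThan(ResolvedFieldReference("amount", _), _) => true
          case _ => false
        }
        pushed = supported
        unsupported
      }
    }
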
http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
index a3eb03d..fe205f1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
@@ -19,7 +19,6 @@
 package org.apache.flink.table.sources
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.TableEnvironment
 
 /** Defines an external table by providing schema information and used to produce a
   * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]].

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 3c89ec4..2c08d8d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -139,7 +139,21 @@ class FunctionCatalog {
 object FunctionCatalog {
 
   val builtInFunctions: Map[String, Class[_]] = Map(
+
+    // operators intentionally left unmapped:
+    // SqlStdOperatorTable.AS, SqlStdOperatorTable.DIVIDE_INTEGER,
+    // SqlStdOperatorTable.DOT
+
     // logic
+    "and" -> classOf[And],
+    "or" -> classOf[Or],
+    "not" -> classOf[Not],
+    "equals" -> classOf[EqualTo],
+    "greaterThan" -> classOf[GreaterThan],
+    "greaterThanOrEqual" -> classOf[GreaterThanOrEqual],
+    "lessThan" -> classOf[LessThan],
+    "lessThanOrEqual" -> classOf[LessThanOrEqual],
+    "notEquals" -> classOf[NotEqualTo],
     "isNull" -> classOf[IsNull],
     "isNotNull" -> classOf[IsNotNull],
     "isTrue" -> classOf[IsTrue],
@@ -158,15 +172,23 @@ object FunctionCatalog {
     "charLength" -> classOf[CharLength],
     "initCap" -> classOf[InitCap],
     "like" -> classOf[Like],
+    "concat" -> classOf[Plus],
+    "lower" -> classOf[Lower],
     "lowerCase" -> classOf[Lower],
     "similar" -> classOf[Similar],
     "substring" -> classOf[Substring],
     "trim" -> classOf[Trim],
+    // duplicate entries for the function names used by Calcite
+    "upper" -> classOf[Upper],
     "upperCase" -> classOf[Upper],
     "position" -> classOf[Position],
     "overlay" -> classOf[Overlay],
 
     // math functions
+    "plus" -> classOf[Plus],
+    "minus" -> classOf[Minus],
+    "divide" -> classOf[Div],
+    "times" -> classOf[Mul],
     "abs" -> classOf[Abs],
     "ceil" -> classOf[Ceil],
     "exp" -> classOf[Exp],
@@ -176,6 +198,7 @@ object FunctionCatalog {
     "power" -> classOf[Power],
     "mod" -> classOf[Mod],
     "sqrt" -> classOf[Sqrt],
+    "minusPrefix" -> classOf[UnaryMinus],
 
     // temporal functions
     "extract" -> classOf[Extract],
@@ -186,6 +209,7 @@ object FunctionCatalog {
     "localTimestamp" -> classOf[LocalTimestamp],
     "quarter" -> classOf[Quarter],
     "temporalOverlaps" -> classOf[TemporalOverlaps],
+    "dateTimePlus" -> classOf[Plus],
 
     // array
     "cardinality" -> classOf[ArrayCardinality],

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
index 05c2a49..767e83f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
@@ -18,18 +18,16 @@
 
 package org.apache.flink.table
 
-import org.apache.calcite.tools.RuleSet
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor}
-import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment, TableException}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.TableException
 import org.apache.flink.table.expressions.{Alias, UnresolvedFieldReference}
-import org.apache.flink.table.sinks.TableSink
-import org.apache.flink.table.sources.TableSource
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil.{batchTableNode, term, unaryNode, binaryNode, streamTableNode}
+import org.apache.flink.table.utils.{MockTableEnvironment, TableTestBase}
+import org.apache.flink.table.utils.TableTestUtil._
+
 import org.junit.Test
 import org.junit.Assert.assertEquals
 
@@ -350,19 +348,6 @@ class TableEnvironmentTest extends TableTestBase {
 
 }
 
-class MockTableEnvironment extends TableEnvironment(new TableConfig) {
-
-  override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = ???
-
-  override protected def checkValidTableName(name: String): Unit = ???
-
-  override protected def getBuiltInNormRuleSet: RuleSet = ???
-
-  override protected def getBuiltInOptRuleSet: RuleSet = ???
-
-  override def registerTableSource(name: String, tableSource: TableSource[_]) = ???
-}
-
 case class CClass(cf1: Int, cf2: String, cf3: Double)
 
 class PojoClass(var pf1: Int, var pf2: String, var pf3: Double) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala
new file mode 100644
index 0000000..058eca7
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table
+
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.sources.{CsvTableSource, TableSource}
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.{Assert, Test}
+import org.apache.flink.table.utils.{CommonTestData, TableTestBase}
+
+class TableSourceTest extends TableTestBase {
+
+  private val projectedFields: Array[String] = Array("last", "id", "score")
+  private val noCalcFields: Array[String] = Array("id", "score", "first")
+
+  // batch plan
+
+  @Test
+  def testBatchProjectableSourceScanPlanTableApi(): Unit = {
+    val (tableSource, tableName) = csvTable
+    val util = batchTestUtil()
+    val tEnv = util.tEnv
+
+    tEnv.registerTableSource(tableName, tableSource)
+
+    val result = tEnv
+      .scan(tableName)
+      .select('last.upperCase(), 'id.floor(), 'score * 2)
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      projectableSourceBatchTableNode(tableName, projectedFields),
+      term("select", "UPPER(last) AS _c0", "FLOOR(id) AS _c1", "*(score, 2) AS _c2")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testBatchProjectableSourceScanPlanSQL(): Unit = {
+    val (tableSource, tableName) = csvTable
+    val util = batchTestUtil()
+
+    util.tEnv.registerTableSource(tableName, tableSource)
+
+    val sqlQuery = s"SELECT last, floor(id), score * 2 FROM $tableName"
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      projectableSourceBatchTableNode(tableName, projectedFields),
+      term("select", "last", "FLOOR(id) AS EXPR$1", "*(score, 2) AS EXPR$2")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testBatchProjectableSourceScanNoIdentityCalc(): Unit = {
+    val (tableSource, tableName) = csvTable
+    val util = batchTestUtil()
+    val tEnv = util.tEnv
+
+    tEnv.registerTableSource(tableName, tableSource)
+
+    val result = tEnv
+      .scan(tableName)
+      .select('id, 'score, 'first)
+
+    val expected = projectableSourceBatchTableNode(tableName, noCalcFields)
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testBatchFilterableSourceScanPlanTableApi(): Unit = {
+    val (tableSource, tableName) = filterableTableSource
+    val util = batchTestUtil()
+    val tEnv = util.tEnv
+
+    tEnv.registerTableSource(tableName, tableSource)
+
+    val result = tEnv
+      .scan(tableName)
+      .select('price, 'id, 'amount)
+      .where("amount > 2 && price * 2 < 32")
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      filterableSourceBatchTableNode(
+        tableName,
+        Array("name", "id", "amount", "price"),
+        ">(amount, 2)"),
+      term("select", "price", "id", "amount"),
+      term("where", "<(*(price, 2), 32)")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  // stream plan
+
+  @Test
+  def testStreamProjectableSourceScanPlanTableApi(): Unit = {
+    val (tableSource, tableName) = csvTable
+    val util = streamTestUtil()
+    val tEnv = util.tEnv
+
+    tEnv.registerTableSource(tableName, tableSource)
+
+    val result = tEnv
+      .scan(tableName)
+      .select('last, 'id.floor(), 'score * 2)
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      projectableSourceStreamTableNode(tableName, projectedFields),
+      term("select", "last", "FLOOR(id) AS _c1", "*(score, 2) AS _c2")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testStreamProjectableSourceScanPlanSQL(): Unit = {
+    val (tableSource, tableName) = csvTable
+    val util = streamTestUtil()
+
+    util.tEnv.registerTableSource(tableName, tableSource)
+
+    val sqlQuery = s"SELECT last, floor(id), score * 2 FROM $tableName"
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      projectableSourceStreamTableNode(tableName, projectedFields),
+      term("select", "last", "FLOOR(id) AS EXPR$1", "*(score, 2) AS EXPR$2")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testStreamProjectableSourceScanNoIdentityCalc(): Unit = {
+    val (tableSource, tableName) = csvTable
+    val util = streamTestUtil()
+    val tEnv = util.tEnv
+
+    tEnv.registerTableSource(tableName, tableSource)
+
+    val result = tEnv
+      .scan(tableName)
+      .select('id, 'score, 'first)
+
+    val expected = projectableSourceStreamTableNode(tableName, noCalcFields)
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testStreamFilterableSourceScanPlanTableApi(): Unit = {
+    val (tableSource, tableName) = filterableTableSource
+    val util = streamTestUtil()
+    val tEnv = util.tEnv
+
+    tEnv.registerTableSource(tableName, tableSource)
+
+    val result = tEnv
+      .scan(tableName)
+      .select('price, 'id, 'amount)
+      .where("amount > 2 && price * 2 < 32")
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      filterableSourceStreamTableNode(
+        tableName,
+        Array("name", "id", "amount", "price"),
+        ">(amount, 2)"),
+      term("select", "price", "id", "amount"),
+      term("where", "<(*(price, 2), 32)")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  // csv builder
+
+  @Test
+  def testCsvTableSourceBuilder(): Unit = {
+    val source1 = CsvTableSource.builder()
+      .path("/path/to/csv")
+      .field("myfield", Types.STRING)
+      .field("myfield2", Types.INT)
+      .quoteCharacter(';')
+      .fieldDelimiter("#")
+      .lineDelimiter("\r\n")
+      .commentPrefix("%%")
+      .ignoreFirstLine()
+      .ignoreParseErrors()
+      .build()
+
+    val source2 = new CsvTableSource(
+      "/path/to/csv",
+      Array("myfield", "myfield2"),
+      Array(Types.STRING, Types.INT),
+      "#",
+      "\r\n",
+      ';',
+      true,
+      "%%",
+      true)
+
+    Assert.assertEquals(source1, source2)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testCsvTableSourceBuilderWithNullPath(): Unit = {
+    CsvTableSource.builder()
+      .field("myfield", Types.STRING)
+      // should fail, path is not defined
+      .build()
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testCsvTableSourceBuilderWithDuplicateFieldName(): Unit = {
+    CsvTableSource.builder()
+      .path("/path/to/csv")
+      .field("myfield", Types.STRING)
+      // should fail, field names must not be duplicated
+      .field("myfield", Types.INT)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testCsvTableSourceBuilderWithEmptyField(): Unit = {
+    CsvTableSource.builder()
+      .path("/path/to/csv")
+      // should fail, fields must not be empty
+      .build()
+  }
+
+  // utils
+
+  def filterableTableSource: (TableSource[_], String) = {
+    val tableSource = CommonTestData.getFilterableTableSource
+    (tableSource, "filterableTable")
+  }
+
+  def csvTable: (CsvTableSource, String) = {
+    val csvTable = CommonTestData.getCsvTableSource
+    val tableName = "csvTable"
+    (csvTable, tableName)
+  }
+
+  def projectableSourceBatchTableNode(
+      sourceName: String,
+      fields: Array[String]): String = {
+
+    "BatchTableSourceScan(" +
+      s"table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
+  }
+
+  def projectableSourceStreamTableNode(
+      sourceName: String,
+      fields: Array[String]): String = {
+
+    "StreamTableSourceScan(" +
+      s"table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
+  }
+
+  def filterableSourceBatchTableNode(
+      sourceName: String,
+      fields: Array[String],
+      exp: String): String = {
+
+    "BatchTableSourceScan(" +
+      s"table=[[$sourceName]], fields=[${fields.mkString(", ")}], filter=[$exp])"
+  }
+
+  def filterableSourceStreamTableNode(
+      sourceName: String,
+      fields: Array[String],
+      exp: String): String = {
+
+    "StreamTableSourceScan(" +
+      s"table=[[$sourceName]], fields=[${fields.mkString(", ")}], filter=[$exp])"
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
index 70f4345..ca7cd8a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
@@ -102,4 +102,20 @@ class TableSourceITCase(
     TestBaseUtils.compareResultAsText(result.asJava, expected)
   }
 
+  @Test
+  def testTableSourceWithFilterable(): Unit = {
+    val tableName = "MyTable"
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
+    tableEnv.registerTableSource(tableName, CommonTestData.getFilterableTableSource)
+    val results = tableEnv
+      .scan(tableName)
+      .where("amount > 4 && price < 9")
+      .select("id, name")
+      .collect()
+
+    val expected = Seq(
+      "5,Record_5", "6,Record_6", "7,Record_7", "8,Record_8").mkString("\n")
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceTest.scala
deleted file mode 100644
index 670e268..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceTest.scala
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch
-
-import org.apache.flink.table.api.Types
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.sources.CsvTableSource
-import org.apache.flink.table.utils.{CommonTestData, TableTestBase}
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.{Assert, Test}
-
-class TableSourceTest extends TableTestBase {
-
-  private val projectedFields: Array[String] = Array("last", "id", "score")
-  private val noCalcFields: Array[String] = Array("id", "score", "first")
-
-  @Test
-  def testBatchProjectableSourceScanPlanTableApi(): Unit = {
-    val (csvTable, tableName) = tableSource
-    val util = batchTestUtil()
-    val tEnv = util.tEnv
-
-    tEnv.registerTableSource(tableName, csvTable)
-
-    val result = tEnv
-      .scan(tableName)
-      .select('last.upperCase(), 'id.floor(), 'score * 2)
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      sourceBatchTableNode(tableName, projectedFields),
-      term("select", "UPPER(last) AS _c0", "FLOOR(id) AS _c1", "*(score, 2) AS _c2")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testBatchProjectableSourceScanPlanSQL(): Unit = {
-    val (csvTable, tableName) = tableSource
-    val util = batchTestUtil()
-
-    util.tEnv.registerTableSource(tableName, csvTable)
-
-    val sqlQuery = s"SELECT last, floor(id), score * 2 FROM $tableName"
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      sourceBatchTableNode(tableName, projectedFields),
-      term("select", "last", "FLOOR(id) AS EXPR$1", "*(score, 2) AS EXPR$2")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testBatchProjectableSourceScanNoIdentityCalc(): Unit = {
-    val (csvTable, tableName) = tableSource
-    val util = batchTestUtil()
-    val tEnv = util.tEnv
-
-    tEnv.registerTableSource(tableName, csvTable)
-
-    val result = tEnv
-      .scan(tableName)
-      .select('id, 'score, 'first)
-
-    val expected = sourceBatchTableNode(tableName, noCalcFields)
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testStreamProjectableSourceScanPlanTableApi(): Unit = {
-    val (csvTable, tableName) = tableSource
-    val util = streamTestUtil()
-    val tEnv = util.tEnv
-
-    tEnv.registerTableSource(tableName, csvTable)
-
-    val result = tEnv
-      .scan(tableName)
-      .select('last, 'id.floor(), 'score * 2)
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      sourceStreamTableNode(tableName, projectedFields),
-      term("select", "last", "FLOOR(id) AS _c1", "*(score, 2) AS _c2")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testStreamProjectableSourceScanPlanSQL(): Unit = {
-    val (csvTable, tableName) = tableSource
-    val util = streamTestUtil()
-
-    util.tEnv.registerTableSource(tableName, csvTable)
-
-    val sqlQuery = s"SELECT last, floor(id), score * 2 FROM $tableName"
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      sourceStreamTableNode(tableName, projectedFields),
-      term("select", "last", "FLOOR(id) AS EXPR$1", "*(score, 2) AS EXPR$2")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testStreamProjectableSourceScanNoIdentityCalc(): Unit = {
-    val (csvTable, tableName) = tableSource
-    val util = streamTestUtil()
-    val tEnv = util.tEnv
-
-    tEnv.registerTableSource(tableName, csvTable)
-
-    val result = tEnv
-      .scan(tableName)
-      .select('id, 'score, 'first)
-
-    val expected = sourceStreamTableNode(tableName, noCalcFields)
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testCsvTableSourceBuilder(): Unit = {
-    val source1 = CsvTableSource.builder()
-      .path("/path/to/csv")
-      .field("myfield", Types.STRING)
-      .field("myfield2", Types.INT)
-      .quoteCharacter(';')
-      .fieldDelimiter("#")
-      .lineDelimiter("\r\n")
-      .commentPrefix("%%")
-      .ignoreFirstLine()
-      .ignoreParseErrors()
-      .build()
-
-    val source2 = new CsvTableSource(
-      "/path/to/csv",
-      Array("myfield", "myfield2"),
-      Array(Types.STRING, Types.INT),
-      "#",
-      "\r\n",
-      ';',
-      true,
-      "%%",
-      true)
-
-    Assert.assertEquals(source1, source2)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testCsvTableSourceBuilderWithNullPath(): Unit = {
-    CsvTableSource.builder()
-      .field("myfield", Types.STRING)
-      // should fail, path is not defined
-      .build()
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testCsvTableSourceBuilderWithDuplicateFieldName(): Unit = {
-    CsvTableSource.builder()
-      .path("/path/to/csv")
-      .field("myfield", Types.STRING)
-      // should fail, field name must no be duplicate
-      .field("myfield", Types.INT)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testCsvTableSourceBuilderWithEmptyField(): Unit = {
-    CsvTableSource.builder()
-      .path("/path/to/csv")
-      // should fail, field can be empty
-      .build()
-  }
-
-  def tableSource: (CsvTableSource, String) = {
-    val csvTable = CommonTestData.getCsvTableSource
-    val tableName = "csvTable"
-    (csvTable, tableName)
-  }
-
-  def sourceBatchTableNode(sourceName: String, fields: Array[String]): String = {
-    s"BatchTableSourceScan(table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
-  }
-
-  def sourceStreamTableNode(sourceName: String, fields: Array[String] ): String = {
-    s"StreamTableSourceScan(table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
index 06d94aa..973c2f3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
@@ -83,4 +83,23 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase {
       "Williams,4.68")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  @Test
+  def testTableSourceWithFilterable(): Unit = {
+    StreamITCase.testResults = mutable.MutableList()
+    val tableName = "MyTable"
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    tEnv.registerTableSource(tableName, CommonTestData.getFilterableTableSource)
+    tEnv.scan(tableName)
+      .where("amount > 4 && price < 9")
+      .select("id, name")
+      .addSink(new StreamITCase.StringSink)
+
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "5,Record_5", "6,Record_6", "7,Record_7", "8,Record_8")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/util/RexProgramProjectExtractorTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/util/RexProgramProjectExtractorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/util/RexProgramProjectExtractorTest.scala
deleted file mode 100644
index fe19c8e..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/util/RexProgramProjectExtractorTest.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.rules.util
-
-import java.math.BigDecimal
-
-import org.apache.calcite.adapter.java.JavaTypeFactory
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem}
-import org.apache.calcite.rex.{RexBuilder, RexProgram, RexProgramBuilder}
-import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, DOUBLE, INTEGER, VARCHAR}
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.flink.table.plan.rules.util.RexProgramProjectExtractor._
-import org.junit.Assert.{assertArrayEquals, assertTrue}
-import org.junit.{Before, Test}
-
-import scala.collection.JavaConverters._
-
-/**
-  * This class is responsible for testing RexProgramProjectExtractor.
-  */
-class RexProgramProjectExtractorTest {
-  private var typeFactory: JavaTypeFactory = _
-  private var rexBuilder: RexBuilder = _
-  private var allFieldTypes: Seq[RelDataType] = _
-  private val allFieldNames = List("name", "id", "amount", "price")
-
-  @Before
-  def setUp(): Unit = {
-    typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT)
-    rexBuilder = new RexBuilder(typeFactory)
-    allFieldTypes = List(VARCHAR, BIGINT, INTEGER, DOUBLE).map(typeFactory.createSqlType(_))
-  }
-
-  @Test
-  def testExtractRefInputFields(): Unit = {
-    val usedFields = extractRefInputFields(buildRexProgram())
-    assertArrayEquals(usedFields, Array(2, 3, 1))
-  }
-
-  @Test
-  def testRewriteRexProgram(): Unit = {
-    val originRexProgram = buildRexProgram()
-    assertTrue(extractExprStrList(originRexProgram).sameElements(Array(
-      "$0",
-      "$1",
-      "$2",
-      "$3",
-      "*($t2, $t3)",
-      "100",
-      "<($t4, $t5)",
-      "6",
-      ">($t1, $t7)",
-      "AND($t6, $t8)")))
-    // use amount, id, price fields to create a new RexProgram
-    val usedFields = Array(2, 3, 1)
-    val types = usedFields.map(allFieldTypes(_)).toList.asJava
-    val names = usedFields.map(allFieldNames(_)).toList.asJava
-    val inputRowType = typeFactory.createStructType(types, names)
-    val newRexProgram = rewriteRexProgram(originRexProgram, inputRowType, usedFields, rexBuilder)
-    assertTrue(extractExprStrList(newRexProgram).sameElements(Array(
-      "$0",
-      "$1",
-      "$2",
-      "*($t0, $t1)",
-      "100",
-      "<($t3, $t4)",
-      "6",
-      ">($t2, $t6)",
-      "AND($t5, $t7)")))
-  }
-
-  private def buildRexProgram(): RexProgram = {
-    val types = allFieldTypes.asJava
-    val names = allFieldNames.asJava
-    val inputRowType = typeFactory.createStructType(types, names)
-    val builder = new RexProgramBuilder(inputRowType, rexBuilder)
-    val t0 = rexBuilder.makeInputRef(types.get(2), 2)
-    val t1 = rexBuilder.makeInputRef(types.get(1), 1)
-    val t2 = rexBuilder.makeInputRef(types.get(3), 3)
-    val t3 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.MULTIPLY, t0, t2))
-    val t4 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(100L))
-    val t5 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(6L))
-    // project: amount, amount * price
-    builder.addProject(t0, "amount")
-    builder.addProject(t3, "total")
-    // condition: amount * price < 100 and id > 6
-    val t6 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN, t3, t4))
-    val t7 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.GREATER_THAN, t1, t5))
-    val t8 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.AND, List(t6, t7).asJava))
-    builder.addCondition(t8)
-    builder.getProgram
-  }
-
-  /**
-    * extract all expression string list from input RexProgram expression lists
-    *
-    * @param rexProgram input RexProgram instance to analyze
-    * @return all expression string list of input RexProgram expression lists
-    */
-  private def extractExprStrList(rexProgram: RexProgram) = {
-    rexProgram.getExprList.asScala.map(_.toString)
-  }
-
-}