Posted to commits@flink.apache.org by ku...@apache.org on 2017/03/17 10:03:03 UTC

[1/4] flink git commit: [FLINK-3849] [table] Add FilterableTableSource interface and rules for pushing it (1)

Repository: flink
Updated Branches:
  refs/heads/master ab014ef94 -> 78f22aaec


http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractorTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractorTest.scala
new file mode 100644
index 0000000..c4059d5
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractorTest.scala
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.util
+
+import java.math.BigDecimal
+
+import org.apache.calcite.adapter.java.JavaTypeFactory
+import org.apache.calcite.plan._
+import org.apache.calcite.plan.volcano.VolcanoPlanner
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem}
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rex.{RexBuilder, RexProgram, RexProgramBuilder}
+import org.apache.calcite.sql.`type`.SqlTypeName._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
+import org.apache.flink.table.expressions.{Expression, ExpressionParser}
+import org.apache.flink.table.plan.util.RexProgramExpressionExtractor._
+import org.apache.flink.table.plan.schema.CompositeRelDataType
+import org.apache.flink.table.utils.CommonTestData
+import org.junit.Test
+import org.junit.Assert._
+
+import scala.collection.JavaConverters._
+
+class RexProgramExpressionExtractorTest {
+
+  private val typeFactory = new FlinkTypeFactory(RelDataTypeSystem.DEFAULT)
+  private val allFieldTypes = List(VARCHAR, DECIMAL, INTEGER, DOUBLE).map(typeFactory.createSqlType)
+  private val allFieldTypeInfos: Array[TypeInformation[_]] =
+    Array(BasicTypeInfo.STRING_TYPE_INFO,
+      BasicTypeInfo.BIG_DEC_TYPE_INFO,
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.DOUBLE_TYPE_INFO)
+  private val allFieldNames = List("name", "id", "amount", "price")
+
+  @Test
+  def testExtractExpression(): Unit = {
+    val builder: RexBuilder = new RexBuilder(typeFactory)
+    val program = buildRexProgram(
+      allFieldNames, allFieldTypes, typeFactory, builder)
+    val firstExp = ExpressionParser.parseExpression("id > 6")
+    val secondExp = ExpressionParser.parseExpression("amount * price < 100")
+    val expected: Array[Expression] = Array(firstExp, secondExp)
+    val actual = extractPredicateExpressions(
+      program,
+      builder,
+      CommonTestData.getMockTableEnvironment.getFunctionCatalog)
+
+    assertEquals(expected.length, actual.length)
+    // todo
+  }
+
+  @Test
+  def testRewriteRexProgramWithCondition(): Unit = {
+    val originalRexProgram = buildRexProgram(
+      allFieldNames, allFieldTypes, typeFactory, new RexBuilder(typeFactory))
+    val array = Array(
+      "$0",
+      "$1",
+      "$2",
+      "$3",
+      "*($t2, $t3)",
+      "100",
+      "<($t4, $t5)",
+      "6",
+      ">($t1, $t7)",
+      "AND($t6, $t8)")
+    assertTrue(extractExprStrList(originalRexProgram) sameElements array)
+
+    val tEnv = CommonTestData.getMockTableEnvironment
+    val builder = FlinkRelBuilder.create(tEnv.getFrameworkConfig)
+    val tableScan = new MockTableScan(builder.getRexBuilder)
+    val newExpression = ExpressionParser.parseExpression("amount * price < 100")
+    val newRexProgram = rewriteRexProgram(
+      originalRexProgram,
+      tableScan,
+      Array(newExpression)
+    )(builder)
+
+    val newArray = Array(
+      "$0",
+      "$1",
+      "$2",
+      "$3",
+      "*($t2, $t3)",
+      "100",
+      "<($t4, $t5)")
+    assertTrue(extractExprStrList(newRexProgram) sameElements newArray)
+  }
+
+//  @Test
+//  def testVerifyExpressions(): Unit = {
+//    val strPart = "f1 < 4"
+//    val part = parseExpression(strPart)
+//
+//    val shortFalseOrigin = parseExpression(s"f0 > 10 || $strPart")
+//    assertFalse(verifyExpressions(shortFalseOrigin, part))
+//
+//    val longFalseOrigin = parseExpression(s"(f0 > 10 || (($strPart) > POWER(f0, f1))) && 2")
+//    assertFalse(verifyExpressions(longFalseOrigin, part))
+//
+//    val shortOkayOrigin = parseExpression(s"f0 > 10 && ($strPart)")
+//    assertTrue(verifyExpressions(shortOkayOrigin, part))
+//
+//    val longOkayOrigin = parseExpression(s"f0 > 10 && (($strPart) > POWER(f0, f1))")
+//    assertTrue(verifyExpressions(longOkayOrigin, part))
+//
+//    val longOkayOrigin2 = parseExpression(s"(f0 > 10 || (2 > POWER(f0, f1))) && $strPart")
+//    assertTrue(verifyExpressions(longOkayOrigin2, part))
+//  }
+
+  private def buildRexProgram(
+      fieldNames: List[String],
+      fieldTypes: Seq[RelDataType],
+      typeFactory: JavaTypeFactory,
+      rexBuilder: RexBuilder): RexProgram = {
+
+    val inputRowType = typeFactory.createStructType(fieldTypes.asJava, fieldNames.asJava)
+    val builder = new RexProgramBuilder(inputRowType, rexBuilder)
+
+    val t0 = rexBuilder.makeInputRef(fieldTypes(2), 2)
+    val t1 = rexBuilder.makeInputRef(fieldTypes(1), 1)
+    val t2 = rexBuilder.makeInputRef(fieldTypes(3), 3)
+    // t3 = t0 * t2
+    val t3 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.MULTIPLY, t0, t2))
+    val t4 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(100L))
+    val t5 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(6L))
+    // project: amount, amount * price
+    builder.addProject(t0, "amount")
+    builder.addProject(t3, "total")
+    // t6 = t3 < t4
+    val t6 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN, t3, t4))
+    // t7 = t1 > t5
+    val t7 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.GREATER_THAN, t1, t5))
+    val t8 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.AND, List(t6, t7).asJava))
+    // condition: t6 and t7
+    // (t0 * t2 < t4 && t1 > t5)
+    builder.addCondition(t8)
+    builder.getProgram
+  }
+
+  /**
+    * Extracts the string representation of every expression in the given RexProgram.
+    *
+    * @param rexProgram the RexProgram to analyze
+    * @return the string representations of the program's expression list
+    */
+  private def extractExprStrList(rexProgram: RexProgram) =
+    rexProgram.getExprList.asScala.map(_.toString).toArray
+
+  class MockTableScan(
+      rexBuilder: RexBuilder)
+    extends TableScan(
+      RelOptCluster.create(new VolcanoPlanner(), rexBuilder),
+      RelTraitSet.createEmpty,
+      new MockRelOptTable)
+
+  class MockRelOptTable
+    extends RelOptAbstractTable(
+      null,
+      "mockRelTable",
+      new CompositeRelDataType(
+        new RowTypeInfo(allFieldTypeInfos, allFieldNames.toArray), typeFactory))
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramProjectExtractorTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramProjectExtractorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramProjectExtractorTest.scala
new file mode 100644
index 0000000..cea9eee
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramProjectExtractorTest.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.util
+
+import java.math.BigDecimal
+
+import org.apache.calcite.adapter.java.JavaTypeFactory
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem}
+import org.apache.calcite.rex.{RexBuilder, RexProgram, RexProgramBuilder}
+import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, DOUBLE, INTEGER, VARCHAR}
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.flink.table.plan.util.RexProgramProjectExtractor._
+import org.junit.Assert.{assertArrayEquals, assertTrue}
+import org.junit.{Before, Test}
+
+import scala.collection.JavaConverters._
+
+/**
+  * This class is responsible for testing RexProgramProjectExtractor.
+  */
+class RexProgramProjectExtractorTest {
+  private var typeFactory: JavaTypeFactory = _
+  private var rexBuilder: RexBuilder = _
+  private var allFieldTypes: Seq[RelDataType] = _
+  private val allFieldNames = List("name", "id", "amount", "price")
+
+  @Before
+  def setUp(): Unit = {
+    typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT)
+    rexBuilder = new RexBuilder(typeFactory)
+    allFieldTypes = List(VARCHAR, BIGINT, INTEGER, DOUBLE).map(typeFactory.createSqlType(_))
+  }
+
+  @Test
+  def testExtractRefInputFields(): Unit = {
+    val usedFields = extractRefInputFields(buildRexProgram())
+    assertArrayEquals(usedFields, Array(2, 3, 1))
+  }
+
+  @Test
+  def testRewriteRexProgram(): Unit = {
+    val originRexProgram = buildRexProgram()
+    assertTrue(extractExprStrList(originRexProgram).sameElements(Array(
+      "$0",
+      "$1",
+      "$2",
+      "$3",
+      "*($t2, $t3)",
+      "100",
+      "<($t4, $t5)",
+      "6",
+      ">($t1, $t7)",
+      "AND($t6, $t8)")))
+    // use amount, id, price fields to create a new RexProgram
+    val usedFields = Array(2, 3, 1)
+    val types = usedFields.map(allFieldTypes(_)).toList.asJava
+    val names = usedFields.map(allFieldNames(_)).toList.asJava
+    val inputRowType = typeFactory.createStructType(types, names)
+    val newRexProgram = rewriteRexProgram(originRexProgram, inputRowType, usedFields, rexBuilder)
+    assertTrue(extractExprStrList(newRexProgram).sameElements(Array(
+      "$0",
+      "$1",
+      "$2",
+      "*($t0, $t1)",
+      "100",
+      "<($t3, $t4)",
+      "6",
+      ">($t2, $t6)",
+      "AND($t5, $t7)")))
+  }
+
+  private def buildRexProgram(): RexProgram = {
+    val types = allFieldTypes.asJava
+    val names = allFieldNames.asJava
+    val inputRowType = typeFactory.createStructType(types, names)
+    val builder = new RexProgramBuilder(inputRowType, rexBuilder)
+    val t0 = rexBuilder.makeInputRef(types.get(2), 2)
+    val t1 = rexBuilder.makeInputRef(types.get(1), 1)
+    val t2 = rexBuilder.makeInputRef(types.get(3), 3)
+    val t3 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.MULTIPLY, t0, t2))
+    val t4 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(100L))
+    val t5 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(6L))
+    // project: amount, amount * price
+    builder.addProject(t0, "amount")
+    builder.addProject(t3, "total")
+    // condition: amount * price < 100 and id > 6
+    val t6 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN, t3, t4))
+    val t7 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.GREATER_THAN, t1, t5))
+    val t8 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.AND, List(t6, t7).asJava))
+    builder.addCondition(t8)
+    builder.getProgram
+  }
+
+  /**
+    * Extracts the string representation of every expression in the given RexProgram.
+    *
+    * @param rexProgram the RexProgram to analyze
+    * @return the string representations of the program's expression list
+    */
+  private def extractExprStrList(rexProgram: RexProgram) = {
+    rexProgram.getExprList.asScala.map(_.toString)
+  }
+
+}
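
A reading of the assertions above, for orientation (illustrative, not part of
the commit): rewriteRexProgram renumbers input refs against the used-field
order returned by extractRefInputFields.

  val usedFields = Array(2, 3, 1)             // amount, price, id (first-use order)
  val mapping = usedFields.zipWithIndex.toMap // Map(2 -> 0, 3 -> 1, 1 -> 2)
  // hence ">($t1, $t7)" over the old inputs becomes ">($t2, $t6)" afterwards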

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
index 6e4859b..a720f02 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
@@ -21,14 +21,21 @@ package org.apache.flink.table.utils
 import java.io.{File, FileOutputStream, OutputStreamWriter}
 import java.util
 
-import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.table.sources.{BatchTableSource, CsvTableSource}
+import org.apache.calcite.tools.RuleSet
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo, TypeExtractor}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.table.sources.{BatchTableSource, CsvTableSource, TableSource}
-import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment}
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.sources._
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConverters._
 
 object CommonTestData {
 
@@ -98,4 +105,113 @@ object CommonTestData {
       this(null, null)
     }
   }
+
+  def getMockTableEnvironment: TableEnvironment = new MockTableEnvironment
+
+  def getFilterableTableSource = new TestFilterableTableSource
+}
+
+class MockTableEnvironment extends TableEnvironment(new TableConfig) {
+
+  override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = ???
+
+  override protected def checkValidTableName(name: String): Unit = ???
+
+  override def sql(query: String): Table = ???
+
+  override def registerTableSource(name: String, tableSource: TableSource[_]): Unit = ???
+
+  override protected def getBuiltInNormRuleSet: RuleSet = ???
+
+  override protected def getBuiltInOptRuleSet: RuleSet = ???
+}
+
+class TestFilterableTableSource
+    extends BatchTableSource[Row]
+    with StreamTableSource[Row]
+    with FilterableTableSource
+    with DefinedFieldNames {
+
+  import org.apache.flink.table.api.Types._
+
+  val fieldNames = Array("name", "id", "amount", "price")
+  val fieldTypes = Array[TypeInformation[_]](STRING, LONG, INT, DOUBLE)
+
+  private var filterLiteral: Literal = _
+  private var filterPredicates: Array[Expression] = Array.empty
+
+  /** Returns the data of the table as a [[DataSet]]. */
+  override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
+    execEnv.fromCollection[Row](generateDynamicCollection(33).asJava, getReturnType)
+  }
+
+  /** Returns the data of the table as a [[DataStream]]. */
+  def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
+    execEnv.fromCollection[Row](generateDynamicCollection(33).asJava, getReturnType)
+  }
+
+  private def generateDynamicCollection(num: Int): Seq[Row] = {
+
+    if (filterLiteral == null) {
+      throw new RuntimeException("filter expression was not set")
+    }
+
+    val filterValue = filterLiteral.value.asInstanceOf[Number].intValue()
+
+    def shouldCreateRow(value: Int): Boolean = {
+      value > filterValue
+    }
+
+    for {
+      cnt <- 0 until num
+      if shouldCreateRow(cnt)
+    } yield {
+      val row = new Row(fieldNames.length)
+      fieldNames.zipWithIndex.foreach { case (name, index) =>
+        name match {
+          case "name" =>
+            row.setField(index, s"Record_$cnt")
+          case "id" =>
+            row.setField(index, cnt.toLong)
+          case "amount" =>
+            row.setField(index, cnt.toInt)
+          case "price" =>
+            row.setField(index, cnt.toDouble)
+        }
+      }
+      row
+    }
+  }
+
+  /** Returns the [[TypeInformation]] for the return type. */
+  override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes, fieldNames)
+
+  /** Returns the names of the table fields. */
+  override def getFieldNames: Array[String] = fieldNames
+
+  /** Returns the indices of the table fields. */
+  override def getFieldIndices: Array[Int] = fieldNames.indices.toArray
+
+  override def getPredicate: Array[Expression] = filterPredicates
+
+  /** Sets the pushed-down predicates and returns those this source cannot apply. */
+  override def setPredicate(predicates: Array[Expression]): Array[Expression] = {
+    predicates(0) match {
+      case gt: GreaterThan =>
+        gt.left match {
+          case f: ResolvedFieldReference =>
+            gt.right match {
+              case l: Literal =>
+                if (f.name.equals("amount")) {
+                  filterLiteral = l
+                  filterPredicates = Array(predicates(0))
+                  Array(predicates(1))
+                } else predicates
+              case _ => predicates
+            }
+          case _ => predicates
+        }
+      case _ => predicates
+    }
+  }
 }
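
As a usage sketch (not part of the commit): with a real BatchTableEnvironment
(the MockTableEnvironment above stubs its methods with ???), registering the
test source lets a query exercise the new push-down rules. The names below are
illustrative only.

  import org.apache.flink.api.java.ExecutionEnvironment
  import org.apache.flink.table.api.TableEnvironment

  val env = ExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env)
  tEnv.registerTableSource("filterable", CommonTestData.getFilterableTableSource)
  // TestFilterableTableSource.setPredicate accepts "amount > <literal>" and
  // returns the remaining conjunct, which stays in the Calc above the scan.
  val result = tEnv.sql(
    "SELECT name, id FROM filterable WHERE amount > 4 AND amount * price < 100")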


[2/4] flink git commit: [FLINK-3849] [table] Add FilterableTableSource interface and rules for pushing it (1)

Posted by ku...@apache.org.
[FLINK-3849] [table] Add FilterableTableSource interface and rules for pushing it (1)

fix filterable test

rebase and try to fix RexNode parsing

create wrapper and update rules


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9f6cd2e7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9f6cd2e7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9f6cd2e7

Branch: refs/heads/master
Commit: 9f6cd2e76a44f194d96db7a218e5032791c5925c
Parents: ab014ef
Author: tonycox <an...@epam.com>
Authored: Wed Jan 11 13:15:49 2017 +0400
Committer: Kurt Young <ku...@apache.org>
Committed: Fri Mar 17 18:01:27 2017 +0800

----------------------------------------------------------------------
 .../flink/table/api/BatchTableEnvironment.scala |   2 +-
 .../table/api/StreamTableEnvironment.scala      |   2 +-
 .../flink/table/api/TableEnvironment.scala      |  12 +
 .../flink/table/calcite/RexNodeWrapper.scala    | 106 +++++++
 .../table/expressions/stringExpressions.scala   |   2 +-
 .../flink/table/plan/nodes/CommonCalc.scala     |  34 ++-
 .../nodes/dataset/BatchTableSourceScan.scala    |  20 +-
 .../table/plan/nodes/dataset/DataSetCalc.scala  |  24 +-
 .../plan/nodes/datastream/DataStreamCalc.scala  |  15 +-
 .../datastream/StreamTableSourceScan.scala      |  24 +-
 .../flink/table/plan/rules/FlinkRuleSets.scala  |  11 +-
 ...PushFilterIntoBatchTableSourceScanRule.scala |  95 ++++++
 ...ushProjectIntoBatchTableSourceScanRule.scala |   2 +-
 ...ushFilterIntoStreamTableSourceScanRule.scala |  95 ++++++
 ...shProjectIntoStreamTableSourceScanRule.scala |   2 +-
 .../rules/util/RexProgramProjectExtractor.scala | 120 --------
 .../table/plan/schema/TableSourceTable.scala    |   1 +
 .../util/RexProgramExpressionExtractor.scala    | 163 ++++++++++
 .../plan/util/RexProgramProjectExtractor.scala  | 120 ++++++++
 .../table/sources/FilterableTableSource.scala   |  38 +++
 .../flink/table/sources/TableSource.scala       |   1 -
 .../flink/table/validate/FunctionCatalog.scala  |  24 ++
 .../flink/table/TableEnvironmentTest.scala      |  25 +-
 .../apache/flink/table/TableSourceTest.scala    | 300 +++++++++++++++++++
 .../api/scala/batch/TableSourceITCase.scala     |  16 +
 .../table/api/scala/batch/TableSourceTest.scala | 209 -------------
 .../api/scala/stream/TableSourceITCase.scala    |  19 ++
 .../util/RexProgramProjectExtractorTest.scala   | 121 --------
 .../RexProgramExpressionExtractorTest.scala     | 182 +++++++++++
 .../util/RexProgramProjectExtractorTest.scala   | 121 ++++++++
 .../flink/table/utils/CommonTestData.scala      | 128 +++++++-
 31 files changed, 1511 insertions(+), 523 deletions(-)
----------------------------------------------------------------------
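
The new FilterableTableSource.scala is listed in the stats above, but its body
does not appear in this part of the thread. From its call sites (setPredicate
in the two push-down rules and in TestFilterableTableSource), the interface can
be sketched roughly as follows; this is a reconstruction from usage, not the
committed file:

  import org.apache.flink.table.expressions.Expression

  trait FilterableTableSource {

    /** Returns the predicates that were pushed into this source. */
    def getPredicate: Array[Expression]

    /** Offers predicates to the source and returns those it cannot evaluate
      * itself; the returned ones must remain in the Calc above the scan. */
    def setPredicate(predicates: Array[Expression]): Array[Expression]
  }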


http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index b48e9f9..7f27357 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -95,7 +95,7 @@ abstract class BatchTableEnvironment(
 
     tableSource match {
       case batchTableSource: BatchTableSource[_] =>
-        registerTableInternal(name, new TableSourceTable(batchTableSource))
+        registerTableInternal(name, new TableSourceTable(batchTableSource, this))
       case _ =>
         throw new TableException("Only BatchTableSource can be registered in " +
             "BatchTableEnvironment")

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index d927c3a..7e9f38f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -136,7 +136,7 @@ abstract class StreamTableEnvironment(
 
     tableSource match {
       case streamTableSource: StreamTableSource[_] =>
-        registerTableInternal(name, new TableSourceTable(streamTableSource))
+        registerTableInternal(name, new TableSourceTable(streamTableSource, this))
       case _ =>
         throw new TableException("Only StreamTableSource can be registered in " +
             "StreamTableEnvironment")

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 1dda3a8..291f49f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -842,6 +842,18 @@ object TableEnvironment {
   }
 
   /**
+    * Returns field types for a given [[TableSource]].
+    *
+    * @param tableSource The TableSource to extract field types from.
+    * @tparam A The type of the TableSource.
+    * @return An array holding the field types.
+    */
+  def getFieldTypes[A](tableSource: TableSource[A]): Array[TypeInformation[_]] = {
+    val returnType = tableSource.getReturnType
+    TableEnvironment.getFieldTypes(returnType)
+  }
+
+  /**
     * Returns field names for a given [[TableSource]].
     *
     * @param tableSource The TableSource to extract field names from.

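
For illustration (not part of the diff), the new helper mirrors the existing
getFieldNames. Applied to the TestFilterableTableSource defined elsewhere in
this commit, it would yield the field types of the source's return type:

  // Illustrative only: the types come from TestFilterableTableSource.getReturnType.
  val source = CommonTestData.getFilterableTableSource
  val types = TableEnvironment.getFieldTypes(source)
  // types: Array(STRING_TYPE_INFO, LONG_TYPE_INFO, INT_TYPE_INFO, DOUBLE_TYPE_INFO)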
http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RexNodeWrapper.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RexNodeWrapper.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RexNodeWrapper.scala
new file mode 100644
index 0000000..1926a67
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RexNodeWrapper.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.calcite
+
+import org.apache.calcite.rex._
+import org.apache.calcite.sql._
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.expressions.{Expression, Literal, ResolvedFieldReference}
+import org.apache.flink.table.validate.FunctionCatalog
+import org.apache.flink.table.calcite.RexNodeWrapper._
+
+abstract class RexNodeWrapper(rex: RexNode) {
+  def get: RexNode = rex
+  def toExpression(names: Map[RexInputRef, String]): Expression
+}
+
+case class RexLiteralWrapper(literal: RexLiteral) extends RexNodeWrapper(literal) {
+  override def toExpression(names: Map[RexInputRef, String]): Expression = {
+    val typeInfo = FlinkTypeFactory.toTypeInfo(literal.getType)
+    Literal(literal.getValue, typeInfo)
+  }
+}
+
+case class RexInputWrapper(input: RexInputRef) extends RexNodeWrapper(input) {
+  override def toExpression(names: Map[RexInputRef, String]): Expression = {
+    val typeInfo = FlinkTypeFactory.toTypeInfo(input.getType)
+    ResolvedFieldReference(names(input), typeInfo)
+  }
+}
+
+case class RexCallWrapper(
+    call: RexCall,
+    operands: Seq[RexNodeWrapper]) extends RexNodeWrapper(call) {
+
+  override def toExpression(names: Map[RexInputRef, String]): Expression = {
+    val ops = operands.map(_.toExpression(names))
+    call.op match {
+      case function: SqlFunction =>
+        lookupFunction(replace(function.getName), ops)
+      case postfix: SqlPostfixOperator =>
+        lookupFunction(replace(postfix.getName), ops)
+      case operator@_ =>
+        val name = replace(s"${operator.kind}")
+        lookupFunction(name, ops)
+    }
+  }
+
+  def replace(str: String): String = {
+    str.replaceAll("\\s|_", "")
+  }
+}
+
+object RexNodeWrapper {
+
+  private var catalog: Option[FunctionCatalog] = None
+
+  def wrap(rex: RexNode, functionCatalog: FunctionCatalog): RexNodeWrapper = {
+    catalog = Option(functionCatalog)
+    rex.accept(new WrapperVisitor)
+  }
+
+  private[table] def lookupFunction(name: String, operands: Seq[Expression]): Expression = {
+    catalog.getOrElse(throw TableException("FunctionCatalog was not defined"))
+      .lookupFunction(name, operands)
+  }
+}
+
+class WrapperVisitor extends RexVisitorImpl[RexNodeWrapper](true) {
+
+  override def visitInputRef(inputRef: RexInputRef): RexNodeWrapper = {
+    RexInputWrapper(inputRef)
+  }
+
+  override def visitLiteral(literal: RexLiteral): RexNodeWrapper = {
+    RexLiteralWrapper(literal)
+  }
+
+  override def visitLocalRef(localRef: RexLocalRef): RexNodeWrapper = {
+    localRef.accept(this)
+  }
+
+  override def visitCall(call: RexCall): RexNodeWrapper = {
+    val operands = for {
+      x <- 0 until call.operands.size()
+    } yield {
+      call.operands.get(x).accept(this)
+    }
+    RexCallWrapper(call, operands)
+  }
+}
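
A brief sketch of how the wrapper is used (reconstructed from its call site in
RexProgramExpressionExtractor; the helper name below is hypothetical):

  import org.apache.calcite.rex.{RexInputRef, RexNode}
  import org.apache.flink.table.calcite.RexNodeWrapper
  import org.apache.flink.table.expressions.Expression
  import org.apache.flink.table.validate.FunctionCatalog

  // Hypothetical helper: converts a scan condition back into a Table API
  // expression, given the catalog and an input-ref -> field-name mapping.
  def toTableExpression(
      rexNode: RexNode,
      catalog: FunctionCatalog,
      names: Map[RexInputRef, String]): Expression =
    RexNodeWrapper.wrap(rexNode, catalog).toExpression(names)
  // e.g. ">($t1, 6)" over fields (name, id, amount, price) becomes "id > 6"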

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
index f4b58cc..e8ae0d8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
@@ -111,7 +111,7 @@ case class Lower(child: Expression) extends UnaryExpression {
     }
   }
 
-  override def toString: String = s"($child).toLowerCase()"
+  override def toString: String = s"($child).lowerCase()"
 
   override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
     relBuilder.call(SqlStdOperatorTable.LOWER, child.toRexNode)

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
index 3f46258..8b07aac 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
@@ -18,8 +18,9 @@
 
 package org.apache.flink.table.plan.nodes
 
+import org.apache.calcite.plan.{RelOptCost, RelOptPlanner}
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rex.{RexNode, RexProgram}
+import org.apache.calcite.rex._
 import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api.TableConfig
@@ -149,4 +150,35 @@ trait CommonCalc {
     val name = calcOpName(calcProgram, expression)
     s"Calc($name)"
   }
+
+  private[flink] def computeSelfCost(
+      calcProgram: RexProgram,
+      planner: RelOptPlanner,
+      rowCnt: Double): RelOptCost = {
+
+    // compute number of expressions that do not access a field or literal, i.e. computations,
+    // conditions, etc. We only want to account for computations, not for simple projections.
+    // CASTs in the RexProgram are reduced as far as possible by ReduceExpressionsRule
+    // in the normalization stage, so we ignore CASTs here in the optimization stage.
+    val compCnt = calcProgram.getExprList.asScala.toList.count {
+      case i: RexInputRef => false
+      case l: RexLiteral => false
+      case c: RexCall if c.getOperator.getName.equals("CAST") => false
+      case _ => true
+    }
+
+    planner.getCostFactory.makeCost(rowCnt, rowCnt * compCnt, 0)
+  }
+
+  private[flink] def estimateRowCount(
+      calcProgram: RexProgram,
+      rowCnt: Double): Double = {
+
+    if (calcProgram.getCondition != null) {
+      // we reduce the result cardinality to encourage pushing filters down
+      (rowCnt * 0.75).min(1.0)
+    } else {
+      rowCnt
+    }
+  }
 }
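
To make the cost function concrete with the program used in the tests earlier
in this mail: of the ten expressions [$0, $1, $2, $3, *($t2, $t3), 100,
<($t4, $t5), 6, >($t1, $t7), AND($t6, $t8)], four are input refs and two are
literals, so compCnt = 4 and an input of 1000 rows costs makeCost(1000, 4000, 0).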

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
index 9b8e1ea..11f595c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
@@ -21,20 +21,21 @@ package org.apache.flink.table.plan.nodes.dataset
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter}
-import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.calcite.rex.RexNode
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.table.api.{BatchTableEnvironment, TableEnvironment}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.schema.TableSourceTable
-import org.apache.flink.table.sources.BatchTableSource
 import org.apache.flink.types.Row
+import org.apache.flink.table.sources.BatchTableSource
 
 /** Flink RelNode to read data from an external source defined by a [[BatchTableSource]]. */
 class BatchTableSourceScan(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     table: RelOptTable,
-    val tableSource: BatchTableSource[_])
+    val tableSource: BatchTableSource[_],
+    filterCondition: RexNode = null)
   extends BatchScan(cluster, traitSet, table) {
 
   override def deriveRowType() = {
@@ -54,13 +55,20 @@ class BatchTableSourceScan(
       cluster,
       traitSet,
       getTable,
-      tableSource
+      tableSource,
+      filterCondition
     )
   }
 
   override def explainTerms(pw: RelWriter): RelWriter = {
-    super.explainTerms(pw)
+    val terms = super.explainTerms(pw)
       .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", "))
+    if (filterCondition != null) {
+      import scala.collection.JavaConverters._
+      val fieldNames = getTable.getRowType.getFieldNames.asScala.toList
+      terms.item("filter", getExpressionString(filterCondition, fieldNames, None))
+    }
+    terms
   }
 
   override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
@@ -68,6 +76,6 @@ class BatchTableSourceScan(
     val config = tableEnv.getConfig
     val inputDataSet = tableSource.getDataSet(tableEnv.execEnv).asInstanceOf[DataSet[Any]]
 
-    convertToInternalRow(inputDataSet, new TableSourceTable(tableSource), config)
+    convertToInternalRow(inputDataSet, new TableSourceTable(tableSource, tableEnv), config)
   }
 }
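
With a pushed filter, explainTerms now adds a "filter" item, so an explained
batch scan would render roughly as follows (illustrative output; the exact
filter string depends on getExpressionString):

  BatchTableSourceScan(table=[[filterable]], fields=[name, id, amount, price],
    filter=[>(amount, 4)])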

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
index 9b3ff63..972e45b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
@@ -31,8 +31,6 @@ import org.apache.flink.table.codegen.CodeGenerator
 import org.apache.flink.table.plan.nodes.CommonCalc
 import org.apache.flink.types.Row
 
-import scala.collection.JavaConverters._
-
 /**
   * Flink RelNode which matches along with LogicalCalc.
   *
@@ -71,34 +69,17 @@ class DataSetCalc(
   }
 
   override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
-
     val child = this.getInput
     val rowCnt = metadata.getRowCount(child)
 
-    // compute number of expressions that do not access a field or literal, i.e. computations,
-    //   conditions, etc. We only want to account for computations, not for simple projections.
-    // CASTs in RexProgram are reduced as far as possible by ReduceExpressionsRule
-    // in normalization stage. So we should ignore CASTs here in optimization stage.
-    val compCnt = calcProgram.getExprList.asScala.toList.count {
-      case i: RexInputRef => false
-      case l: RexLiteral => false
-      case c: RexCall if c.getOperator.getName.equals("CAST") => false
-      case _ => true
-    }
-
-    planner.getCostFactory.makeCost(rowCnt, rowCnt * compCnt, 0)
+    computeSelfCost(calcProgram, planner, rowCnt)
   }
 
   override def estimateRowCount(metadata: RelMetadataQuery): Double = {
     val child = this.getInput
     val rowCnt = metadata.getRowCount(child)
 
-    if (calcProgram.getCondition != null) {
-      // we reduce the result card to push filters down
-      (rowCnt * 0.75).min(1.0)
-    } else {
-      rowCnt
-    }
+    estimateRowCount(calcProgram, rowCnt)
   }
 
   override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
@@ -127,5 +108,4 @@ class DataSetCalc(
     val mapFunc = calcMapFunction(genFunction)
     inputDS.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString))
   }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
index b39ae4a..26778d7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
@@ -18,8 +18,9 @@
 
 package org.apache.flink.table.plan.nodes.datastream
 
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.calcite.rex.RexProgram
 import org.apache.flink.api.common.functions.FlatMapFunction
@@ -68,6 +69,18 @@ class DataStreamCalc(
         calcProgram.getCondition != null)
   }
 
+  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+    val child = this.getInput
+    val rowCnt = metadata.getRowCount(child)
+    computeSelfCost(calcProgram, planner, rowCnt)
+  }
+
+  override def estimateRowCount(metadata: RelMetadataQuery): Double = {
+    val child = this.getInput
+    val rowCnt = metadata.getRowCount(child)
+    estimateRowCount(calcProgram, rowCnt)
+  }
+
   override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
 
     val config = tableEnv.getConfig

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index 73d0291..b808d8d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -21,19 +21,21 @@ package org.apache.flink.table.plan.nodes.datastream
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter}
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.{StreamTableEnvironment, TableEnvironment}
+import org.apache.calcite.rex.RexNode
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.schema.TableSourceTable
-import org.apache.flink.table.sources.StreamTableSource
 import org.apache.flink.types.Row
+import org.apache.flink.table.sources.StreamTableSource
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamTableEnvironment, TableEnvironment}
 
 /** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */
 class StreamTableSourceScan(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     table: RelOptTable,
-    val tableSource: StreamTableSource[_])
+    val tableSource: StreamTableSource[_],
+    filterCondition: RexNode = null)
   extends StreamScan(cluster, traitSet, table) {
 
   override def deriveRowType() = {
@@ -53,13 +55,20 @@ class StreamTableSourceScan(
       cluster,
       traitSet,
       getTable,
-      tableSource
+      tableSource,
+      filterCondition
     )
   }
 
   override def explainTerms(pw: RelWriter): RelWriter = {
-    super.explainTerms(pw)
+    val terms = super.explainTerms(pw)
       .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", "))
+    if (filterCondition != null) {
+      import scala.collection.JavaConverters._
+      val fieldNames = getTable.getRowType.getFieldNames.asScala.toList
+      terms.item("filter", getExpressionString(filterCondition, fieldNames, None))
+    }
+    terms
   }
 
   override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
@@ -68,7 +77,6 @@ class StreamTableSourceScan(
     val inputDataStream: DataStream[Any] = tableSource
       .getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
 
-    convertToInternalRow(inputDataStream, new TableSourceTable(tableSource), config)
+    convertToInternalRow(inputDataStream, new TableSourceTable(tableSource, tableEnv), config)
   }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index 3b20236..952ee34 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -122,8 +122,10 @@ object FlinkRuleSets {
     DataSetValuesRule.INSTANCE,
     DataSetCorrelateRule.INSTANCE,
     BatchTableSourceScanRule.INSTANCE,
-    // project pushdown optimization
-    PushProjectIntoBatchTableSourceScanRule.INSTANCE
+
+    // scan optimization
+    PushProjectIntoBatchTableSourceScanRule.INSTANCE,
+    PushFilterIntoBatchTableSourceScanRule.INSTANCE
   )
 
   /**
@@ -178,7 +180,10 @@ object FlinkRuleSets {
       DataStreamValuesRule.INSTANCE,
       DataStreamCorrelateRule.INSTANCE,
       StreamTableSourceScanRule.INSTANCE,
-      PushProjectIntoStreamTableSourceScanRule.INSTANCE
+
+      // scan optimization
+      PushProjectIntoStreamTableSourceScanRule.INSTANCE,
+      PushFilterIntoStreamTableSourceScanRule.INSTANCE
   )
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala
new file mode 100644
index 0000000..f95e34e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.RexProgram
+import org.apache.flink.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc}
+import org.apache.flink.table.plan.util.RexProgramExpressionExtractor._
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.sources.FilterableTableSource
+
+class PushFilterIntoBatchTableSourceScanRule extends RelOptRule(
+  operand(classOf[DataSetCalc],
+    operand(classOf[BatchTableSourceScan], none)),
+  "PushFilterIntoBatchTableSourceScanRule") {
+
+  override def matches(call: RelOptRuleCall) = {
+    val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
+    val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
+    scan.tableSource match {
+      case _: FilterableTableSource =>
+        calc.calcProgram.getCondition != null
+      case _ => false
+    }
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
+    val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
+
+    val filterableSource = scan.tableSource.asInstanceOf[FilterableTableSource]
+
+    val program: RexProgram = calc.calcProgram
+    val tst = scan.getTable.unwrap(classOf[TableSourceTable[_]])
+    val predicate = extractPredicateExpressions(
+      program,
+      call.builder().getRexBuilder,
+      tst.tableEnv.getFunctionCatalog)
+
+    if (predicate.length != 0) {
+      val remainingPredicate = filterableSource.setPredicate(predicate)
+
+      if (verifyExpressions(predicate, remainingPredicate)) {
+
+        val filterRexNode = getFilterExpressionAsRexNode(
+          program.getInputRowType,
+          scan,
+          predicate.diff(remainingPredicate))(call.builder())
+
+        val newScan = new BatchTableSourceScan(
+          scan.getCluster,
+          scan.getTraitSet,
+          scan.getTable,
+          scan.tableSource,
+          filterRexNode)
+
+        val newCalcProgram = rewriteRexProgram(
+          program,
+          newScan,
+          remainingPredicate)(call.builder())
+
+        val newCalc = new DataSetCalc(
+          calc.getCluster,
+          calc.getTraitSet,
+          newScan,
+          calc.getRowType,
+          newCalcProgram,
+          description)
+
+        call.transformTo(newCalc)
+      }
+    }
+  }
+}
+
+object PushFilterIntoBatchTableSourceScanRule {
+  val INSTANCE: RelOptRule = new PushFilterIntoBatchTableSourceScanRule
+}
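
Putting the rule together with the TestFilterableTableSource from part 1 of
this thread, the intended rewrite looks roughly like this (an illustrative
sketch, not verbatim optimizer output):

  Before: DataSetCalc(select=[name, id],
            where=[AND(>(amount, 4), <(*(amount, price), 100))])
            +- BatchTableSourceScan(table=[[filterable]], fields=[name, id, amount, price])

  After:  DataSetCalc(select=[name, id], where=[<(*(amount, price), 100)])
            +- BatchTableSourceScan(table=[[filterable]], fields=[name, id, amount, price],
                 filter=[>(amount, 4)])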

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
index 70639b7..53f5fff 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
@@ -22,7 +22,7 @@ import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.plan.RelOptRule.{none, operand}
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc}
-import org.apache.flink.table.plan.rules.util.RexProgramProjectExtractor._
+import org.apache.flink.table.plan.util.RexProgramProjectExtractor._
 import org.apache.flink.table.sources.{BatchTableSource, ProjectableTableSource}
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala
new file mode 100644
index 0000000..9c02dd7
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, StreamTableSourceScan}
+import org.apache.flink.table.plan.util.RexProgramExpressionExtractor._
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.sources.FilterableTableSource
+
+class PushFilterIntoStreamTableSourceScanRule extends RelOptRule(
+  operand(classOf[DataStreamCalc],
+    operand(classOf[StreamTableSourceScan], none)),
+  "PushFilterIntoStreamTableSourceScanRule") {
+
+  override def matches(call: RelOptRuleCall) = {
+    val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc]
+    val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan]
+    scan.tableSource match {
+      case _: FilterableTableSource =>
+        calc.calcProgram.getCondition != null
+      case _ => false
+    }
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc]
+    val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan]
+
+    val filterableSource = scan.tableSource.asInstanceOf[FilterableTableSource]
+
+    val program = calc.calcProgram
+    val tst = scan.getTable.unwrap(classOf[TableSourceTable[_]])
+    val predicates = extractPredicateExpressions(
+      program,
+      call.builder().getRexBuilder,
+      tst.tableEnv.getFunctionCatalog)
+
+    if (predicates.length != 0) {
+      val remainingPredicate = filterableSource.setPredicate(predicates)
+
+      if (verifyExpressions(predicates, remainingPredicate)) {
+
+        val filterRexNode = getFilterExpressionAsRexNode(
+          program.getInputRowType,
+          scan,
+          predicates.diff(remainingPredicate))(call.builder())
+
+        val newScan = new StreamTableSourceScan(
+          scan.getCluster,
+          scan.getTraitSet,
+          scan.getTable,
+          scan.tableSource,
+          filterRexNode)
+
+        val newCalcProgram = rewriteRexProgram(
+          program,
+          newScan,
+          remainingPredicate)(call.builder())
+
+        val newCalc = new DataStreamCalc(
+          calc.getCluster,
+          calc.getTraitSet,
+          newScan,
+          calc.getRowType,
+          newCalcProgram,
+          description)
+
+        call.transformTo(newCalc)
+      }
+    }
+  }
+
+}
+
+object PushFilterIntoStreamTableSourceScanRule {
+  val INSTANCE: RelOptRule = new PushFilterIntoStreamTableSourceScanRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala
index a6d4b82..0c20f2a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala
@@ -22,7 +22,7 @@ import org.apache.calcite.plan.RelOptRule._
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, StreamTableSourceScan}
-import org.apache.flink.table.plan.rules.util.RexProgramProjectExtractor._
+import org.apache.flink.table.plan.util.RexProgramProjectExtractor._
 import org.apache.flink.table.sources.{ProjectableTableSource, StreamTableSource}
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/util/RexProgramProjectExtractor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/util/RexProgramProjectExtractor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/util/RexProgramProjectExtractor.scala
deleted file mode 100644
index 129cfd1..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/util/RexProgramProjectExtractor.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.rules.util
-
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rex._
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable
-import scala.collection.JavaConverters._
-
-object RexProgramProjectExtractor {
-
-  /**
-    * Extracts the indexes of input fields accessed by the RexProgram.
-    *
-    * @param rexProgram The RexProgram to analyze
-    * @return The indexes of accessed input fields
-    */
-  def extractRefInputFields(rexProgram: RexProgram): Array[Int] = {
-    val visitor = new RefFieldsVisitor
-    // extract input fields from project expressions
-    rexProgram.getProjectList.foreach(exp => rexProgram.expandLocalRef(exp).accept(visitor))
-    val condition = rexProgram.getCondition
-    // extract input fields from condition expression
-    if (condition != null) {
-      rexProgram.expandLocalRef(condition).accept(visitor)
-    }
-    visitor.getFields
-  }
-
-  /**
-    * Generates a new RexProgram based on mapped input fields.
-    *
-    * @param rexProgram      original RexProgram
-    * @param inputRowType    input row type
-    * @param usedInputFields indexes of used input fields
-    * @param rexBuilder      builder for Rex expressions
-    *
-    * @return A RexProgram with mapped input field expressions.
-    */
-  def rewriteRexProgram(
-      rexProgram: RexProgram,
-      inputRowType: RelDataType,
-      usedInputFields: Array[Int],
-      rexBuilder: RexBuilder): RexProgram = {
-
-    val inputRewriter = new InputRewriter(usedInputFields)
-    val newProjectExpressions = rexProgram.getProjectList.map(
-      exp => rexProgram.expandLocalRef(exp).accept(inputRewriter)
-    ).toList.asJava
-
-    val oldCondition = rexProgram.getCondition
-    val newConditionExpression = {
-      oldCondition match {
-        case ref: RexLocalRef => rexProgram.expandLocalRef(ref).accept(inputRewriter)
-        case _ => null // null does not match any type
-      }
-    }
-    RexProgram.create(
-      inputRowType,
-      newProjectExpressions,
-      newConditionExpression,
-      rexProgram.getOutputRowType,
-      rexBuilder
-    )
-  }
-}
-
-/**
-  * A RexVisitor to extract used input fields
-  */
-class RefFieldsVisitor extends RexVisitorImpl[Unit](true) {
-  private var fields = mutable.LinkedHashSet[Int]()
-
-  def getFields: Array[Int] = fields.toArray
-
-  override def visitInputRef(inputRef: RexInputRef): Unit = fields += inputRef.getIndex
-
-  override def visitCall(call: RexCall): Unit =
-    call.operands.foreach(operand => operand.accept(this))
-}
-
-/**
-  * A RexShuttle to rewrite field accesses of a RexProgram.
-  *
-  * @param fields fields mapping
-  */
-class InputRewriter(fields: Array[Int]) extends RexShuttle {
-
-  /** old input fields ref index -> new input fields ref index mappings */
-  private val fieldMap: Map[Int, Int] =
-    fields.zipWithIndex.toMap
-
-  override def visitInputRef(inputRef: RexInputRef): RexNode =
-    new RexInputRef(relNodeIndex(inputRef), inputRef.getType)
-
-  override def visitLocalRef(localRef: RexLocalRef): RexNode =
-    new RexInputRef(relNodeIndex(localRef), localRef.getType)
-
-  private def relNodeIndex(ref: RexSlot): Int =
-    fieldMap.getOrElse(ref.getIndex,
-      throw new IllegalArgumentException("input field contains invalid index"))
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
index a3851e3..faf5efc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
@@ -25,6 +25,7 @@ import org.apache.flink.table.sources.TableSource
 /** Table which defines an external table via a [[TableSource]] */
 class TableSourceTable[T](
     val tableSource: TableSource[T],
+    val tableEnv: TableEnvironment,
     override val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN)
   extends FlinkTable[T](
     typeInfo = tableSource.getReturnType,

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractor.scala
new file mode 100644
index 0000000..337b3de
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractor.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.util
+
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rex._
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.calcite.{FlinkTypeFactory, RexNodeWrapper}
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.validate.FunctionCatalog
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.immutable.IndexedSeq
+
+object RexProgramExpressionExtractor {
+
+  /**
+    * Converts a RexProgram condition into independent predicate expressions
+    * in conjunctive normal form (CNF).
+    *
+    * @param rexProgram The RexProgram to analyze
+    * @return the extracted predicate expressions
+    */
+  private[flink] def extractPredicateExpressions(
+      rexProgram: RexProgram,
+      rexBuilder: RexBuilder,
+      catalog: FunctionCatalog): Array[Expression] = {
+
+    val fieldNames = getInputsWithNames(rexProgram)
+
+    val condition = rexProgram.getCondition
+    if (condition == null) {
+      return Array.empty
+    }
+    val call = rexProgram.expandLocalRef(condition)
+    val cnf = RexUtil.toCnf(rexBuilder, call)
+    val conjunctions = RelOptUtil.conjunctions(cnf)
+    val expressions = conjunctions.asScala.map(
+      RexNodeWrapper.wrap(_, catalog).toExpression(fieldNames)
+    )
+    expressions.toArray
+  }
+
+  /**
+    * Verifies that all remaining expressions are contained in the original predicates.
+    *
+    * @param original the predicates originally offered to the table source
+    * @param remained the predicates the table source could not apply
+    * @return true if the remaining expressions are a subset of the original ones
+    */
+  private[flink] def verifyExpressions(
+      original: Array[Expression],
+      remained: Array[Expression]): Boolean =
+    remained forall (original contains)
+
+  /**
+    * Generates a new RexProgram with the given predicate expressions as its condition.
+    *
+    * @param rexProgram original RexProgram
+    * @param scan input table scan
+    * @param predicate filter conditions (field references must be resolvable)
+    * @param relBuilder builder for converting expressions to RexNodes
+    */
+  private[flink] def rewriteRexProgram(
+      rexProgram: RexProgram,
+      scan: TableScan,
+      predicate: Array[Expression])(implicit relBuilder: RelBuilder): RexProgram = {
+
+    relBuilder.push(scan)
+
+    val inType = rexProgram.getInputRowType
+    val resolvedExps = resolveFields(predicate, inType)
+    val projs = rexProgram.getProjectList.map(rexProgram.expandLocalRef)
+
+    RexProgram.create(
+      inType,
+      projs,
+      conjunct(resolvedExps).get.toRexNode,
+      rexProgram.getOutputRowType,
+      relBuilder.getRexBuilder)
+  }
+
+  private[flink] def getFilterExpressionAsRexNode(
+      inputTpe: RelDataType,
+      scan: TableScan,
+      exps: Array[Expression])(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.push(scan)
+    val resolvedExps = resolveFields(exps, inputTpe)
+    val fullExp = conjunct(resolvedExps)
+    if (fullExp.isDefined) {
+      fullExp.get.toRexNode
+    } else {
+      null
+    }
+  }
+
+  private def resolveFields(
+      predicate: Array[Expression],
+      inType: RelDataType): Array[Expression] = {
+    val fieldTypes: Map[String, TypeInformation[_]] = inType.getFieldList
+      .map(f => f.getName -> FlinkTypeFactory.toTypeInfo(f.getType))
+      .toMap
+    val rule: PartialFunction[Expression, Expression] = {
+      case UnresolvedFieldReference(name) =>
+        ResolvedFieldReference(name, fieldTypes(name))
+    }
+    predicate.map(_.postOrderTransform(rule))
+  }
+
+  private def conjunct(exps: Array[Expression]): Option[Expression] = {
+    def overIndexes(): IndexedSeq[Expression] = {
+      for {
+        i <- exps.indices by 2
+      } yield {
+        if (i + 1 < exps.length) {
+          And(exps(i), exps(i + 1))
+        } else {
+          exps(i)
+        }
+      }
+    }
+    exps.length match {
+      case 0 =>
+        None
+      case 1 =>
+        Option(exps(0))
+      case _ =>
+        conjunct(overIndexes().toArray)
+    }
+  }
+
+  private def getInputsWithNames(rexProgram: RexProgram): Map[RexInputRef, String] = {
+    val names = rexProgram.getInputRowType.getFieldNames
+
+    val buffer = for {
+      exp <- rexProgram.getExprList.asScala
+      if exp.isInstanceOf[RexInputRef]
+      ref = exp.asInstanceOf[RexInputRef]
+    } yield {
+      ref -> names(ref.getIndex)
+    }
+    buffer.toMap
+  }
+}
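
A note on the private conjunct helper above: each pass folds the expression array
pairwise, so the result is a roughly balanced tree of And nodes rather than a linear
chain. A minimal sketch, assuming three already-resolved expressions a, b and c:

    // hypothetical illustration of conjunct's pairwise folding
    // pass 1: Array(And(a, b), c)
    // pass 2: Array(And(And(a, b), c))
    // result: Some(And(And(a, b), c)); an empty input yields None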

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramProjectExtractor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramProjectExtractor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramProjectExtractor.scala
new file mode 100644
index 0000000..1198167
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramProjectExtractor.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.util
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex._
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+
+object RexProgramProjectExtractor {
+
+  /**
+    * Extracts the indexes of input fields accessed by the RexProgram.
+    *
+    * @param rexProgram The RexProgram to analyze
+    * @return The indexes of accessed input fields
+    */
+  def extractRefInputFields(rexProgram: RexProgram): Array[Int] = {
+    val visitor = new RefFieldsVisitor
+    // extract input fields from project expressions
+    rexProgram.getProjectList.foreach(exp => rexProgram.expandLocalRef(exp).accept(visitor))
+    val condition = rexProgram.getCondition
+    // extract input fields from condition expression
+    if (condition != null) {
+      rexProgram.expandLocalRef(condition).accept(visitor)
+    }
+    visitor.getFields
+  }
+
+  /**
+    * Generates a new RexProgram based on mapped input fields.
+    *
+    * @param rexProgram      original RexProgram
+    * @param inputRowType    input row type
+    * @param usedInputFields indexes of used input fields
+    * @param rexBuilder      builder for Rex expressions
+    *
+    * @return A RexProgram with mapped input field expressions.
+    */
+  def rewriteRexProgram(
+      rexProgram: RexProgram,
+      inputRowType: RelDataType,
+      usedInputFields: Array[Int],
+      rexBuilder: RexBuilder): RexProgram = {
+
+    val inputRewriter = new InputRewriter(usedInputFields)
+    val newProjectExpressions = rexProgram.getProjectList.map(
+      exp => rexProgram.expandLocalRef(exp).accept(inputRewriter)
+    ).toList.asJava
+
+    val oldCondition = rexProgram.getCondition
+    val newConditionExpression = {
+      oldCondition match {
+        case ref: RexLocalRef => rexProgram.expandLocalRef(ref).accept(inputRewriter)
+        case _ => null // null does not match any type
+      }
+    }
+    RexProgram.create(
+      inputRowType,
+      newProjectExpressions,
+      newConditionExpression,
+      rexProgram.getOutputRowType,
+      rexBuilder
+    )
+  }
+}
+
+/**
+  * A RexVisitor to extract used input fields
+  */
+class RefFieldsVisitor extends RexVisitorImpl[Unit](true) {
+  private val fields = mutable.LinkedHashSet[Int]()
+
+  def getFields: Array[Int] = fields.toArray
+
+  override def visitInputRef(inputRef: RexInputRef): Unit = fields += inputRef.getIndex
+
+  override def visitCall(call: RexCall): Unit =
+    call.operands.foreach(operand => operand.accept(this))
+}
+
+/**
+  * A RexShuttle to rewrite field accesses of a RexProgram.
+  *
+  * @param fields fields mapping
+  */
+class InputRewriter(fields: Array[Int]) extends RexShuttle {
+
+  /** old input fields ref index -> new input fields ref index mappings */
+  private val fieldMap: Map[Int, Int] =
+    fields.zipWithIndex.toMap
+
+  override def visitInputRef(inputRef: RexInputRef): RexNode =
+    new RexInputRef(relNodeIndex(inputRef), inputRef.getType)
+
+  override def visitLocalRef(localRef: RexLocalRef): RexNode =
+    new RexInputRef(relNodeIndex(localRef), localRef.getType)
+
+  private def relNodeIndex(ref: RexSlot): Int =
+    fieldMap.getOrElse(ref.getIndex,
+      throw new IllegalArgumentException("input field contains invalid index"))
+}
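
To make the remapping above concrete: InputRewriter translates an old input index into
its position within the used-fields array. A minimal sketch, assuming the extractor
returned the used fields Array(2, 3, 1):

    // hypothetical illustration, not part of this commit
    val rewriter = new InputRewriter(Array(2, 3, 1))
    // fieldMap is Map(2 -> 0, 3 -> 1, 1 -> 2), so a reference to input field $3
    // in the original program becomes $1 in the rewritten program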

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala
new file mode 100644
index 0000000..bbbf862
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import org.apache.flink.table.expressions.Expression
+
+/**
+  * Adds support for filter push-down to a [[TableSource]].
+  * A [[TableSource]] extending this trait is able to filter the records of the
+  * returned table, so that predicates are evaluated in the source itself.
+  */
+trait FilterableTableSource {
+
+  /** Returns the predicate expressions that have been pushed into this table source. */
+  def getPredicate: Array[Expression]
+
+  /**
+    * @param predicate the predicate expressions to be pushed down to this table source.
+    * @return the predicate expressions that this source cannot apply and must remain in the plan.
+    */
+  def setPredicate(predicate: Array[Expression]): Array[Expression]
+}
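
As a usage sketch (purely hypothetical, not part of this commit), a source could accept
only the comparison predicates it knows how to translate and hand everything else back
to the planner; a real implementation would also mix in BatchTableSource or
StreamTableSource:

    import org.apache.flink.table.expressions.{Expression, GreaterThan, LessThan}

    class MyFilterableSource extends FilterableTableSource {

      private var pushed: Array[Expression] = Array.empty

      override def getPredicate: Array[Expression] = pushed

      override def setPredicate(predicate: Array[Expression]): Array[Expression] = {
        // keep simple comparisons, return the unsupported rest to the planner
        val (supported, unsupported) = predicate.partition {
          case _: GreaterThan | _: LessThan => true
          case _ => false
        }
        pushed = supported
        unsupported
      }
    }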

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
index a3eb03d..fe205f1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
@@ -19,7 +19,6 @@
 package org.apache.flink.table.sources
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.TableEnvironment
 
 /** Defines an external table by providing schema information and used to produce a
   * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]].

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 3c89ec4..2c08d8d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -139,7 +139,16 @@ class FunctionCatalog {
 object FunctionCatalog {
 
   val builtInFunctions: Map[String, Class[_]] = Map(
     // logic
+    "and" -> classOf[And],
+    "or" -> classOf[Or],
+    "not" -> classOf[Not],
+    "equals" -> classOf[EqualTo],
+    "greaterThan" -> classOf[GreaterThan],
+    "greaterThanOrEqual" -> classOf[GreaterThanOrEqual],
+    "lessThan" -> classOf[LessThan],
+    "lessThanOrEqual" -> classOf[LessThanOrEqual],
+    "notEquals" -> classOf[NotEqualTo],
     "isNull" -> classOf[IsNull],
     "isNotNull" -> classOf[IsNotNull],
     "isTrue" -> classOf[IsTrue],
@@ -158,15 +172,23 @@ object FunctionCatalog {
     "charLength" -> classOf[CharLength],
     "initCap" -> classOf[InitCap],
     "like" -> classOf[Like],
+    "concat" -> classOf[Plus],
+    "lower" -> classOf[Lower],
     "lowerCase" -> classOf[Lower],
     "similar" -> classOf[Similar],
     "substring" -> classOf[Substring],
     "trim" -> classOf[Trim],
+    // duplicate entries for Calcite's function names
+    "upper" -> classOf[Upper],
     "upperCase" -> classOf[Upper],
     "position" -> classOf[Position],
     "overlay" -> classOf[Overlay],
 
     // math functions
+    "plus" -> classOf[Plus],
+    "minus" -> classOf[Minus],
+    "divide" -> classOf[Div],
+    "times" -> classOf[Mul],
     "abs" -> classOf[Abs],
     "ceil" -> classOf[Ceil],
     "exp" -> classOf[Exp],
@@ -176,6 +198,7 @@ object FunctionCatalog {
     "power" -> classOf[Power],
     "mod" -> classOf[Mod],
     "sqrt" -> classOf[Sqrt],
+    "minusPrefix" -> classOf[UnaryMinus],
 
     // temporal functions
     "extract" -> classOf[Extract],
@@ -186,6 +209,7 @@ object FunctionCatalog {
     "localTimestamp" -> classOf[LocalTimestamp],
     "quarter" -> classOf[Quarter],
     "temporalOverlaps" -> classOf[TemporalOverlaps],
+    "dateTimePlus" -> classOf[Plus],
 
     // array
     "cardinality" -> classOf[ArrayCardinality],

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
index 05c2a49..767e83f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
@@ -18,18 +18,16 @@
 
 package org.apache.flink.table
 
-import org.apache.calcite.tools.RuleSet
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor}
-import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment, TableException}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.TableException
 import org.apache.flink.table.expressions.{Alias, UnresolvedFieldReference}
-import org.apache.flink.table.sinks.TableSink
-import org.apache.flink.table.sources.TableSource
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil.{batchTableNode, term, unaryNode, binaryNode, streamTableNode}
+import org.apache.flink.table.utils.{MockTableEnvironment, TableTestBase}
+import org.apache.flink.table.utils.TableTestUtil._
+
 import org.junit.Test
 import org.junit.Assert.assertEquals
 
@@ -350,19 +348,6 @@ class TableEnvironmentTest extends TableTestBase {
 
 }
 
-class MockTableEnvironment extends TableEnvironment(new TableConfig) {
-
-  override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = ???
-
-  override protected def checkValidTableName(name: String): Unit = ???
-
-  override protected def getBuiltInNormRuleSet: RuleSet = ???
-
-  override protected def getBuiltInOptRuleSet: RuleSet = ???
-
-  override def registerTableSource(name: String, tableSource: TableSource[_]) = ???
-}
-
 case class CClass(cf1: Int, cf2: String, cf3: Double)
 
 class PojoClass(var pf1: Int, var pf2: String, var pf3: Double) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala
new file mode 100644
index 0000000..058eca7
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table
+
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.sources.{CsvTableSource, TableSource}
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.{Assert, Test}
+import org.apache.flink.table.utils.{CommonTestData, TableTestBase}
+
+class TableSourceTest extends TableTestBase {
+
+  private val projectedFields: Array[String] = Array("last", "id", "score")
+  private val noCalcFields: Array[String] = Array("id", "score", "first")
+
+  // batch plan
+
+  @Test
+  def testBatchProjectableSourceScanPlanTableApi(): Unit = {
+    val (tableSource, tableName) = csvTable
+    val util = batchTestUtil()
+    val tEnv = util.tEnv
+
+    tEnv.registerTableSource(tableName, tableSource)
+
+    val result = tEnv
+      .scan(tableName)
+      .select('last.upperCase(), 'id.floor(), 'score * 2)
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      projectableSourceBatchTableNode(tableName, projectedFields),
+      term("select", "UPPER(last) AS _c0", "FLOOR(id) AS _c1", "*(score, 2) AS _c2")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testBatchProjectableSourceScanPlanSQL(): Unit = {
+    val (tableSource, tableName) = csvTable
+    val util = batchTestUtil()
+
+    util.tEnv.registerTableSource(tableName, tableSource)
+
+    val sqlQuery = s"SELECT last, floor(id), score * 2 FROM $tableName"
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      projectableSourceBatchTableNode(tableName, projectedFields),
+      term("select", "last", "FLOOR(id) AS EXPR$1", "*(score, 2) AS EXPR$2")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testBatchProjectableSourceScanNoIdentityCalc(): Unit = {
+    val (tableSource, tableName) = csvTable
+    val util = batchTestUtil()
+    val tEnv = util.tEnv
+
+    tEnv.registerTableSource(tableName, tableSource)
+
+    val result = tEnv
+      .scan(tableName)
+      .select('id, 'score, 'first)
+
+    val expected = projectableSourceBatchTableNode(tableName, noCalcFields)
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testBatchFilterableSourceScanPlanTableApi(): Unit = {
+    val (tableSource, tableName) = filterableTableSource
+    val util = batchTestUtil()
+    val tEnv = util.tEnv
+
+    tEnv.registerTableSource(tableName, tableSource)
+
+    val result = tEnv
+      .scan(tableName)
+      .select('price, 'id, 'amount)
+      .where("amount > 2 && price * 2 < 32")
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      filterableSourceBatchTableNode(
+        tableName,
+        Array("name", "id", "amount", "price"),
+        ">(amount, 2)"),
+      term("select", "price", "id", "amount"),
+      term("where", "<(*(price, 2), 32)")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  // stream plan
+
+  @Test
+  def testStreamProjectableSourceScanPlanTableApi(): Unit = {
+    val (tableSource, tableName) = csvTable
+    val util = streamTestUtil()
+    val tEnv = util.tEnv
+
+    tEnv.registerTableSource(tableName, tableSource)
+
+    val result = tEnv
+      .scan(tableName)
+      .select('last, 'id.floor(), 'score * 2)
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      projectableSourceStreamTableNode(tableName, projectedFields),
+      term("select", "last", "FLOOR(id) AS _c1", "*(score, 2) AS _c2")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testStreamProjectableSourceScanPlanSQL(): Unit = {
+    val (tableSource, tableName) = csvTable
+    val util = streamTestUtil()
+
+    util.tEnv.registerTableSource(tableName, tableSource)
+
+    val sqlQuery = s"SELECT last, floor(id), score * 2 FROM $tableName"
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      projectableSourceStreamTableNode(tableName, projectedFields),
+      term("select", "last", "FLOOR(id) AS EXPR$1", "*(score, 2) AS EXPR$2")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testStreamProjectableSourceScanNoIdentityCalc(): Unit = {
+    val (tableSource, tableName) = csvTable
+    val util = streamTestUtil()
+    val tEnv = util.tEnv
+
+    tEnv.registerTableSource(tableName, tableSource)
+
+    val result = tEnv
+      .scan(tableName)
+      .select('id, 'score, 'first)
+
+    val expected = projectableSourceStreamTableNode(tableName, noCalcFields)
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testStreamFilterableSourceScanPlanTableApi(): Unit = {
+    val (tableSource, tableName) = filterableTableSource
+    val util = streamTestUtil()
+    val tEnv = util.tEnv
+
+    tEnv.registerTableSource(tableName, tableSource)
+
+    val result = tEnv
+      .scan(tableName)
+      .select('price, 'id, 'amount)
+      .where("amount > 2 && price * 2 < 32")
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      filterableSourceStreamTableNode(
+        tableName,
+        Array("name", "id", "amount", "price"),
+        ">(amount, 2)"),
+      term("select", "price", "id", "amount"),
+      term("where", "<(*(price, 2), 32)")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  // csv builder
+
+  @Test
+  def testCsvTableSourceBuilder(): Unit = {
+    val source1 = CsvTableSource.builder()
+      .path("/path/to/csv")
+      .field("myfield", Types.STRING)
+      .field("myfield2", Types.INT)
+      .quoteCharacter(';')
+      .fieldDelimiter("#")
+      .lineDelimiter("\r\n")
+      .commentPrefix("%%")
+      .ignoreFirstLine()
+      .ignoreParseErrors()
+      .build()
+
+    val source2 = new CsvTableSource(
+      "/path/to/csv",
+      Array("myfield", "myfield2"),
+      Array(Types.STRING, Types.INT),
+      "#",
+      "\r\n",
+      ';',
+      true,
+      "%%",
+      true)
+
+    Assert.assertEquals(source1, source2)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testCsvTableSourceBuilderWithNullPath(): Unit = {
+    CsvTableSource.builder()
+      .field("myfield", Types.STRING)
+      // should fail, path is not defined
+      .build()
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testCsvTableSourceBuilderWithDuplicateFieldName(): Unit = {
+    CsvTableSource.builder()
+      .path("/path/to/csv")
+      .field("myfield", Types.STRING)
+      // should fail, field names must not be duplicated
+      .field("myfield", Types.INT)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testCsvTableSourceBuilderWithEmptyField(): Unit = {
+    CsvTableSource.builder()
+      .path("/path/to/csv")
+      // should fail, no field is defined
+      .build()
+  }
+
+  // utils
+
+  def filterableTableSource: (TableSource[_], String) = {
+    val tableSource = CommonTestData.getFilterableTableSource
+    (tableSource, "filterableTable")
+  }
+
+  def csvTable: (CsvTableSource, String) = {
+    val csvTable = CommonTestData.getCsvTableSource
+    val tableName = "csvTable"
+    (csvTable, tableName)
+  }
+
+  def projectableSourceBatchTableNode(
+      sourceName: String,
+      fields: Array[String]): String = {
+
+    "BatchTableSourceScan(" +
+      s"table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
+  }
+
+  def projectableSourceStreamTableNode(
+      sourceName: String,
+      fields: Array[String]): String = {
+
+    "StreamTableSourceScan(" +
+      s"table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
+  }
+
+  def filterableSourceBatchTableNode(
+      sourceName: String,
+      fields: Array[String],
+      exp: String): String = {
+
+    "BatchTableSourceScan(" +
+      s"table=[[$sourceName]], fields=[${fields.mkString(", ")}], filter=[$exp])"
+  }
+
+  def filterableSourceStreamTableNode(
+      sourceName: String,
+      fields: Array[String],
+      exp: String): String = {
+
+    "StreamTableSourceScan(" +
+      s"table=[[$sourceName]], fields=[${fields.mkString(", ")}], filter=[$exp])"
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
index 70f4345..ca7cd8a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
@@ -102,4 +102,20 @@ class TableSourceITCase(
     TestBaseUtils.compareResultAsText(result.asJava, expected)
   }
 
+  @Test
+  def testTableSourceWithFilterable(): Unit = {
+    val tableName = "MyTable"
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
+    tableEnv.registerTableSource(tableName, CommonTestData.getFilterableTableSource)
+    val results = tableEnv
+      .scan(tableName)
+      .where("amount > 4 && price < 9")
+      .select("id, name")
+      .collect()
+
+    val expected = Seq(
+      "5,Record_5", "6,Record_6", "7,Record_7", "8,Record_8").mkString("\n")
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceTest.scala
deleted file mode 100644
index 670e268..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceTest.scala
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch
-
-import org.apache.flink.table.api.Types
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.sources.CsvTableSource
-import org.apache.flink.table.utils.{CommonTestData, TableTestBase}
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.{Assert, Test}
-
-class TableSourceTest extends TableTestBase {
-
-  private val projectedFields: Array[String] = Array("last", "id", "score")
-  private val noCalcFields: Array[String] = Array("id", "score", "first")
-
-  @Test
-  def testBatchProjectableSourceScanPlanTableApi(): Unit = {
-    val (csvTable, tableName) = tableSource
-    val util = batchTestUtil()
-    val tEnv = util.tEnv
-
-    tEnv.registerTableSource(tableName, csvTable)
-
-    val result = tEnv
-      .scan(tableName)
-      .select('last.upperCase(), 'id.floor(), 'score * 2)
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      sourceBatchTableNode(tableName, projectedFields),
-      term("select", "UPPER(last) AS _c0", "FLOOR(id) AS _c1", "*(score, 2) AS _c2")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testBatchProjectableSourceScanPlanSQL(): Unit = {
-    val (csvTable, tableName) = tableSource
-    val util = batchTestUtil()
-
-    util.tEnv.registerTableSource(tableName, csvTable)
-
-    val sqlQuery = s"SELECT last, floor(id), score * 2 FROM $tableName"
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      sourceBatchTableNode(tableName, projectedFields),
-      term("select", "last", "FLOOR(id) AS EXPR$1", "*(score, 2) AS EXPR$2")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testBatchProjectableSourceScanNoIdentityCalc(): Unit = {
-    val (csvTable, tableName) = tableSource
-    val util = batchTestUtil()
-    val tEnv = util.tEnv
-
-    tEnv.registerTableSource(tableName, csvTable)
-
-    val result = tEnv
-      .scan(tableName)
-      .select('id, 'score, 'first)
-
-    val expected = sourceBatchTableNode(tableName, noCalcFields)
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testStreamProjectableSourceScanPlanTableApi(): Unit = {
-    val (csvTable, tableName) = tableSource
-    val util = streamTestUtil()
-    val tEnv = util.tEnv
-
-    tEnv.registerTableSource(tableName, csvTable)
-
-    val result = tEnv
-      .scan(tableName)
-      .select('last, 'id.floor(), 'score * 2)
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      sourceStreamTableNode(tableName, projectedFields),
-      term("select", "last", "FLOOR(id) AS _c1", "*(score, 2) AS _c2")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testStreamProjectableSourceScanPlanSQL(): Unit = {
-    val (csvTable, tableName) = tableSource
-    val util = streamTestUtil()
-
-    util.tEnv.registerTableSource(tableName, csvTable)
-
-    val sqlQuery = s"SELECT last, floor(id), score * 2 FROM $tableName"
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      sourceStreamTableNode(tableName, projectedFields),
-      term("select", "last", "FLOOR(id) AS EXPR$1", "*(score, 2) AS EXPR$2")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testStreamProjectableSourceScanNoIdentityCalc(): Unit = {
-    val (csvTable, tableName) = tableSource
-    val util = streamTestUtil()
-    val tEnv = util.tEnv
-
-    tEnv.registerTableSource(tableName, csvTable)
-
-    val result = tEnv
-      .scan(tableName)
-      .select('id, 'score, 'first)
-
-    val expected = sourceStreamTableNode(tableName, noCalcFields)
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testCsvTableSourceBuilder(): Unit = {
-    val source1 = CsvTableSource.builder()
-      .path("/path/to/csv")
-      .field("myfield", Types.STRING)
-      .field("myfield2", Types.INT)
-      .quoteCharacter(';')
-      .fieldDelimiter("#")
-      .lineDelimiter("\r\n")
-      .commentPrefix("%%")
-      .ignoreFirstLine()
-      .ignoreParseErrors()
-      .build()
-
-    val source2 = new CsvTableSource(
-      "/path/to/csv",
-      Array("myfield", "myfield2"),
-      Array(Types.STRING, Types.INT),
-      "#",
-      "\r\n",
-      ';',
-      true,
-      "%%",
-      true)
-
-    Assert.assertEquals(source1, source2)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testCsvTableSourceBuilderWithNullPath(): Unit = {
-    CsvTableSource.builder()
-      .field("myfield", Types.STRING)
-      // should fail, path is not defined
-      .build()
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testCsvTableSourceBuilderWithDuplicateFieldName(): Unit = {
-    CsvTableSource.builder()
-      .path("/path/to/csv")
-      .field("myfield", Types.STRING)
-      // should fail, field name must no be duplicate
-      .field("myfield", Types.INT)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testCsvTableSourceBuilderWithEmptyField(): Unit = {
-    CsvTableSource.builder()
-      .path("/path/to/csv")
-      // should fail, field can be empty
-      .build()
-  }
-
-  def tableSource: (CsvTableSource, String) = {
-    val csvTable = CommonTestData.getCsvTableSource
-    val tableName = "csvTable"
-    (csvTable, tableName)
-  }
-
-  def sourceBatchTableNode(sourceName: String, fields: Array[String]): String = {
-    s"BatchTableSourceScan(table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
-  }
-
-  def sourceStreamTableNode(sourceName: String, fields: Array[String] ): String = {
-    s"StreamTableSourceScan(table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
index 06d94aa..973c2f3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
@@ -83,4 +83,23 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase {
       "Williams,4.68")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  @Test
+  def testCsvTableSourceWithFilterable(): Unit = {
+    StreamITCase.testResults = mutable.MutableList()
+    val tableName = "MyTable"
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    tEnv.registerTableSource(tableName, CommonTestData.getFilterableTableSource)
+    tEnv.scan(tableName)
+      .where("amount > 4 && price < 9")
+      .select("id, name")
+      .addSink(new StreamITCase.StringSink)
+
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "5,Record_5", "6,Record_6", "7,Record_7", "8,Record_8")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6cd2e7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/util/RexProgramProjectExtractorTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/util/RexProgramProjectExtractorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/util/RexProgramProjectExtractorTest.scala
deleted file mode 100644
index fe19c8e..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/util/RexProgramProjectExtractorTest.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.rules.util
-
-import java.math.BigDecimal
-
-import org.apache.calcite.adapter.java.JavaTypeFactory
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem}
-import org.apache.calcite.rex.{RexBuilder, RexProgram, RexProgramBuilder}
-import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, DOUBLE, INTEGER, VARCHAR}
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.flink.table.plan.rules.util.RexProgramProjectExtractor._
-import org.junit.Assert.{assertArrayEquals, assertTrue}
-import org.junit.{Before, Test}
-
-import scala.collection.JavaConverters._
-
-/**
-  * This class is responsible for testing RexProgramProjectExtractor.
-  */
-class RexProgramProjectExtractorTest {
-  private var typeFactory: JavaTypeFactory = _
-  private var rexBuilder: RexBuilder = _
-  private var allFieldTypes: Seq[RelDataType] = _
-  private val allFieldNames = List("name", "id", "amount", "price")
-
-  @Before
-  def setUp(): Unit = {
-    typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT)
-    rexBuilder = new RexBuilder(typeFactory)
-    allFieldTypes = List(VARCHAR, BIGINT, INTEGER, DOUBLE).map(typeFactory.createSqlType(_))
-  }
-
-  @Test
-  def testExtractRefInputFields(): Unit = {
-    val usedFields = extractRefInputFields(buildRexProgram())
-    assertArrayEquals(usedFields, Array(2, 3, 1))
-  }
-
-  @Test
-  def testRewriteRexProgram(): Unit = {
-    val originRexProgram = buildRexProgram()
-    assertTrue(extractExprStrList(originRexProgram).sameElements(Array(
-      "$0",
-      "$1",
-      "$2",
-      "$3",
-      "*($t2, $t3)",
-      "100",
-      "<($t4, $t5)",
-      "6",
-      ">($t1, $t7)",
-      "AND($t6, $t8)")))
-    // use amount, id, price fields to create a new RexProgram
-    val usedFields = Array(2, 3, 1)
-    val types = usedFields.map(allFieldTypes(_)).toList.asJava
-    val names = usedFields.map(allFieldNames(_)).toList.asJava
-    val inputRowType = typeFactory.createStructType(types, names)
-    val newRexProgram = rewriteRexProgram(originRexProgram, inputRowType, usedFields, rexBuilder)
-    assertTrue(extractExprStrList(newRexProgram).sameElements(Array(
-      "$0",
-      "$1",
-      "$2",
-      "*($t0, $t1)",
-      "100",
-      "<($t3, $t4)",
-      "6",
-      ">($t2, $t6)",
-      "AND($t5, $t7)")))
-  }
-
-  private def buildRexProgram(): RexProgram = {
-    val types = allFieldTypes.asJava
-    val names = allFieldNames.asJava
-    val inputRowType = typeFactory.createStructType(types, names)
-    val builder = new RexProgramBuilder(inputRowType, rexBuilder)
-    val t0 = rexBuilder.makeInputRef(types.get(2), 2)
-    val t1 = rexBuilder.makeInputRef(types.get(1), 1)
-    val t2 = rexBuilder.makeInputRef(types.get(3), 3)
-    val t3 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.MULTIPLY, t0, t2))
-    val t4 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(100L))
-    val t5 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(6L))
-    // project: amount, amount * price
-    builder.addProject(t0, "amount")
-    builder.addProject(t3, "total")
-    // condition: amount * price < 100 and id > 6
-    val t6 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN, t3, t4))
-    val t7 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.GREATER_THAN, t1, t5))
-    val t8 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.AND, List(t6, t7).asJava))
-    builder.addCondition(t8)
-    builder.getProgram
-  }
-
-  /**
-    * extract all expression string list from input RexProgram expression lists
-    *
-    * @param rexProgram input RexProgram instance to analyze
-    * @return all expression string list of input RexProgram expression lists
-    */
-  private def extractExprStrList(rexProgram: RexProgram) = {
-    rexProgram.getExprList.asScala.map(_.toString)
-  }
-
-}


[4/4] flink git commit: [FLINK-3849] [table] Add FilterableTableSource interface and rules for pushing it (2)

Posted by ku...@apache.org.
[FLINK-3849] [table] Add FilterableTableSource interface and rules for pushing it (2)

This closes #3520.

Fix compilation failure.

Fix compilation failure again.

1. Deep-copy the TableSource when copying a TableSourceScan.
2. Unify the push-project-into-scan rule for both batch and stream.

Address review comments.

Expand the project list before creating the new RexProgram.

Update tests.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/78f22aae
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/78f22aae
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/78f22aae

Branch: refs/heads/master
Commit: 78f22aaec9bd7d39fdec8477335e5c9247d42030
Parents: 9f6cd2e
Author: Kurt Young <yk...@gmail.com>
Authored: Mon Mar 13 15:30:13 2017 +0800
Committer: Kurt Young <ku...@apache.org>
Committed: Fri Mar 17 18:01:50 2017 +0800

----------------------------------------------------------------------
 .../connectors/kafka/KafkaTableSource.java      |   5 +
 .../flink/addons/hbase/HBaseTableSource.java    |   7 +-
 .../flink/table/api/BatchTableEnvironment.scala |   2 +-
 .../table/api/StreamTableEnvironment.scala      |   2 +-
 .../flink/table/api/TableEnvironment.scala      |  12 -
 .../flink/table/calcite/RexNodeWrapper.scala    | 106 ------
 .../flink/table/plan/nodes/CommonCalc.scala     |   3 +-
 .../table/plan/nodes/TableSourceScan.scala      |  63 ++++
 .../table/plan/nodes/dataset/BatchScan.scala    |  21 +-
 .../nodes/dataset/BatchTableSourceScan.scala    |  47 +--
 .../table/plan/nodes/dataset/DataSetCalc.scala  |  23 +-
 .../table/plan/nodes/dataset/DataSetScan.scala  |  14 +-
 .../plan/nodes/datastream/DataStreamCalc.scala  |  24 +-
 .../plan/nodes/datastream/DataStreamScan.scala  |   8 +-
 .../plan/nodes/datastream/StreamScan.scala      |  10 +-
 .../datastream/StreamTableSourceScan.scala      |  52 +--
 .../PushFilterIntoTableSourceScanRuleBase.scala | 104 ++++++
 ...PushProjectIntoTableSourceScanRuleBase.scala |  57 +++
 ...PushFilterIntoBatchTableSourceScanRule.scala |  58 +---
 ...ushProjectIntoBatchTableSourceScanRule.scala |  48 +--
 ...ushFilterIntoStreamTableSourceScanRule.scala |  58 +---
 ...shProjectIntoStreamTableSourceScanRule.scala |  40 +--
 .../table/plan/schema/TableSourceTable.scala    |   1 -
 .../util/RexProgramExpressionExtractor.scala    | 163 ---------
 .../table/plan/util/RexProgramExtractor.scala   | 183 ++++++++++
 .../plan/util/RexProgramProjectExtractor.scala  | 120 -------
 .../table/plan/util/RexProgramRewriter.scala    |  91 +++++
 .../table/sources/FilterableTableSource.scala   |  38 +-
 .../table/sources/ProjectableTableSource.scala  |   9 +-
 .../flink/table/sources/TableSource.scala       |   2 +
 .../flink/table/validate/FunctionCatalog.scala  |   5 +-
 .../apache/flink/table/TableSourceTest.scala    | 170 ++++++---
 .../api/scala/batch/TableSourceITCase.scala     |   4 +-
 .../api/scala/stream/TableSourceITCase.scala    |   4 +-
 .../expressions/utils/ExpressionTestBase.scala  |   4 +-
 .../RexProgramExpressionExtractorTest.scala     | 182 ----------
 .../plan/util/RexProgramExtractorTest.scala     | 346 +++++++++++++++++++
 .../util/RexProgramProjectExtractorTest.scala   | 121 -------
 .../plan/util/RexProgramRewriterTest.scala      |  62 ++++
 .../table/plan/util/RexProgramTestBase.scala    |  80 +++++
 .../flink/table/utils/CommonTestData.scala      | 122 +------
 .../table/utils/MockTableEnvironment.scala      |  39 +++
 .../table/utils/TestFilterableTableSource.scala | 134 +++++++
 43 files changed, 1452 insertions(+), 1192 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
index dd32bdd..506358d 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
@@ -137,4 +137,9 @@ public abstract class KafkaTableSource implements StreamTableSource<Row> {
 	protected DeserializationSchema<Row> getDeserializationSchema() {
 		return deserializationSchema;
 	}
+
+	@Override
+	public String explainSource() {
+		return "";
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
index a1be23f..f709212 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
@@ -108,7 +108,7 @@ public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTable
 	}
 
 	@Override
-	public ProjectableTableSource<Row> projectFields(int[] fields) {
+	public HBaseTableSource projectFields(int[] fields) {
 		String[] famNames = schema.getFamilyNames();
 		HBaseTableSource newTableSource = new HBaseTableSource(this.conf, tableName);
 		// Extract the family from the given fields
@@ -122,4 +122,9 @@ public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTable
 		}
 		return newTableSource;
 	}
+
+	@Override
+	public String explainSource() {
+		return "";
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 7f27357..b48e9f9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -95,7 +95,7 @@ abstract class BatchTableEnvironment(
 
     tableSource match {
       case batchTableSource: BatchTableSource[_] =>
-        registerTableInternal(name, new TableSourceTable(batchTableSource, this))
+        registerTableInternal(name, new TableSourceTable(batchTableSource))
       case _ =>
         throw new TableException("Only BatchTableSource can be registered in " +
             "BatchTableEnvironment")

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 7e9f38f..d927c3a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -136,7 +136,7 @@ abstract class StreamTableEnvironment(
 
     tableSource match {
       case streamTableSource: StreamTableSource[_] =>
-        registerTableInternal(name, new TableSourceTable(streamTableSource, this))
+        registerTableInternal(name, new TableSourceTable(streamTableSource))
       case _ =>
         throw new TableException("Only StreamTableSource can be registered in " +
             "StreamTableEnvironment")

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 291f49f..1dda3a8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -842,18 +842,6 @@ object TableEnvironment {
   }
 
   /**
-    * Returns field types for a given [[TableSource]].
-    *
-    * @param tableSource The TableSource to extract field types from.
-    * @tparam A The type of the TableSource.
-    * @return An array holding the field types.
-    */
-  def getFieldTypes[A](tableSource: TableSource[A]): Array[TypeInformation[_]] = {
-    val returnType = tableSource.getReturnType
-    TableEnvironment.getFieldTypes(returnType)
-  }
-
-  /**
     * Returns field names for a given [[TableSource]].
     *
     * @param tableSource The TableSource to extract field names from.

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RexNodeWrapper.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RexNodeWrapper.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RexNodeWrapper.scala
deleted file mode 100644
index 1926a67..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RexNodeWrapper.scala
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.calcite
-
-import org.apache.calcite.rex._
-import org.apache.calcite.sql._
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.expressions.{Expression, Literal, ResolvedFieldReference}
-import org.apache.flink.table.validate.FunctionCatalog
-import org.apache.flink.table.calcite.RexNodeWrapper._
-
-abstract class RexNodeWrapper(rex: RexNode) {
-  def get: RexNode = rex
-  def toExpression(names: Map[RexInputRef, String]): Expression
-}
-
-case class RexLiteralWrapper(literal: RexLiteral) extends RexNodeWrapper(literal) {
-  override def toExpression(names: Map[RexInputRef, String]): Expression = {
-    val typeInfo = FlinkTypeFactory.toTypeInfo(literal.getType)
-    Literal(literal.getValue, typeInfo)
-  }
-}
-
-case class RexInputWrapper(input: RexInputRef) extends RexNodeWrapper(input) {
-  override def toExpression(names: Map[RexInputRef, String]): Expression = {
-    val typeInfo = FlinkTypeFactory.toTypeInfo(input.getType)
-    ResolvedFieldReference(names(input), typeInfo)
-  }
-}
-
-case class RexCallWrapper(
-    call: RexCall,
-    operands: Seq[RexNodeWrapper]) extends RexNodeWrapper(call) {
-
-  override def toExpression(names: Map[RexInputRef, String]): Expression = {
-    val ops = operands.map(_.toExpression(names))
-    call.op match {
-      case function: SqlFunction =>
-        lookupFunction(replace(function.getName), ops)
-      case postfix: SqlPostfixOperator =>
-        lookupFunction(replace(postfix.getName), ops)
-      case operator@_ =>
-        val name = replace(s"${operator.kind}")
-        lookupFunction(name, ops)
-    }
-  }
-
-  def replace(str: String): String = {
-    str.replaceAll("\\s|_", "")
-  }
-}
-
-object RexNodeWrapper {
-
-  private var catalog: Option[FunctionCatalog] = None
-
-  def wrap(rex: RexNode, functionCatalog: FunctionCatalog): RexNodeWrapper = {
-    catalog = Option(functionCatalog)
-    rex.accept(new WrapperVisitor)
-  }
-
-  private[table] def lookupFunction(name: String, operands: Seq[Expression]): Expression = {
-    catalog.getOrElse(throw TableException("FunctionCatalog was not defined"))
-      .lookupFunction(name, operands)
-  }
-}
-
-class WrapperVisitor extends RexVisitorImpl[RexNodeWrapper](true) {
-
-  override def visitInputRef(inputRef: RexInputRef): RexNodeWrapper = {
-    RexInputWrapper(inputRef)
-  }
-
-  override def visitLiteral(literal: RexLiteral): RexNodeWrapper = {
-    RexLiteralWrapper(literal)
-  }
-
-  override def visitLocalRef(localRef: RexLocalRef): RexNodeWrapper = {
-    localRef.accept(this)
-  }
-
-  override def visitCall(call: RexCall): RexNodeWrapper = {
-    val operands = for {
-      x <- 0 until call.operands.size()
-    } yield {
-      call.operands.get(x).accept(this)
-    }
-    RexCallWrapper(call, operands)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
index 8b07aac..bc25140 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
@@ -167,7 +167,8 @@ trait CommonCalc {
       case _ => true
     }
 
-    planner.getCostFactory.makeCost(rowCnt, rowCnt * compCnt, 0)
+    val newRowCnt = estimateRowCount(calcProgram, rowCnt)
+    planner.getCostFactory.makeCost(newRowCnt, newRowCnt * compCnt, 0)
   }
 
   private[flink] def estimateRowCount(

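Note: the Calc is now costed on its estimated output row count instead of the
raw input row count. The body of estimateRowCount is not shown in this hunk;
assuming it applies a fixed selectivity (say 0.75) when the program has a
filter condition, a worked example:

    // Illustration only; the 0.75 selectivity is an assumption.
    //   rowCnt    = 1000.0                  (input rows)
    //   newRowCnt = 1000.0 * 0.75 = 750.0   (program has a condition)
    //   cost      = makeCost(750.0, 750.0 * compCnt, 0)
    // Costing at 750 rather than 1000 rows lets the planner distinguish a
    // Calc that filters from one that merely projects.
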
http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/TableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/TableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/TableSourceScan.scala
new file mode 100644
index 0000000..e0f7786
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/TableSourceScan.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptTable, RelTraitSet}
+import org.apache.calcite.rel.RelWriter
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.TableScan
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.sources.TableSource
+
+import scala.collection.JavaConverters._
+
+abstract class TableSourceScan(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    table: RelOptTable,
+    val tableSource: TableSource[_])
+  extends TableScan(cluster, traitSet, table) {
+
+  override def deriveRowType(): RelDataType = {
+    val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+    flinkTypeFactory.buildRowDataType(
+      TableEnvironment.getFieldNames(tableSource),
+      TableEnvironment.getFieldTypes(tableSource.getReturnType))
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    val terms = super.explainTerms(pw)
+        .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", "))
+
+    val sourceDesc = tableSource.explainSource()
+    if (sourceDesc.nonEmpty) {
+      terms.item("source", sourceDesc)
+    } else {
+      terms
+    }
+  }
+
+  override def toString: String = {
+    s"Source(from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))"
+  }
+
+  def copy(traitSet: RelTraitSet, tableSource: TableSource[_]): TableSourceScan
+
+}

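Note: this new base class centralizes row-type derivation and plan explanation
for batch and stream scans; subclasses only implement copy(traitSet,
tableSource). The push-down rule bases later in this commit use exactly that
hook to re-create a scan around a modified source:

    // From PushFilterIntoTableSourceScanRuleBase (below):
    val newScan = scan.copy(scan.getTraitSet, newTableSource)
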
http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
index 09262a6..b39b8ed 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
@@ -18,9 +18,6 @@
 
 package org.apache.flink.table.plan.nodes.dataset
 
-import org.apache.calcite.plan._
-import org.apache.calcite.rel.core.TableScan
-import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.calcite.FlinkTypeFactory
@@ -31,23 +28,7 @@ import org.apache.flink.types.Row
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 
-abstract class BatchScan(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    table: RelOptTable)
-  extends TableScan(cluster, traitSet, table)
-  with CommonScan
-  with DataSetRel {
-
-  override def toString: String = {
-    s"Source(from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))"
-  }
-
-  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
-
-    val rowCnt = metadata.getRowCount(this)
-    planner.getCostFactory.makeCost(rowCnt, rowCnt, 0)
-  }
+trait BatchScan extends CommonScan with DataSetRel {
 
   protected def convertToInternalRow(
       input: DataSet[Any],

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
index 11f595c..a9784e2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
@@ -19,33 +19,25 @@
 package org.apache.flink.table.plan.nodes.dataset
 
 import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.calcite.rel.{RelNode, RelWriter}
-import org.apache.calcite.rex.RexNode
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.table.api.{BatchTableEnvironment, TableEnvironment}
-import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.plan.nodes.TableSourceScan
 import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.sources.{BatchTableSource, TableSource}
 import org.apache.flink.types.Row
-import org.apache.flink.table.sources.BatchTableSource
 
 /** Flink RelNode to read data from an external source defined by a [[BatchTableSource]]. */
 class BatchTableSourceScan(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     table: RelOptTable,
-    val tableSource: BatchTableSource[_],
-    filterCondition: RexNode = null)
-  extends BatchScan(cluster, traitSet, table) {
+    tableSource: BatchTableSource[_])
+  extends TableSourceScan(cluster, traitSet, table, tableSource)
+  with BatchScan {
 
-  override def deriveRowType() = {
-    val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-    flinkTypeFactory.buildRowDataType(
-      TableEnvironment.getFieldNames(tableSource),
-      TableEnvironment.getFieldTypes(tableSource.getReturnType))
-  }
-
-  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
     val rowCnt = metadata.getRowCount(this)
     planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * estimateRowSize(getRowType))
   }
@@ -55,27 +47,22 @@ class BatchTableSourceScan(
       cluster,
       traitSet,
       getTable,
-      tableSource,
-      filterCondition
+      tableSource
     )
   }
 
-  override def explainTerms(pw: RelWriter): RelWriter = {
-    val terms = super.explainTerms(pw)
-      .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", "))
-      if (filterCondition != null) {
-        import scala.collection.JavaConverters._
-        val fieldNames = getTable.getRowType.getFieldNames.asScala.toList
-        terms.item("filter", getExpressionString(filterCondition, fieldNames, None))
-      }
-    terms
+  override def copy(traitSet: RelTraitSet, newTableSource: TableSource[_]): TableSourceScan = {
+    new BatchTableSourceScan(
+      cluster,
+      traitSet,
+      getTable,
+      newTableSource.asInstanceOf[BatchTableSource[_]]
+    )
   }
 
   override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
-
     val config = tableEnv.getConfig
     val inputDataSet = tableSource.getDataSet(tableEnv.execEnv).asInstanceOf[DataSet[Any]]
-
-    convertToInternalRow(inputDataSet, new TableSourceTable(tableSource, tableEnv), config)
+    convertToInternalRow(inputDataSet, new TableSourceTable(tableSource), config)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
index 972e45b..e05b5a8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
@@ -20,8 +20,9 @@ package org.apache.flink.table.plan.nodes.dataset
 
 import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.Calc
 import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.calcite.rex._
 import org.apache.flink.api.common.functions.FlatMapFunction
 import org.apache.flink.api.java.DataSet
@@ -40,35 +41,29 @@ class DataSetCalc(
     traitSet: RelTraitSet,
     input: RelNode,
     rowRelDataType: RelDataType,
-    private[flink] val calcProgram: RexProgram, // for tests
+    calcProgram: RexProgram,
     ruleDescription: String)
-  extends SingleRel(cluster, traitSet, input)
+  extends Calc(cluster, traitSet, input, calcProgram)
   with CommonCalc
   with DataSetRel {
 
-  override def deriveRowType() = rowRelDataType
+  override def deriveRowType(): RelDataType = rowRelDataType
 
-  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
-    new DataSetCalc(
-      cluster,
-      traitSet,
-      inputs.get(0),
-      getRowType,
-      calcProgram,
-      ruleDescription)
+  override def copy(traitSet: RelTraitSet, child: RelNode, program: RexProgram): Calc = {
+    new DataSetCalc(cluster, traitSet, child, getRowType, program, ruleDescription)
   }
 
   override def toString: String = calcToString(calcProgram, getExpressionString)
 
   override def explainTerms(pw: RelWriter): RelWriter = {
-    super.explainTerms(pw)
+    pw.input("input", getInput)
       .item("select", selectionToString(calcProgram, getExpressionString))
       .itemIf("where",
         conditionToString(calcProgram, getExpressionString),
         calcProgram.getCondition != null)
   }
 
-  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
     val child = this.getInput
     val rowCnt = metadata.getRowCount(child)
 

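Note: extending Calcite's Calc (rather than SingleRel) gives DataSetCalc and
DataStreamCalc the standard copy(traitSet, child, program) hook, which the
shared push-down rule bases below rely on to rebuild the Calc on top of a new
scan:

    // From PushFilterIntoTableSourceScanRuleBase (below):
    val newCalc = calc.copy(calc.getTraitSet, newScan, newRexProgram)
    call.transformTo(newCalc)
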
http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala
index 44d2d00..b1cf106 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala
@@ -21,6 +21,8 @@ package org.apache.flink.table.plan.nodes.dataset
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.table.api.BatchTableEnvironment
 import org.apache.flink.table.plan.schema.DataSetTable
@@ -36,11 +38,17 @@ class DataSetScan(
     traitSet: RelTraitSet,
     table: RelOptTable,
     rowRelDataType: RelDataType)
-  extends BatchScan(cluster, traitSet, table) {
+  extends TableScan(cluster, traitSet, table)
+  with BatchScan {
 
   val dataSetTable: DataSetTable[Any] = getTable.unwrap(classOf[DataSetTable[Any]])
 
-  override def deriveRowType() = rowRelDataType
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+    val rowCnt = metadata.getRowCount(this)
+    planner.getCostFactory.makeCost(rowCnt, rowCnt, 0)
+  }
 
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataSetScan(
@@ -52,10 +60,8 @@ class DataSetScan(
   }
 
   override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
-
     val config = tableEnv.getConfig
     val inputDataSet: DataSet[Any] = dataSetTable.dataSet
-
     convertToInternalRow(inputDataSet, dataSetTable, config)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
index 26778d7..b015a1d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
@@ -20,8 +20,9 @@ package org.apache.flink.table.plan.nodes.datastream
 
 import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.Calc
 import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.calcite.rex.RexProgram
 import org.apache.flink.api.common.functions.FlatMapFunction
 import org.apache.flink.streaming.api.datastream.DataStream
@@ -40,36 +41,29 @@ class DataStreamCalc(
     traitSet: RelTraitSet,
     input: RelNode,
     rowRelDataType: RelDataType,
-    private[flink] val calcProgram: RexProgram,
+    calcProgram: RexProgram,
     ruleDescription: String)
-  extends SingleRel(cluster, traitSet, input)
+  extends Calc(cluster, traitSet, input, calcProgram)
   with CommonCalc
   with DataStreamRel {
 
-  override def deriveRowType() = rowRelDataType
+  override def deriveRowType(): RelDataType = rowRelDataType
 
-  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
-    new DataStreamCalc(
-      cluster,
-      traitSet,
-      inputs.get(0),
-      getRowType,
-      calcProgram,
-      ruleDescription
-    )
+  override def copy(traitSet: RelTraitSet, child: RelNode, program: RexProgram): Calc = {
+    new DataStreamCalc(cluster, traitSet, child, getRowType, program, ruleDescription)
   }
 
   override def toString: String = calcToString(calcProgram, getExpressionString)
 
   override def explainTerms(pw: RelWriter): RelWriter = {
-    super.explainTerms(pw)
+    pw.input("input", getInput)
       .item("select", selectionToString(calcProgram, getExpressionString))
       .itemIf("where",
         conditionToString(calcProgram, getExpressionString),
         calcProgram.getCondition != null)
   }
 
-  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
     val child = this.getInput
     val rowCnt = metadata.getRowCount(child)
     computeSelfCost(calcProgram, planner, rowCnt)

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
index e8d218e..c187ae8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table.plan.nodes.datastream
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.TableScan
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.StreamTableEnvironment
 import org.apache.flink.table.plan.schema.DataStreamTable
@@ -36,11 +37,12 @@ class DataStreamScan(
     traitSet: RelTraitSet,
     table: RelOptTable,
     rowRelDataType: RelDataType)
-  extends StreamScan(cluster, traitSet, table) {
+  extends TableScan(cluster, traitSet, table)
+  with StreamScan {
 
   val dataStreamTable: DataStreamTable[Any] = getTable.unwrap(classOf[DataStreamTable[Any]])
 
-  override def deriveRowType() = rowRelDataType
+  override def deriveRowType(): RelDataType = rowRelDataType
 
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataStreamScan(
@@ -52,10 +54,8 @@ class DataStreamScan(
   }
 
   override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
-
     val config = tableEnv.getConfig
     val inputDataStream: DataStream[Any] = dataStreamTable.dataStream
-
     convertToInternalRow(inputDataStream, dataStreamTable, config)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
index 56f7f27..6d08302 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.plan.nodes.datastream
 
-import org.apache.calcite.plan._
-import org.apache.calcite.rel.core.TableScan
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.calcite.FlinkTypeFactory
@@ -30,13 +28,7 @@ import org.apache.flink.types.Row
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 
-abstract class StreamScan(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    table: RelOptTable)
-  extends TableScan(cluster, traitSet, table)
-  with CommonScan
-  with DataStreamRel {
+trait StreamScan extends CommonScan with DataStreamRel {
 
   protected def convertToInternalRow(
       input: DataStream[Any],

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index b808d8d..013c55f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -19,33 +19,25 @@
 package org.apache.flink.table.plan.nodes.datastream
 
 import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.calcite.rel.{RelNode, RelWriter}
-import org.apache.calcite.rex.RexNode
-import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.plan.nodes.TableSourceScan
 import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.sources.{StreamTableSource, TableSource}
 import org.apache.flink.types.Row
-import org.apache.flink.table.sources.StreamTableSource
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.{StreamTableEnvironment, TableEnvironment}
 
 /** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */
 class StreamTableSourceScan(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     table: RelOptTable,
-    val tableSource: StreamTableSource[_],
-  filterCondition: RexNode = null)
-  extends StreamScan(cluster, traitSet, table) {
-
-  override def deriveRowType() = {
-    val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-    flinkTypeFactory.buildRowDataType(
-      TableEnvironment.getFieldNames(tableSource),
-      TableEnvironment.getFieldTypes(tableSource.getReturnType))
-  }
+    tableSource: StreamTableSource[_])
+  extends TableSourceScan(cluster, traitSet, table, tableSource)
+  with StreamScan {
 
-  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
     val rowCnt = metadata.getRowCount(this)
     planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * estimateRowSize(getRowType))
   }
@@ -55,28 +47,22 @@ class StreamTableSourceScan(
       cluster,
       traitSet,
       getTable,
-      tableSource,
-      filterCondition
+      tableSource
     )
   }
 
-  override def explainTerms(pw: RelWriter): RelWriter = {
-    val terms = super.explainTerms(pw)
-      .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", "))
-    if (filterCondition != null) {
-      import scala.collection.JavaConverters._
-      val fieldNames = getTable.getRowType.getFieldNames.asScala.toList
-      terms.item("filter", getExpressionString(filterCondition, fieldNames, None))
-    }
-    terms
+  override def copy(traitSet: RelTraitSet, newTableSource: TableSource[_]): TableSourceScan = {
+    new StreamTableSourceScan(
+      cluster,
+      traitSet,
+      getTable,
+      newTableSource.asInstanceOf[StreamTableSource[_]]
+    )
   }
 
   override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
-
     val config = tableEnv.getConfig
-    val inputDataStream: DataStream[Any] = tableSource
-      .getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
-
-    convertToInternalRow(inputDataStream, new TableSourceTable(tableSource, tableEnv), config)
+    val inputDataStream = tableSource.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
+    convertToInternalRow(inputDataStream, new TableSourceTable(tableSource), config)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala
new file mode 100644
index 0000000..b07f78e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.common
+
+import java.util
+
+import org.apache.calcite.plan.RelOptRuleCall
+import org.apache.calcite.rel.core.Calc
+import org.apache.calcite.rex.RexProgram
+import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.plan.nodes.TableSourceScan
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.plan.util.RexProgramExtractor
+import org.apache.flink.table.sources.FilterableTableSource
+import org.apache.flink.table.validate.FunctionCatalog
+import org.apache.flink.util.Preconditions
+
+import scala.collection.JavaConverters._
+
+trait PushFilterIntoTableSourceScanRuleBase {
+
+  private[flink] def pushFilterIntoScan(
+      call: RelOptRuleCall,
+      calc: Calc,
+      scan: TableSourceScan,
+      tableSourceTable: TableSourceTable[_],
+      filterableSource: FilterableTableSource[_],
+      description: String): Unit = {
+
+    Preconditions.checkArgument(!filterableSource.isFilterPushedDown)
+
+    val program = calc.getProgram
+    val functionCatalog = FunctionCatalog.withBuiltIns
+    val (predicates, unconvertedRexNodes) =
+      RexProgramExtractor.extractConjunctiveConditions(
+        program,
+        call.builder().getRexBuilder,
+        functionCatalog)
+    if (predicates.isEmpty) {
+      // no condition could be translated into an expression, nothing to push down
+      return
+    }
+
+    val remainingPredicates = new util.LinkedList[Expression]()
+    predicates.foreach(e => remainingPredicates.add(e))
+
+    val newTableSource = filterableSource.applyPredicate(remainingPredicates)
+
+    // check whether the framework still needs to apply a remaining filter
+    val relBuilder = call.builder()
+    val remainingCondition = {
+      if (!remainingPredicates.isEmpty || unconvertedRexNodes.nonEmpty) {
+        relBuilder.push(scan)
+        val remainingConditions =
+          (remainingPredicates.asScala.map(expr => expr.toRexNode(relBuilder))
+              ++ unconvertedRexNodes)
+        remainingConditions.reduce((l, r) => relBuilder.and(l, r))
+      } else {
+        null
+      }
+    }
+
+    // check whether we still need a RexProgram. A RexProgram is needed when
+    // either a projection or a filter remains.
+    val newScan = scan.copy(scan.getTraitSet, newTableSource)
+    val newRexProgram = {
+      if (remainingCondition != null || !program.projectsOnlyIdentity) {
+        val expandedProjectList = program.getProjectList.asScala
+            .map(ref => program.expandLocalRef(ref)).asJava
+        RexProgram.create(
+          program.getInputRowType,
+          expandedProjectList,
+          remainingCondition,
+          program.getOutputRowType,
+          relBuilder.getRexBuilder)
+      } else {
+        null
+      }
+    }
+
+    if (newRexProgram != null) {
+      val newCalc = calc.copy(calc.getTraitSet, newScan, newRexProgram)
+      call.transformTo(newCalc)
+    } else {
+      call.transformTo(newScan)
+    }
+  }
+}

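Note: the contract this rule base assumes of a FilterableTableSource:
applyPredicate receives a mutable list, removes the predicates the source will
evaluate itself, leaves the rest for the remaining Calc, and returns a copy
reporting isFilterPushedDown = true so that matches() skips the new scan. A
minimal sketch (everything beyond the two interface methods is hypothetical):

    import java.util.{List => JList}
    import org.apache.flink.table.expressions.{Expression, GreaterThan}
    import org.apache.flink.table.sources.{FilterableTableSource, TableSource}
    import org.apache.flink.types.Row

    class MyFilterableSource(pushedDown: Boolean) extends FilterableTableSource[Row] {
      // BatchTableSource[Row] / StreamTableSource[Row] members omitted

      override def isFilterPushedDown: Boolean = pushedDown

      override def applyPredicate(predicates: JList[Expression]): TableSource[Row] = {
        val it = predicates.iterator()
        while (it.hasNext) {
          // hypothetical capability check: this source only evaluates ">"
          if (it.next().isInstanceOf[GreaterThan]) {
            it.remove()
          }
        }
        new MyFilterableSource(pushedDown = true)
      }
    }
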
http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushProjectIntoTableSourceScanRuleBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushProjectIntoTableSourceScanRuleBase.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushProjectIntoTableSourceScanRuleBase.scala
new file mode 100644
index 0000000..9f9c805
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushProjectIntoTableSourceScanRuleBase.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.common
+
+import org.apache.calcite.plan.RelOptRuleCall
+import org.apache.calcite.rel.core.Calc
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.plan.nodes.TableSourceScan
+import org.apache.flink.table.plan.util.{RexProgramExtractor, RexProgramRewriter}
+import org.apache.flink.table.sources.ProjectableTableSource
+
+trait PushProjectIntoTableSourceScanRuleBase {
+
+  private[flink] def pushProjectIntoScan(
+      call: RelOptRuleCall,
+      calc: Calc,
+      scan: TableSourceScan): Unit = {
+
+    val usedFields = RexProgramExtractor.extractRefInputFields(calc.getProgram)
+
+    // only push the projection if it removes fields; otherwise keep the original plan.
+    if (TableEnvironment.getFieldNames(scan.tableSource).length != usedFields.length) {
+      val originTableSource = scan.tableSource.asInstanceOf[ProjectableTableSource[_]]
+      val newTableSource = originTableSource.projectFields(usedFields)
+      val newScan = scan.copy(scan.getTraitSet, newTableSource)
+      val newCalcProgram = RexProgramRewriter.rewriteWithFieldProjection(
+        calc.getProgram,
+        newScan.getRowType,
+        calc.getCluster.getRexBuilder,
+        usedFields)
+
+      if (newCalcProgram.isTrivial) {
+        // drop the Calc if the transformed program merely returns its input and has no filter
+        call.transformTo(newScan)
+      } else {
+        val newCalc = calc.copy(calc.getTraitSet, newScan, newCalcProgram)
+        call.transformTo(newCalc)
+      }
+    }
+  }
+}

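Note: the projection counterpart expects projectFields to return a source
whose return type contains exactly the requested field indices, in the
requested order; RexProgramRewriter then remaps the Calc onto the narrowed row
type. A minimal sketch (the field bookkeeping is hypothetical):

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.table.sources.ProjectableTableSource
    import org.apache.flink.types.Row

    class MyProjectableSource(names: Array[String], types: Array[TypeInformation[_]])
        extends ProjectableTableSource[Row] {
      // BatchTableSource[Row] members (getReturnType etc.) omitted

      // keep only the requested indices, preserving the requested order
      override def projectFields(fields: Array[Int]): MyProjectableSource =
        new MyProjectableSource(fields.map(names(_)), fields.map(types(_)))
    }
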
http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala
index f95e34e..8cfd748 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala
@@ -20,23 +20,23 @@ package org.apache.flink.table.plan.rules.dataSet
 
 import org.apache.calcite.plan.RelOptRule._
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.calcite.rex.RexProgram
 import org.apache.flink.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc}
-import org.apache.flink.table.plan.util.RexProgramExpressionExtractor._
+import org.apache.flink.table.plan.rules.common.PushFilterIntoTableSourceScanRuleBase
 import org.apache.flink.table.plan.schema.TableSourceTable
 import org.apache.flink.table.sources.FilterableTableSource
 
 class PushFilterIntoBatchTableSourceScanRule extends RelOptRule(
   operand(classOf[DataSetCalc],
     operand(classOf[BatchTableSourceScan], none)),
-  "PushFilterIntoBatchTableSourceScanRule") {
+  "PushFilterIntoBatchTableSourceScanRule")
+  with PushFilterIntoTableSourceScanRuleBase {
 
-  override def matches(call: RelOptRuleCall) = {
+  override def matches(call: RelOptRuleCall): Boolean = {
     val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
     val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
     scan.tableSource match {
-      case _: FilterableTableSource =>
-        calc.calcProgram.getCondition != null
+      case source: FilterableTableSource[_] =>
+        calc.getProgram.getCondition != null && !source.isFilterPushedDown
       case _ => false
     }
   }
@@ -44,49 +44,9 @@ class PushFilterIntoBatchTableSourceScanRule extends RelOptRule(
   override def onMatch(call: RelOptRuleCall): Unit = {
     val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
     val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
-
-    val filterableSource = scan.tableSource.asInstanceOf[FilterableTableSource]
-
-    val program: RexProgram = calc.calcProgram
-    val tst = scan.getTable.unwrap(classOf[TableSourceTable[_]])
-    val predicate = extractPredicateExpressions(
-      program,
-      call.builder().getRexBuilder,
-      tst.tableEnv.getFunctionCatalog)
-
-    if (predicate.length != 0) {
-      val remainingPredicate = filterableSource.setPredicate(predicate)
-
-      if (verifyExpressions(predicate, remainingPredicate)) {
-
-        val filterRexNode = getFilterExpressionAsRexNode(
-          program.getInputRowType,
-          scan,
-          predicate.diff(remainingPredicate))(call.builder())
-
-        val newScan = new BatchTableSourceScan(
-          scan.getCluster,
-          scan.getTraitSet,
-          scan.getTable,
-          scan.tableSource,
-          filterRexNode)
-
-        val newCalcProgram = rewriteRexProgram(
-          program,
-          newScan,
-          remainingPredicate)(call.builder())
-
-        val newCalc = new DataSetCalc(
-          calc.getCluster,
-          calc.getTraitSet,
-          newScan,
-          calc.getRowType,
-          newCalcProgram,
-          description)
-
-        call.transformTo(newCalc)
-      }
-    }
+    val tableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable[_]])
+    val filterableSource = scan.tableSource.asInstanceOf[FilterableTableSource[_]]
+    pushFilterIntoScan(call, calc, scan, tableSourceTable, filterableSource, description)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
index 53f5fff..8c83047 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
@@ -18,22 +18,22 @@
 
 package org.apache.flink.table.plan.rules.dataSet
 
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.plan.RelOptRule.{none, operand}
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.flink.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc}
-import org.apache.flink.table.plan.util.RexProgramProjectExtractor._
-import org.apache.flink.table.sources.{BatchTableSource, ProjectableTableSource}
+import org.apache.flink.table.plan.rules.common.PushProjectIntoTableSourceScanRuleBase
+import org.apache.flink.table.sources.ProjectableTableSource
 
 /**
   * This rule tries to push projections into a BatchTableSourceScan.
   */
 class PushProjectIntoBatchTableSourceScanRule extends RelOptRule(
   operand(classOf[DataSetCalc],
-          operand(classOf[BatchTableSourceScan], none)),
-  "PushProjectIntoBatchTableSourceScanRule") {
+    operand(classOf[BatchTableSourceScan], none)),
+  "PushProjectIntoBatchTableSourceScanRule")
+  with PushProjectIntoTableSourceScanRuleBase {
 
-  override def matches(call: RelOptRuleCall) = {
+  override def matches(call: RelOptRuleCall): Boolean = {
     val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
     scan.tableSource match {
       case _: ProjectableTableSource[_] => true
@@ -44,39 +44,7 @@ class PushProjectIntoBatchTableSourceScanRule extends RelOptRule(
   override def onMatch(call: RelOptRuleCall) {
     val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
     val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
-
-    val usedFields: Array[Int] = extractRefInputFields(calc.calcProgram)
-
-    // if no fields can be projected, we keep the original plan.
-    if (TableEnvironment.getFieldNames(scan.tableSource).length != usedFields.length) {
-      val originTableSource = scan.tableSource.asInstanceOf[ProjectableTableSource[_]]
-      val newTableSource = originTableSource.projectFields(usedFields)
-      val newScan = new BatchTableSourceScan(
-        scan.getCluster,
-        scan.getTraitSet,
-        scan.getTable,
-        newTableSource.asInstanceOf[BatchTableSource[_]])
-
-      val newCalcProgram = rewriteRexProgram(
-        calc.calcProgram,
-        newScan.getRowType,
-        usedFields,
-        calc.getCluster.getRexBuilder)
-
-      if (newCalcProgram.isTrivial) {
-        // drop calc if the transformed program merely returns its input and doesn't exist filter
-        call.transformTo(newScan)
-      } else {
-        val newCalc = new DataSetCalc(
-          calc.getCluster,
-          calc.getTraitSet,
-          newScan,
-          calc.getRowType,
-          newCalcProgram,
-          description)
-        call.transformTo(newCalc)
-      }
-    }
+    pushProjectIntoScan(call, calc, scan)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala
index 9c02dd7..53a3bcd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala
@@ -21,21 +21,22 @@ package org.apache.flink.table.plan.rules.datastream
 import org.apache.calcite.plan.RelOptRule._
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, StreamTableSourceScan}
-import org.apache.flink.table.plan.util.RexProgramExpressionExtractor._
+import org.apache.flink.table.plan.rules.common.PushFilterIntoTableSourceScanRuleBase
 import org.apache.flink.table.plan.schema.TableSourceTable
 import org.apache.flink.table.sources.FilterableTableSource
 
 class PushFilterIntoStreamTableSourceScanRule extends RelOptRule(
   operand(classOf[DataStreamCalc],
     operand(classOf[StreamTableSourceScan], none)),
-  "PushFilterIntoStreamTableSourceScanRule") {
+  "PushFilterIntoStreamTableSourceScanRule")
+  with PushFilterIntoTableSourceScanRuleBase {
 
-  override def matches(call: RelOptRuleCall) = {
+  override def matches(call: RelOptRuleCall): Boolean = {
     val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc]
     val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan]
     scan.tableSource match {
-      case _: FilterableTableSource =>
-        calc.calcProgram.getCondition != null
+      case source: FilterableTableSource[_] =>
+        calc.getProgram.getCondition != null && !source.isFilterPushedDown
       case _ => false
     }
   }
@@ -43,51 +44,10 @@ class PushFilterIntoStreamTableSourceScanRule extends RelOptRule(
   override def onMatch(call: RelOptRuleCall): Unit = {
     val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc]
     val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan]
-
-    val filterableSource = scan.tableSource.asInstanceOf[FilterableTableSource]
-
-    val program = calc.calcProgram
-    val tst = scan.getTable.unwrap(classOf[TableSourceTable[_]])
-    val predicates = extractPredicateExpressions(
-      program,
-      call.builder().getRexBuilder,
-      tst.tableEnv.getFunctionCatalog)
-
-    if (predicates.length != 0) {
-      val remainingPredicate = filterableSource.setPredicate(predicates)
-
-      if (verifyExpressions(predicates, remainingPredicate)) {
-
-        val filterRexNode = getFilterExpressionAsRexNode(
-          program.getInputRowType,
-          scan,
-          predicates.diff(remainingPredicate))(call.builder())
-
-        val newScan = new StreamTableSourceScan(
-          scan.getCluster,
-          scan.getTraitSet,
-          scan.getTable,
-          scan.tableSource,
-          filterRexNode)
-
-        val newCalcProgram = rewriteRexProgram(
-          program,
-          newScan,
-          remainingPredicate)(call.builder())
-
-        val newCalc = new DataStreamCalc(
-          calc.getCluster,
-          calc.getTraitSet,
-          newScan,
-          calc.getRowType,
-          newCalcProgram,
-          description)
-
-        call.transformTo(newCalc)
-      }
-    }
+    val tableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable[_]])
+    val filterableSource = scan.tableSource.asInstanceOf[FilterableTableSource[_]]
+    pushFilterIntoScan(call, calc, scan, tableSourceTable, filterableSource, description)
   }
-
 }
 
 object PushFilterIntoStreamTableSourceScanRule {

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala
index 0c20f2a..903162e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala
@@ -20,9 +20,8 @@ package org.apache.flink.table.plan.rules.datastream
 
 import org.apache.calcite.plan.RelOptRule._
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, StreamTableSourceScan}
-import org.apache.flink.table.plan.util.RexProgramProjectExtractor._
+import org.apache.flink.table.plan.rules.common.PushProjectIntoTableSourceScanRuleBase
 import org.apache.flink.table.sources.{ProjectableTableSource, StreamTableSource}
 
 /**
@@ -31,7 +30,8 @@ import org.apache.flink.table.sources.{ProjectableTableSource, StreamTableSource
 class PushProjectIntoStreamTableSourceScanRule extends RelOptRule(
   operand(classOf[DataStreamCalc],
     operand(classOf[StreamTableSourceScan], none())),
-  "PushProjectIntoStreamTableSourceScanRule") {
+  "PushProjectIntoStreamTableSourceScanRule")
+  with PushProjectIntoTableSourceScanRuleBase {
 
   /** Rule must only match if [[StreamTableSource]] targets a [[ProjectableTableSource]] */
   override def matches(call: RelOptRuleCall): Boolean = {
@@ -45,39 +45,7 @@ class PushProjectIntoStreamTableSourceScanRule extends RelOptRule(
   override def onMatch(call: RelOptRuleCall): Unit = {
     val calc = call.rel(0).asInstanceOf[DataStreamCalc]
     val scan = call.rel(1).asInstanceOf[StreamTableSourceScan]
-
-    val usedFields = extractRefInputFields(calc.calcProgram)
-
-    // if no fields can be projected, we keep the original plan
-    if (TableEnvironment.getFieldNames(scan.tableSource).length != usedFields.length) {
-      val originTableSource = scan.tableSource.asInstanceOf[ProjectableTableSource[_]]
-      val newTableSource = originTableSource.projectFields(usedFields)
-      val newScan = new StreamTableSourceScan(
-        scan.getCluster,
-        scan.getTraitSet,
-        scan.getTable,
-        newTableSource.asInstanceOf[StreamTableSource[_]])
-
-      val newProgram = rewriteRexProgram(
-        calc.calcProgram,
-        newScan.getRowType,
-        usedFields,
-        calc.getCluster.getRexBuilder)
-
-      if (newProgram.isTrivial) {
-        // drop calc if the transformed program merely returns its input and doesn't exist filter
-        call.transformTo(newScan)
-      } else {
-        val newCalc = new DataStreamCalc(
-          calc.getCluster,
-          calc.getTraitSet,
-          newScan,
-          calc.getRowType,
-          newProgram,
-          description)
-        call.transformTo(newCalc)
-      }
-    }
+    pushProjectIntoScan(call, calc, scan)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
index faf5efc..a3851e3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
@@ -25,7 +25,6 @@ import org.apache.flink.table.sources.TableSource
 /** Table which defines an external table via a [[TableSource]] */
 class TableSourceTable[T](
     val tableSource: TableSource[T],
-    val tableEnv: TableEnvironment,
     override val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN)
   extends FlinkTable[T](
     typeInfo = tableSource.getReturnType,

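Note: with the TableEnvironment parameter gone, TableSourceTable is a plain
schema object. The one thing tableEnv was previously needed for, obtaining a
FunctionCatalog during filter push-down, now comes from
FunctionCatalog.withBuiltIns inside the rule base above. Registration shrinks
accordingly (statistic still defaults to FlinkStatistic.UNKNOWN):

    // From BatchTableEnvironment/StreamTableEnvironment in this commit:
    registerTableInternal(name, new TableSourceTable(batchTableSource))
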
http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractor.scala
deleted file mode 100644
index 337b3de..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractor.scala
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.util
-
-import org.apache.calcite.rel.core.TableScan
-import org.apache.calcite.rex._
-import org.apache.calcite.plan.RelOptUtil
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.calcite.{FlinkTypeFactory, RexNodeWrapper}
-import org.apache.flink.table.expressions._
-import org.apache.flink.table.validate.FunctionCatalog
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-import scala.collection.immutable.IndexedSeq
-
-object RexProgramExpressionExtractor {
-
-  /**
-    * converts a rexProgram condition into independent CNF expressions
-    *
-    * @param rexProgram The RexProgram to analyze
-    * @return converted expression
-    */
-  private[flink] def extractPredicateExpressions(
-      rexProgram: RexProgram,
-      rexBuilder: RexBuilder,
-      catalog: FunctionCatalog): Array[Expression] = {
-
-    val fieldNames = getInputsWithNames(rexProgram)
-
-    val condition = rexProgram.getCondition
-    if (condition == null) {
-      return Array.empty
-    }
-    val call = rexProgram.expandLocalRef(condition)
-    val cnf = RexUtil.toCnf(rexBuilder, call)
-    val conjunctions = RelOptUtil.conjunctions(cnf)
-    val expressions = conjunctions.asScala.map(
-      RexNodeWrapper.wrap(_, catalog).toExpression(fieldNames)
-    )
-    expressions.toArray
-  }
-
-  /**
-    * verify should we apply remained expressions on
-    *
-    * @param original initial expression
-    * @param remained remained part of original expression
-    * @return whether or not to decouple parts of the origin expression
-    */
-  private[flink] def verifyExpressions(
-      original: Array[Expression],
-      remained: Array[Expression]): Boolean =
-    remained forall (original contains)
-
-  /**
-    * Generates a new RexProgram based on new expression.
-    *
-    * @param rexProgram original RexProgram
-    * @param scan input source
-    * @param predicate filter condition (fields must be resolved)
-    * @param relBuilder builder for converting expression to Rex
-    */
-  private[flink] def rewriteRexProgram(
-      rexProgram: RexProgram,
-      scan: TableScan,
-      predicate: Array[Expression])(implicit relBuilder: RelBuilder): RexProgram = {
-
-    relBuilder.push(scan)
-
-    val inType = rexProgram.getInputRowType
-    val resolvedExps = resolveFields(predicate, inType)
-    val projs = rexProgram.getProjectList.map(rexProgram.expandLocalRef)
-
-    RexProgram.create(
-      inType,
-      projs,
-      conjunct(resolvedExps).get.toRexNode,
-      rexProgram.getOutputRowType,
-      relBuilder.getRexBuilder)
-  }
-
-  private[flink] def getFilterExpressionAsRexNode(
-      inputTpe: RelDataType,
-      scan: TableScan,
-      exps: Array[Expression])(implicit relBuilder: RelBuilder): RexNode = {
-    relBuilder.push(scan)
-    val resolvedExps = resolveFields(exps, inputTpe)
-    val fullExp = conjunct(resolvedExps)
-    if (fullExp.isDefined) {
-      fullExp.get.toRexNode
-    } else {
-      null
-    }
-  }
-
-  private def resolveFields(
-      predicate: Array[Expression],
-      inType: RelDataType): Array[Expression] = {
-    val fieldTypes: Map[String, TypeInformation[_]] = inType.getFieldList
-      .map(f => f.getName -> FlinkTypeFactory.toTypeInfo(f.getType))
-      .toMap
-    val rule: PartialFunction[Expression, Expression] = {
-      case u@UnresolvedFieldReference(name) =>
-        ResolvedFieldReference(name, fieldTypes(name))
-    }
-    predicate.map(_.postOrderTransform(rule))
-  }
-
-  private def conjunct(exps: Array[Expression]): Option[Expression] = {
-    def overIndexes(): IndexedSeq[Expression] = {
-      for {
-        i <- exps.indices by 2
-      } yield {
-        if (i + 1 < exps.length) {
-          And(exps(i), exps(i + 1))
-        } else {
-          exps(i)
-        }
-      }
-    }
-    exps.length match {
-      case 0 =>
-        None
-      case 1 =>
-        Option(exps(0))
-      case _ =>
-        conjunct(overIndexes().toArray)
-    }
-  }
-
-  private def getInputsWithNames(rexProgram: RexProgram): Map[RexInputRef, String] = {
-    val names = rexProgram.getInputRowType.getFieldNames
-
-    val buffer = for {
-      exp <- rexProgram.getExprList.asScala
-      if exp.isInstanceOf[RexInputRef]
-      ref = exp.asInstanceOf[RexInputRef]
-    } yield {
-      ref -> names(ref.getIndex)
-    }
-    buffer.toMap
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
new file mode 100644
index 0000000..433a35b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.util
+
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.{SqlFunction, SqlPostfixOperator}
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions.{Expression, Literal, ResolvedFieldReference}
+import org.apache.flink.table.validate.FunctionCatalog
+import org.apache.flink.util.Preconditions
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
+
+object RexProgramExtractor {
+
+  /**
+    * Extracts the indices of the input fields accessed by the RexProgram.
+    *
+    * @param rexProgram The RexProgram to analyze
+    * @return The indices of accessed input fields
+    */
+  def extractRefInputFields(rexProgram: RexProgram): Array[Int] = {
+    val visitor = new InputRefVisitor
+
+    // extract referenced input fields from projections
+    rexProgram.getProjectList.foreach(
+      exp => rexProgram.expandLocalRef(exp).accept(visitor))
+
+    // extract referenced input fields from condition
+    val condition = rexProgram.getCondition
+    if (condition != null) {
+      rexProgram.expandLocalRef(condition).accept(visitor)
+    }
+
+    visitor.getFields
+  }
+
+  /**
+    * Extracts the condition from the RexProgram and converts it into independent CNF expressions.
+    *
+    * @param rexProgram The RexProgram to analyze
+    * @return converted expressions as well as RexNodes which cannot be translated
+    */
+  def extractConjunctiveConditions(
+      rexProgram: RexProgram,
+      rexBuilder: RexBuilder,
+      catalog: FunctionCatalog): (Array[Expression], Array[RexNode]) = {
+
+    rexProgram.getCondition match {
+      case condition: RexLocalRef =>
+        val expanded = rexProgram.expandLocalRef(condition)
+        // converts the expanded expression to conjunctive normal form,
+        // like "(a AND b) OR c" will be converted to "(a OR c) AND (b OR c)"
+        val cnf = RexUtil.toCnf(rexBuilder, expanded)
+        // converts the cnf condition to a list of AND conditions
+        val conjunctions = RelOptUtil.conjunctions(cnf)
+
+        val convertedExpressions = new mutable.ArrayBuffer[Expression]
+        val unconvertedRexNodes = new mutable.ArrayBuffer[RexNode]
+        val inputNames = rexProgram.getInputRowType.getFieldNames.asScala.toArray
+        val converter = new RexNodeToExpressionConverter(inputNames, catalog)
+
+        conjunctions.asScala.foreach(rex => {
+          rex.accept(converter) match {
+            case Some(expression) => convertedExpressions += expression
+            case None => unconvertedRexNodes += rex
+          }
+        })
+        (convertedExpressions.toArray, unconvertedRexNodes.toArray)
+
+      case _ => (Array.empty, Array.empty)
+    }
+  }
+}
+
+/**
+  * A RexVisitor that extracts all referenced input fields
+  */
+class InputRefVisitor extends RexVisitorImpl[Unit](true) {
+
+  private val fields = mutable.LinkedHashSet[Int]()
+
+  def getFields: Array[Int] = fields.toArray
+
+  override def visitInputRef(inputRef: RexInputRef): Unit =
+    fields += inputRef.getIndex
+
+  override def visitCall(call: RexCall): Unit =
+    call.operands.foreach(operand => operand.accept(this))
+}
+
+/**
+  * A RexVisitor that converts a RexNode into an Expression.
+  *
+  * @param inputNames      The input names of the relation node
+  * @param functionCatalog The function catalog
+  */
+class RexNodeToExpressionConverter(
+    inputNames: Array[String],
+    functionCatalog: FunctionCatalog)
+    extends RexVisitor[Option[Expression]] {
+
+  override def visitInputRef(inputRef: RexInputRef): Option[Expression] = {
+    Preconditions.checkArgument(inputRef.getIndex < inputNames.length)
+    Some(ResolvedFieldReference(
+      inputNames(inputRef.getIndex),
+      FlinkTypeFactory.toTypeInfo(inputRef.getType)
+    ))
+  }
+
+  override def visitLocalRef(localRef: RexLocalRef): Option[Expression] = {
+    throw new TableException("Bug: RexLocalRef should have been expanded")
+  }
+
+  override def visitLiteral(literal: RexLiteral): Option[Expression] = {
+    Some(Literal(literal.getValue, FlinkTypeFactory.toTypeInfo(literal.getType)))
+  }
+
+  override def visitCall(call: RexCall): Option[Expression] = {
+    val operands = call.getOperands.map(
+      operand => operand.accept(this).orNull
+    )
+
+    // return None if any operand of the call could not be translated
+    if (operands.contains(null)) {
+      None
+    } else {
+      call.getOperator match {
+        case function: SqlFunction =>
+          lookupFunction(replace(function.getName), operands)
+        case postfix: SqlPostfixOperator =>
+          lookupFunction(replace(postfix.getName), operands)
+        case operator@_ =>
+          lookupFunction(replace(s"${operator.getKind}"), operands)
+      }
+    }
+  }
+
+  override def visitFieldAccess(fieldAccess: RexFieldAccess): Option[Expression] = None
+
+  override def visitCorrelVariable(correlVariable: RexCorrelVariable): Option[Expression] = None
+
+  override def visitRangeRef(rangeRef: RexRangeRef): Option[Expression] = None
+
+  override def visitSubQuery(subQuery: RexSubQuery): Option[Expression] = None
+
+  override def visitDynamicParam(dynamicParam: RexDynamicParam): Option[Expression] = None
+
+  override def visitOver(over: RexOver): Option[Expression] = None
+
+  private def lookupFunction(name: String, operands: Seq[Expression]): Option[Expression] = {
+    Try(functionCatalog.lookupFunction(name, operands)) match {
+      case Success(expr) => Some(expr)
+      case Failure(_) => None
+    }
+  }
+
+  private def replace(str: String): String = {
+    str.replaceAll("\\s|_", "")
+  }
+
+}

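Taken together, a planner rule uses the extractor to split a calc condition into pushable
Expressions and leftover RexNodes. A minimal usage sketch, mirroring the tests further
down (the rexProgram and rexBuilder are assumed to come from the matched calc node):

    import org.apache.flink.table.plan.util.RexProgramExtractor
    import org.apache.flink.table.validate.FunctionCatalog

    // given: rexProgram: RexProgram and rexBuilder: RexBuilder of the matched calc
    val (pushable, unconverted) = RexProgramExtractor.extractConjunctiveConditions(
      rexProgram,
      rexBuilder,
      FunctionCatalog.withBuiltIns)

    // For WHERE (a AND b) OR c the CNF step yields the conjuncts "a OR c" and
    // "b OR c". Conjuncts the converter cannot translate (field accesses,
    // sub-queries, functions unknown to the catalog, ...) end up in
    // `unconverted` and must stay in the remaining calc.
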
http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramProjectExtractor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramProjectExtractor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramProjectExtractor.scala
deleted file mode 100644
index 1198167..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramProjectExtractor.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.util
-
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rex._
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable
-import scala.collection.JavaConverters._
-
-object RexProgramProjectExtractor {
-
-  /**
-    * Extracts the indexes of input fields accessed by the RexProgram.
-    *
-    * @param rexProgram The RexProgram to analyze
-    * @return The indexes of accessed input fields
-    */
-  def extractRefInputFields(rexProgram: RexProgram): Array[Int] = {
-    val visitor = new RefFieldsVisitor
-    // extract input fields from project expressions
-    rexProgram.getProjectList.foreach(exp => rexProgram.expandLocalRef(exp).accept(visitor))
-    val condition = rexProgram.getCondition
-    // extract input fields from condition expression
-    if (condition != null) {
-      rexProgram.expandLocalRef(condition).accept(visitor)
-    }
-    visitor.getFields
-  }
-
-  /**
-    * Generates a new RexProgram based on mapped input fields.
-    *
-    * @param rexProgram      original RexProgram
-    * @param inputRowType    input row type
-    * @param usedInputFields indexes of used input fields
-    * @param rexBuilder      builder for Rex expressions
-    *
-    * @return A RexProgram with mapped input field expressions.
-    */
-  def rewriteRexProgram(
-      rexProgram: RexProgram,
-      inputRowType: RelDataType,
-      usedInputFields: Array[Int],
-      rexBuilder: RexBuilder): RexProgram = {
-
-    val inputRewriter = new InputRewriter(usedInputFields)
-    val newProjectExpressions = rexProgram.getProjectList.map(
-      exp => rexProgram.expandLocalRef(exp).accept(inputRewriter)
-    ).toList.asJava
-
-    val oldCondition = rexProgram.getCondition
-    val newConditionExpression = {
-      oldCondition match {
-        case ref: RexLocalRef => rexProgram.expandLocalRef(ref).accept(inputRewriter)
-        case _ => null // null does not match any type
-      }
-    }
-    RexProgram.create(
-      inputRowType,
-      newProjectExpressions,
-      newConditionExpression,
-      rexProgram.getOutputRowType,
-      rexBuilder
-    )
-  }
-}
-
-/**
-  * A RexVisitor to extract used input fields
-  */
-class RefFieldsVisitor extends RexVisitorImpl[Unit](true) {
-  private var fields = mutable.LinkedHashSet[Int]()
-
-  def getFields: Array[Int] = fields.toArray
-
-  override def visitInputRef(inputRef: RexInputRef): Unit = fields += inputRef.getIndex
-
-  override def visitCall(call: RexCall): Unit =
-    call.operands.foreach(operand => operand.accept(this))
-}
-
-/**
-  * A RexShuttle to rewrite field accesses of a RexProgram.
-  *
-  * @param fields fields mapping
-  */
-class InputRewriter(fields: Array[Int]) extends RexShuttle {
-
-  /** old input fields ref index -> new input fields ref index mappings */
-  private val fieldMap: Map[Int, Int] =
-    fields.zipWithIndex.toMap
-
-  override def visitInputRef(inputRef: RexInputRef): RexNode =
-    new RexInputRef(relNodeIndex(inputRef), inputRef.getType)
-
-  override def visitLocalRef(localRef: RexLocalRef): RexNode =
-    new RexInputRef(relNodeIndex(localRef), localRef.getType)
-
-  private def relNodeIndex(ref: RexSlot): Int =
-    fieldMap.getOrElse(ref.getIndex,
-      throw new IllegalArgumentException("input field contains invalid index"))
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramRewriter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramRewriter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramRewriter.scala
new file mode 100644
index 0000000..c8bbf2d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramRewriter.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.util
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex._
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+object RexProgramRewriter {
+
+  /**
+    * Generates a new RexProgram restricted to the used input fields. The used fields
+    * may be a subset of all input fields, so the field indices in the new RexProgram
+    * are remapped based on the given fields.
+    *
+    * @param rexProgram   original RexProgram
+    * @param inputRowType input row type
+    * @param rexBuilder   builder for Rex expressions
+    * @param usedFields   indices of used input fields
+    * @return A new RexProgram with only used input fields
+    */
+  def rewriteWithFieldProjection(
+      rexProgram: RexProgram,
+      inputRowType: RelDataType,
+      rexBuilder: RexBuilder,
+      usedFields: Array[Int]): RexProgram = {
+
+    val inputRewriter = new InputRewriter(usedFields)
+
+    // rewrite input field in projections
+    val newProjectExpressions = rexProgram.getProjectList.map(
+      exp => rexProgram.expandLocalRef(exp).accept(inputRewriter)
+    ).toList.asJava
+
+    // rewrite input field in condition
+    val newConditionExpression = {
+      rexProgram.getCondition match {
+        case ref: RexLocalRef => rexProgram.expandLocalRef(ref).accept(inputRewriter)
+        case _ => null // null does not match any type
+      }
+    }
+
+    RexProgram.create(
+      inputRowType,
+      newProjectExpressions,
+      newConditionExpression,
+      rexProgram.getOutputRowType,
+      rexBuilder
+    )
+  }
+}
+
+/**
+  * A RexShuttle to rewrite field accesses of a RexProgram.
+  *
+  * @param fields used input fields
+  */
+class InputRewriter(fields: Array[Int]) extends RexShuttle {
+
+  /** old input fields ref index -> new input fields ref index mappings */
+  private val fieldMap: Map[Int, Int] =
+    fields.zipWithIndex.toMap
+
+  override def visitInputRef(inputRef: RexInputRef): RexNode =
+    new RexInputRef(refNewIndex(inputRef), inputRef.getType)
+
+  override def visitLocalRef(localRef: RexLocalRef): RexNode =
+    new RexInputRef(refNewIndex(localRef), localRef.getType)
+
+  private def refNewIndex(ref: RexSlot): Int =
+    fieldMap.getOrElse(ref.getIndex,
+      throw new IllegalArgumentException("input field contains invalid index"))
+}

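The index remapping is the subtle part: after projection the source produces only the
used fields, in the given order, so every old input reference must be renumbered. A
small illustration with the field layout used by the tests below:

    // input row: name = $0, id = $1, amount = $2, price = $3
    // usedFields = Array(2, 3, 1)  =>  fieldMap = Map(2 -> 0, 3 -> 1, 1 -> 2)
    val rewriter = new InputRewriter(Array(2, 3, 1))

    // A condition such as <(*($t2, $t3), 100) over the original row is rewritten
    // to <(*($t0, $t1), 100) over the projected row (amount, price, id). Visiting
    // a reference to the unused field $0 (name) throws IllegalArgumentException,
    // since the projected source no longer produces it.
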
http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala
index bbbf862..67529a7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala
@@ -18,21 +18,41 @@
 
 package org.apache.flink.table.sources
 
-import org.apache.flink.table.expressions.Expression
+import java.util.{List => JList}
 
+import org.apache.flink.table.expressions.Expression
 /**
   * Adds support for filtering push-down to a [[TableSource]].
-  * A [[TableSource]] extending this interface is able to filter the fields of the return table.
-  *
+  * A [[TableSource]] extending this interface is able to filter records before they are returned.
   */
-trait FilterableTableSource {
+trait FilterableTableSource[T] {
 
-  /** return an predicate expression that was set. */
-  def getPredicate: Array[Expression]
+  /**
+    * Checks and picks all predicates that this table source can support. The predicates
+    * passed in have been translated into conjunctive normal form, and the table source
+    * may only pick those conjuncts that it supports.
+    * <p>
+    * After trying to push predicates down, this method should return a new [[TableSource]]
+    * instance that holds all pushed-down predicates. Even if nothing was actually pushed
+    * down, it is recommended to return a new [[TableSource]] instance, since the returned
+    * instance will be marked as having tried filter push-down.
+    * <p>
+    * Also note that the form of the predicates passed in must not be changed. The list is
+    * organized in conjunctive normal form; only take or leave each element of the list,
+    * and do not try to reorganize the predicates unless you are absolutely confident that
+    * this is safe.
+    *
+    * @param predicates A list of conjunctive predicates. Pick and remove all expressions
+    *                   that can be pushed down. The remaining elements of this list will
+    *                   be evaluated by the framework afterwards.
+    * @return A new copy of this [[TableSource]] with or without any filters
+    *         pushed into it.
+    */
+  def applyPredicate(predicates: JList[Expression]): TableSource[T]
 
   /**
-    * @param predicate a filter expression that will be applied to fields to return.
-    * @return an unsupported predicate expression.
+    * Returns whether filter push-down has already been tried. Must return true on
+    * the instance returned by [[applyPredicate]].
     */
-  def setPredicate(predicate: Array[Expression]): Array[Expression]
+  def isFilterPushedDown: Boolean
+
 }

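For illustration, a source implementing the new contract scans the conjunct list,
consumes the predicates it understands, and returns a flagged copy of itself; whatever
it leaves in the list is evaluated by the framework afterwards. A hypothetical sketch
(the class and its fields are made up; only the trait comes from this patch):

    import java.util.{List => JList}
    import org.apache.flink.table.expressions.{Expression, GreaterThan, Literal, ResolvedFieldReference}
    import org.apache.flink.table.sources.{BatchTableSource, FilterableTableSource, TableSource}
    import org.apache.flink.types.Row

    class MyFilterableSource(
        val pushedPredicates: Seq[Expression] = Seq(),
        val pushedDown: Boolean = false)
      extends BatchTableSource[Row] with FilterableTableSource[Row] {

      override def isFilterPushedDown: Boolean = pushedDown

      override def applyPredicate(predicates: JList[Expression]): TableSource[Row] = {
        val accepted = Seq.newBuilder[Expression]
        val it = predicates.iterator()
        while (it.hasNext) {
          it.next() match {
            // consume only conjuncts of the shape `field > literal`; removing
            // them tells the framework not to evaluate them a second time
            case g @ GreaterThan(_: ResolvedFieldReference, _: Literal) =>
              accepted += g
              it.remove()
            case _ => // leave unsupported conjuncts for the remaining calc
          }
        }
        // always return a new instance flagged as "push-down tried",
        // even if nothing was consumed
        new MyFilterableSource(accepted.result(), pushedDown = true)
      }

      // getDataSet / getReturnType / explainSource omitted for brevity
    }
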

[3/4] flink git commit: [FLINK-3849] [table] Add FilterableTableSource interface and rules for pushing it (2)

Posted by ku...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/ProjectableTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/ProjectableTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/ProjectableTableSource.scala
index 429cccb..570bdff 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/ProjectableTableSource.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/ProjectableTableSource.scala
@@ -22,17 +22,16 @@ package org.apache.flink.table.sources
   * Adds support for projection push-down to a [[TableSource]].
   * A [[TableSource]] extending this interface is able to project the fields of the return table.
   *
-  * @tparam T The return type of the [[ProjectableTableSource]].
+  * @tparam T The return type of the [[TableSource]].
   */
 trait ProjectableTableSource[T] {
 
   /**
-    * Creates a copy of the [[ProjectableTableSource]] that projects its output on the specified
-    * fields.
+    * Creates a copy of the [[TableSource]] that projects its output on the specified fields.
     *
     * @param fields The indexes of the fields to return.
-    * @return A copy of the [[ProjectableTableSource]] that projects its output.
+    * @return A copy of the [[TableSource]] that projects its output.
     */
-  def projectFields(fields: Array[Int]): ProjectableTableSource[T]
+  def projectFields(fields: Array[Int]): TableSource[T]
 
 }

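Analogously, a projectable source hands out a narrowed copy of itself; a hypothetical
sketch (names are made up):

    import org.apache.flink.table.sources.{BatchTableSource, ProjectableTableSource, TableSource}
    import org.apache.flink.types.Row

    class MyProjectableSource(selectedFields: Array[Int])
      extends BatchTableSource[Row] with ProjectableTableSource[Row] {

      override def projectFields(fields: Array[Int]): TableSource[Row] = {
        // the copy must read only the requested columns, in the requested order;
        // its getReturnType has to reflect the narrowed schema as well
        new MyProjectableSource(fields)
      }

      // getDataSet / getReturnType omitted for brevity
    }
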
http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
index fe205f1..c41582e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
@@ -38,4 +38,6 @@ trait TableSource[T] {
   /** Returns the [[TypeInformation]] for the return type of the [[TableSource]]. */
   def getReturnType: TypeInformation[T]
 
+  /** Describes the table source */
+  def explainSource(): String = ""
 }

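This hook is what surfaces pushed-down state in plan explanations: the expected plan
strings in the tests below (source=[filter=['amount > 2]]) rely on the filterable test
source overriding it along these lines (the filterPredicates field is an assumption):

    override def explainSource(): String =
      // rendered by the scan node as source=[filter=[...]]
      s"filter=[${filterPredicates.mkString(" && ")}]"
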
http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 2c08d8d..fcfcf43 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -86,6 +86,7 @@ class FunctionCatalog {
           .getOrElse(throw ValidationException(s"Undefined scalar function: $name"))
           .asInstanceOf[ScalarSqlFunction]
         ScalarFunctionCall(scalarSqlFunction.getScalarFunction, children)
+
       // user-defined table function call
       case tf if classOf[TableFunction[_]].isAssignableFrom(tf) =>
         val tableSqlFunction = sqlFunctions
@@ -105,7 +106,7 @@ class FunctionCatalog {
               case Success(expr) => expr
               case Failure(e) => throw new ValidationException(e.getMessage)
             }
-          case Failure(e) =>
+          case Failure(_) =>
             val childrenClass = Seq.fill(children.length)(classOf[Expression])
             // try to find a constructor matching the exact number of children
             Try(funcClass.getDeclaredConstructor(childrenClass: _*)) match {
@@ -114,7 +115,7 @@ class FunctionCatalog {
                   case Success(expr) => expr
                   case Failure(exception) => throw ValidationException(exception.getMessage)
                 }
-              case Failure(exception) =>
+              case Failure(_) =>
                 throw ValidationException(s"Invalid number of arguments for function $funcClass")
             }
         }

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala
index 058eca7..97d4d59 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala
@@ -22,8 +22,9 @@ import org.apache.flink.table.api.Types
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.sources.{CsvTableSource, TableSource}
 import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.expressions.utils._
+import org.apache.flink.table.utils.{CommonTestData, TableTestBase, TestFilterableTableSource}
 import org.junit.{Assert, Test}
-import org.apache.flink.table.utils.{CommonTestData, TableTestBase}
 
 class TableSourceTest extends TableTestBase {
 
@@ -46,7 +47,7 @@ class TableSourceTest extends TableTestBase {
 
     val expected = unaryNode(
       "DataSetCalc",
-      projectableSourceBatchTableNode(tableName, projectedFields),
+      batchSourceTableNode(tableName, projectedFields),
       term("select", "UPPER(last) AS _c0", "FLOOR(id) AS _c1", "*(score, 2) AS _c2")
     )
 
@@ -64,7 +65,7 @@ class TableSourceTest extends TableTestBase {
 
     val expected = unaryNode(
       "DataSetCalc",
-      projectableSourceBatchTableNode(tableName, projectedFields),
+      batchSourceTableNode(tableName, projectedFields),
       term("select", "last", "FLOOR(id) AS EXPR$1", "*(score, 2) AS EXPR$2")
     )
 
@@ -83,12 +84,37 @@ class TableSourceTest extends TableTestBase {
       .scan(tableName)
       .select('id, 'score, 'first)
 
-    val expected = projectableSourceBatchTableNode(tableName, noCalcFields)
+    val expected = batchSourceTableNode(tableName, noCalcFields)
     util.verifyTable(result, expected)
   }
 
   @Test
-  def testBatchFilterableSourceScanPlanTableApi(): Unit = {
+  def testBatchFilterableWithoutPushDown(): Unit = {
+    val (tableSource, tableName) = filterableTableSource
+    val util = batchTestUtil()
+    val tEnv = util.tEnv
+
+    tEnv.registerTableSource(tableName, tableSource)
+
+    val result = tEnv
+        .scan(tableName)
+        .select('price, 'id, 'amount)
+        .where("price * 2 < 32")
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchSourceTableNode(
+        tableName,
+        Array("name", "id", "amount", "price")),
+      term("select", "price", "id", "amount"),
+      term("where", "<(*(price, 2), 32)")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testBatchFilterablePartialPushDown(): Unit = {
     val (tableSource, tableName) = filterableTableSource
     val util = batchTestUtil()
     val tEnv = util.tEnv
@@ -97,18 +123,94 @@ class TableSourceTest extends TableTestBase {
 
     val result = tEnv
       .scan(tableName)
-      .select('price, 'id, 'amount)
       .where("amount > 2 && price * 2 < 32")
+      .select('price, 'name.lowerCase(), 'amount)
 
     val expected = unaryNode(
       "DataSetCalc",
-      filterableSourceBatchTableNode(
+      batchFilterableSourceTableNode(
         tableName,
         Array("name", "id", "amount", "price"),
-        ">(amount, 2)"),
-      term("select", "price", "id", "amount"),
+        "'amount > 2"),
+      term("select", "price", "LOWER(name) AS _c1", "amount"),
       term("where", "<(*(price, 2), 32)")
     )
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testBatchFilterableFullyPushedDown(): Unit = {
+    val (tableSource, tableName) = filterableTableSource
+    val util = batchTestUtil()
+    val tEnv = util.tEnv
+
+    tEnv.registerTableSource(tableName, tableSource)
+
+    val result = tEnv
+        .scan(tableName)
+        .select('price, 'id, 'amount)
+        .where("amount > 2 && amount < 32")
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchFilterableSourceTableNode(
+        tableName,
+        Array("name", "id", "amount", "price"),
+        "'amount > 2 && 'amount < 32"),
+      term("select", "price", "id", "amount")
+    )
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testBatchFilterableWithUnconvertedExpression(): Unit = {
+    val (tableSource, tableName) = filterableTableSource
+    val util = batchTestUtil()
+    val tEnv = util.tEnv
+
+    tEnv.registerTableSource(tableName, tableSource)
+
+    val result = tEnv
+        .scan(tableName)
+        .select('price, 'id, 'amount)
+        .where("amount > 2 && (amount < 32 || amount.cast(LONG) > 10)") // cast can not be converted
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchFilterableSourceTableNode(
+        tableName,
+        Array("name", "id", "amount", "price"),
+        "'amount > 2"),
+      term("select", "price", "id", "amount"),
+      term("where", "OR(<(amount, 32), >(CAST(amount), 10))")
+    )
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testBatchFilterableWithUDF(): Unit = {
+    val (tableSource, tableName) = filterableTableSource
+    val util = batchTestUtil()
+    val tEnv = util.tEnv
+
+    tEnv.registerTableSource(tableName, tableSource)
+    val func = Func0
+    tEnv.registerFunction("func0", func)
+
+    val result = tEnv
+        .scan(tableName)
+        .select('price, 'id, 'amount)
+        .where("amount > 2 && func0(amount) < 32")
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchFilterableSourceTableNode(
+        tableName,
+        Array("name", "id", "amount", "price"),
+        "'amount > 2"),
+      term("select", "price", "id", "amount"),
+      term("where", s"<(${func.functionIdentifier}(amount), 32)")
+    )
 
     util.verifyTable(result, expected)
   }
@@ -129,7 +231,7 @@ class TableSourceTest extends TableTestBase {
 
     val expected = unaryNode(
       "DataStreamCalc",
-      projectableSourceStreamTableNode(tableName, projectedFields),
+      streamSourceTableNode(tableName, projectedFields),
       term("select", "last", "FLOOR(id) AS _c1", "*(score, 2) AS _c2")
     )
 
@@ -147,7 +249,7 @@ class TableSourceTest extends TableTestBase {
 
     val expected = unaryNode(
       "DataStreamCalc",
-      projectableSourceStreamTableNode(tableName, projectedFields),
+      streamSourceTableNode(tableName, projectedFields),
       term("select", "last", "FLOOR(id) AS EXPR$1", "*(score, 2) AS EXPR$2")
     )
 
@@ -166,7 +268,7 @@ class TableSourceTest extends TableTestBase {
       .scan(tableName)
       .select('id, 'score, 'first)
 
-    val expected = projectableSourceStreamTableNode(tableName, noCalcFields)
+    val expected = streamSourceTableNode(tableName, noCalcFields)
     util.verifyTable(result, expected)
   }
 
@@ -185,10 +287,10 @@ class TableSourceTest extends TableTestBase {
 
     val expected = unaryNode(
       "DataStreamCalc",
-      filterableSourceStreamTableNode(
+      streamFilterableSourceTableNode(
         tableName,
         Array("name", "id", "amount", "price"),
-        ">(amount, 2)"),
+        "'amount > 2"),
       term("select", "price", "id", "amount"),
       term("where", "<(*(price, 2), 32)")
     )
@@ -254,7 +356,7 @@ class TableSourceTest extends TableTestBase {
   // utils
 
   def filterableTableSource:(TableSource[_], String) = {
-    val tableSource = CommonTestData.getFilterableTableSource
+    val tableSource = new TestFilterableTableSource
     (tableSource, "filterableTable")
   }
 
@@ -264,37 +366,27 @@ class TableSourceTest extends TableTestBase {
     (csvTable, tableName)
   }
 
-  def projectableSourceBatchTableNode(
-      sourceName: String,
-      fields: Array[String]): String = {
-
-    "BatchTableSourceScan(" +
-      s"table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
+  def batchSourceTableNode(sourceName: String, fields: Array[String]): String = {
+    s"BatchTableSourceScan(table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
   }
 
-  def projectableSourceStreamTableNode(
-      sourceName: String,
-      fields: Array[String]): String = {
-    
-    "StreamTableSourceScan(" +
-      s"table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
+  def streamSourceTableNode(sourceName: String, fields: Array[String]): String = {
+    s"StreamTableSourceScan(table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
   }
 
-  def filterableSourceBatchTableNode(
-    sourceName: String,
-    fields: Array[String],
-    exp: String): String = {
-
+  def batchFilterableSourceTableNode(
+      sourceName: String,
+      fields: Array[String],
+      exp: String): String = {
     "BatchTableSourceScan(" +
-      s"table=[[$sourceName]], fields=[${fields.mkString(", ")}], filter=[$exp])"
+      s"table=[[$sourceName]], fields=[${fields.mkString(", ")}], source=[filter=[$exp]])"
   }
 
-  def filterableSourceStreamTableNode(
-    sourceName: String,
-    fields: Array[String],
-    exp: String): String = {
-
+  def streamFilterableSourceTableNode(
+      sourceName: String,
+      fields: Array[String],
+      exp: String): String = {
     "StreamTableSourceScan(" +
-      s"table=[[$sourceName]], fields=[${fields.mkString(", ")}], filter=[$exp])"
+      s"table=[[$sourceName]], fields=[${fields.mkString(", ")}], source=[filter=[$exp]])"
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
index ca7cd8a..7e349cf 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
@@ -23,7 +23,7 @@ import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestB
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.utils.CommonTestData
+import org.apache.flink.table.utils.{CommonTestData, TestFilterableTableSource}
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit.Test
 import org.junit.runner.RunWith
@@ -107,7 +107,7 @@ class TableSourceITCase(
     val tableName = "MyTable"
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-    tableEnv.registerTableSource(tableName, CommonTestData.getFilterableTableSource)
+    tableEnv.registerTableSource(tableName, new TestFilterableTableSource)
     val results = tableEnv
       .scan(tableName)
       .where("amount > 4 && price < 9")

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
index 973c2f3..66711cb 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.utils.CommonTestData
+import org.apache.flink.table.utils.{CommonTestData, TestFilterableTableSource}
 import org.apache.flink.types.Row
 import org.junit.Assert._
 import org.junit.Test
@@ -90,7 +90,7 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase {
     val tableName = "MyTable"
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
-    tEnv.registerTableSource(tableName, CommonTestData.getFilterableTableSource)
+    tEnv.registerTableSource(tableName, new TestFilterableTableSource)
     tEnv.scan(tableName)
       .where("amount > 4 && price < 9")
       .select("id, name")

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
index 30da5ba..d8de554 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
@@ -199,7 +199,7 @@ abstract class ExpressionTestBase {
     // extract RexNode
     val calcProgram = dataSetCalc
      .asInstanceOf[DataSetCalc]
-     .calcProgram
+     .getProgram
     val expanded = calcProgram.expandLocalRef(calcProgram.getProjectList.get(0))
 
     testExprs += ((expanded, expected))
@@ -222,7 +222,7 @@ abstract class ExpressionTestBase {
     // extract RexNode
     val calcProgram = dataSetCalc
      .asInstanceOf[DataSetCalc]
-     .calcProgram
+     .getProgram
     val expanded = calcProgram.expandLocalRef(calcProgram.getProjectList.get(0))
 
     testExprs += ((expanded, expected))

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractorTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractorTest.scala
deleted file mode 100644
index c4059d5..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractorTest.scala
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.util
-
-import java.math.BigDecimal
-
-import org.apache.calcite.adapter.java.JavaTypeFactory
-import org.apache.calcite.plan._
-import org.apache.calcite.plan.volcano.VolcanoPlanner
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem}
-import org.apache.calcite.rel.core.TableScan
-import org.apache.calcite.rex.{RexBuilder, RexProgram, RexProgramBuilder}
-import org.apache.calcite.sql.`type`.SqlTypeName._
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
-import org.apache.flink.table.expressions.{Expression, ExpressionParser}
-import org.apache.flink.table.plan.util.RexProgramExpressionExtractor._
-import org.apache.flink.table.plan.schema.CompositeRelDataType
-import org.apache.flink.table.utils.CommonTestData
-import org.junit.Test
-import org.junit.Assert._
-
-import scala.collection.JavaConverters._
-
-class RexProgramExpressionExtractorTest {
-
-  private val typeFactory = new FlinkTypeFactory(RelDataTypeSystem.DEFAULT)
-  private val allFieldTypes = List(VARCHAR, DECIMAL, INTEGER, DOUBLE).map(typeFactory.createSqlType)
-  private val allFieldTypeInfos: Array[TypeInformation[_]] =
-    Array(BasicTypeInfo.STRING_TYPE_INFO,
-      BasicTypeInfo.BIG_DEC_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.DOUBLE_TYPE_INFO)
-  private val allFieldNames = List("name", "id", "amount", "price")
-
-  @Test
-  def testExtractExpression(): Unit = {
-    val builder: RexBuilder = new RexBuilder(typeFactory)
-    val program = buildRexProgram(
-      allFieldNames, allFieldTypes, typeFactory, builder)
-    val firstExp = ExpressionParser.parseExpression("id > 6")
-    val secondExp = ExpressionParser.parseExpression("amount * price < 100")
-    val expected: Array[Expression] = Array(firstExp, secondExp)
-    val actual = extractPredicateExpressions(
-      program,
-      builder,
-      CommonTestData.getMockTableEnvironment.getFunctionCatalog)
-
-    assertEquals(expected.length, actual.length)
-    // todo
-  }
-
-  @Test
-  def testRewriteRexProgramWithCondition(): Unit = {
-    val originalRexProgram = buildRexProgram(
-      allFieldNames, allFieldTypes, typeFactory, new RexBuilder(typeFactory))
-    val array = Array(
-      "$0",
-      "$1",
-      "$2",
-      "$3",
-      "*($t2, $t3)",
-      "100",
-      "<($t4, $t5)",
-      "6",
-      ">($t1, $t7)",
-      "AND($t6, $t8)")
-    assertTrue(extractExprStrList(originalRexProgram) sameElements array)
-
-    val tEnv = CommonTestData.getMockTableEnvironment
-    val builder = FlinkRelBuilder.create(tEnv.getFrameworkConfig)
-    val tableScan = new MockTableScan(builder.getRexBuilder)
-    val newExpression = ExpressionParser.parseExpression("amount * price < 100")
-    val newRexProgram = rewriteRexProgram(
-      originalRexProgram,
-      tableScan,
-      Array(newExpression)
-    )(builder)
-
-    val newArray = Array(
-      "$0",
-      "$1",
-      "$2",
-      "$3",
-      "*($t2, $t3)",
-      "100",
-      "<($t4, $t5)")
-    assertTrue(extractExprStrList(newRexProgram) sameElements newArray)
-  }
-
-//  @Test
-//  def testVerifyExpressions(): Unit = {
-//    val strPart = "f1 < 4"
-//    val part = parseExpression(strPart)
-//
-//    val shortFalseOrigin = parseExpression(s"f0 > 10 || $strPart")
-//    assertFalse(verifyExpressions(shortFalseOrigin, part))
-//
-//    val longFalseOrigin = parseExpression(s"(f0 > 10 || (($strPart) > POWER(f0, f1))) && 2")
-//    assertFalse(verifyExpressions(longFalseOrigin, part))
-//
-//    val shortOkayOrigin = parseExpression(s"f0 > 10 && ($strPart)")
-//    assertTrue(verifyExpressions(shortOkayOrigin, part))
-//
-//    val longOkayOrigin = parseExpression(s"f0 > 10 && (($strPart) > POWER(f0, f1))")
-//    assertTrue(verifyExpressions(longOkayOrigin, part))
-//
-//    val longOkayOrigin2 = parseExpression(s"(f0 > 10 || (2 > POWER(f0, f1))) && $strPart")
-//    assertTrue(verifyExpressions(longOkayOrigin2, part))
-//  }
-
-  private def buildRexProgram(
-      fieldNames: List[String],
-      fieldTypes: Seq[RelDataType],
-      typeFactory: JavaTypeFactory,
-      rexBuilder: RexBuilder): RexProgram = {
-
-    val inputRowType = typeFactory.createStructType(fieldTypes.asJava, fieldNames.asJava)
-    val builder = new RexProgramBuilder(inputRowType, rexBuilder)
-
-    val t0 = rexBuilder.makeInputRef(fieldTypes(2), 2)
-    val t1 = rexBuilder.makeInputRef(fieldTypes(1), 1)
-    val t2 = rexBuilder.makeInputRef(fieldTypes(3), 3)
-    // t3 = t0 * t2
-    val t3 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.MULTIPLY, t0, t2))
-    val t4 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(100L))
-    val t5 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(6L))
-    // project: amount, amount * price
-    builder.addProject(t0, "amount")
-    builder.addProject(t3, "total")
-    // t6 = t3 < t4
-    val t6 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN, t3, t4))
-    // t7 = t1 > t5
-    val t7 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.GREATER_THAN, t1, t5))
-    val t8 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.AND, List(t6, t7).asJava))
-    // condition: t6 and t7
-    // (t0 * t2 < t4 && t1 > t5)
-    builder.addCondition(t8)
-    builder.getProgram
-  }
-
-  /**
-    * extract all expression string list from input RexProgram expression lists
-    *
-    * @param rexProgram input RexProgram instance to analyze
-    * @return all expression string list of input RexProgram expression lists
-    */
-  private def extractExprStrList(rexProgram: RexProgram) =
-    rexProgram.getExprList.asScala.map(_.toString).toArray
-
-  class MockTableScan(
-      rexBuilder: RexBuilder)
-    extends TableScan(
-      RelOptCluster.create(new VolcanoPlanner(), rexBuilder),
-      RelTraitSet.createEmpty,
-      new MockRelOptTable)
-
-  class MockRelOptTable
-    extends RelOptAbstractTable(
-      null,
-      "mockRelTable",
-      new CompositeRelDataType(
-        new RowTypeInfo(allFieldTypeInfos, allFieldNames.toArray), typeFactory))
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExtractorTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExtractorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExtractorTest.scala
new file mode 100644
index 0000000..b0a5fcf
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExtractorTest.scala
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.util
+
+import java.math.BigDecimal
+
+import org.apache.calcite.rex.{RexBuilder, RexProgramBuilder}
+import org.apache.calcite.sql.SqlPostfixOperator
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.flink.table.expressions.{Expression, ExpressionParser}
+import org.apache.flink.table.validate.FunctionCatalog
+import org.junit.Assert.{assertArrayEquals, assertEquals}
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+class RexProgramExtractorTest extends RexProgramTestBase {
+
+  private val functionCatalog: FunctionCatalog = FunctionCatalog.withBuiltIns
+
+  @Test
+  def testExtractRefInputFields(): Unit = {
+    val usedFields = RexProgramExtractor.extractRefInputFields(buildSimpleRexProgram())
+    assertArrayEquals(Array(2, 3, 1), usedFields)
+  }
+
+  @Test
+  def testExtractSimpleCondition(): Unit = {
+    val builder: RexBuilder = new RexBuilder(typeFactory)
+    val program = buildSimpleRexProgram()
+
+    val firstExp = ExpressionParser.parseExpression("id > 6")
+    val secondExp = ExpressionParser.parseExpression("amount * price < 100")
+    val expected: Array[Expression] = Array(firstExp, secondExp)
+
+    val (convertedExpressions, unconvertedRexNodes) =
+      RexProgramExtractor.extractConjunctiveConditions(
+        program,
+        builder,
+        functionCatalog)
+
+    assertExpressionArrayEquals(expected, convertedExpressions)
+    assertEquals(0, unconvertedRexNodes.length)
+  }
+
+  @Test
+  def testExtractSingleCondition(): Unit = {
+    val inputRowType = typeFactory.createStructType(allFieldTypes, allFieldNames)
+    val builder = new RexProgramBuilder(inputRowType, rexBuilder)
+
+    // amount
+    val t0 = rexBuilder.makeInputRef(allFieldTypes.get(2), 2)
+    // id
+    val t1 = rexBuilder.makeInputRef(allFieldTypes.get(1), 1)
+
+    // a = amount >= id
+    val a = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, t0, t1))
+    builder.addCondition(a)
+
+    val program = builder.getProgram
+    val relBuilder: RexBuilder = new RexBuilder(typeFactory)
+    val (convertedExpressions, unconvertedRexNodes) =
+      RexProgramExtractor.extractConjunctiveConditions(
+        program,
+        relBuilder,
+        functionCatalog)
+
+    val expected: Array[Expression] = Array(ExpressionParser.parseExpression("amount >= id"))
+    assertExpressionArrayEquals(expected, convertedExpressions)
+    assertEquals(0, unconvertedRexNodes.length)
+  }
+
+  // ((a AND b) OR c) AND (NOT d) => (a OR c) AND (b OR c) AND (NOT d)
+  @Test
+  def testExtractCnfCondition(): Unit = {
+    val inputRowType = typeFactory.createStructType(allFieldTypes, allFieldNames)
+    val builder = new RexProgramBuilder(inputRowType, rexBuilder)
+
+    // amount
+    val t0 = rexBuilder.makeInputRef(allFieldTypes.get(2), 2)
+    // id
+    val t1 = rexBuilder.makeInputRef(allFieldTypes.get(1), 1)
+    // price
+    val t2 = rexBuilder.makeInputRef(allFieldTypes.get(3), 3)
+    // 100
+    val t3 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(100L))
+
+    // a = amount < 100
+    val a = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN, t0, t3))
+    // b = id > 100
+    val b = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.GREATER_THAN, t1, t3))
+    // c = price == 100
+    val c = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.EQUALS, t2, t3))
+    // d = amount <= id
+    val d = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN_OR_EQUAL, t0, t1))
+
+    // a AND b
+    val and = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.AND, List(a, b).asJava))
+    // (a AND b) or c
+    val or = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.OR, List(and, c).asJava))
+    // not d
+    val not = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.NOT, List(d).asJava))
+
+    // (a AND b) OR c) AND (NOT d)
+    builder.addCondition(builder.addExpr(
+      rexBuilder.makeCall(SqlStdOperatorTable.AND, List(or, not).asJava)))
+
+    val program = builder.getProgram
+    val relBuilder: RexBuilder = new RexBuilder(typeFactory)
+    val (convertedExpressions, unconvertedRexNodes) =
+      RexProgramExtractor.extractConjunctiveConditions(
+        program,
+        relBuilder,
+        functionCatalog)
+
+    val expected: Array[Expression] = Array(
+      ExpressionParser.parseExpression("amount < 100 || price == 100"),
+      ExpressionParser.parseExpression("id > 100 || price == 100"),
+      ExpressionParser.parseExpression("!(amount <= id)"))
+    assertExpressionArrayEquals(expected, convertedExpressions)
+    assertEquals(0, unconvertedRexNodes.length)
+  }
+
+  @Test
+  def testExtractArithmeticConditions(): Unit = {
+    val inputRowType = typeFactory.createStructType(allFieldTypes, allFieldNames)
+    val builder = new RexProgramBuilder(inputRowType, rexBuilder)
+
+    // amount
+    val t0 = rexBuilder.makeInputRef(allFieldTypes.get(2), 2)
+    // id
+    val t1 = rexBuilder.makeInputRef(allFieldTypes.get(1), 1)
+    // 100
+    val t2 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(100L))
+
+    val condition = List(
+      // amount < id
+      builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN, t0, t1)),
+      // amount <= id
+      builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN_OR_EQUAL, t0, t1)),
+      // amount <> id
+      builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.NOT_EQUALS, t0, t1)),
+      // amount == id
+      builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.EQUALS, t0, t1)),
+      // amount >= id
+      builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, t0, t1)),
+      // amount > id
+      builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.GREATER_THAN, t0, t1)),
+      // amount + id == 100
+      builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.EQUALS,
+        rexBuilder.makeCall(SqlStdOperatorTable.PLUS, t0, t1), t2)),
+      // amount - id == 100
+      builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.EQUALS,
+        rexBuilder.makeCall(SqlStdOperatorTable.MINUS, t0, t1), t2)),
+      // amount * id == 100
+      builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.EQUALS,
+        rexBuilder.makeCall(SqlStdOperatorTable.MULTIPLY, t0, t1), t2)),
+      // amount / id == 100
+      builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.EQUALS,
+        rexBuilder.makeCall(SqlStdOperatorTable.DIVIDE, t0, t1), t2)),
+      // -amount == 100
+      builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.EQUALS,
+        rexBuilder.makeCall(SqlStdOperatorTable.UNARY_MINUS, t0), t2))
+    ).asJava
+
+    builder.addCondition(builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.AND, condition)))
+    val program = builder.getProgram
+    val relBuilder: RexBuilder = new RexBuilder(typeFactory)
+    val (convertedExpressions, unconvertedRexNodes) =
+      RexProgramExtractor.extractConjunctiveConditions(
+        program,
+        relBuilder,
+        functionCatalog)
+
+    val expected: Array[Expression] = Array(
+      ExpressionParser.parseExpression("amount < id"),
+      ExpressionParser.parseExpression("amount <= id"),
+      ExpressionParser.parseExpression("amount <> id"),
+      ExpressionParser.parseExpression("amount == id"),
+      ExpressionParser.parseExpression("amount >= id"),
+      ExpressionParser.parseExpression("amount > id"),
+      ExpressionParser.parseExpression("amount + id == 100"),
+      ExpressionParser.parseExpression("amount - id == 100"),
+      ExpressionParser.parseExpression("amount * id == 100"),
+      ExpressionParser.parseExpression("amount / id == 100"),
+      ExpressionParser.parseExpression("-amount == 100")
+    )
+    assertExpressionArrayEquals(expected, convertedExpressions)
+    assertEquals(0, unconvertedRexNodes.length)
+  }
+
+  @Test
+  def testExtractPostfixConditions(): Unit = {
+    testExtractSinglePostfixCondition(4, SqlStdOperatorTable.IS_NULL, "('flag).isNull")
+    // IS_NOT_NULL will be eliminated since flag is not nullable
+    // testExtractSinglePostfixCondition(4, SqlStdOperatorTable.IS_NOT_NULL, "('flag).isNotNull")
+    testExtractSinglePostfixCondition(4, SqlStdOperatorTable.IS_TRUE, "('flag).isTrue")
+    testExtractSinglePostfixCondition(4, SqlStdOperatorTable.IS_NOT_TRUE, "('flag).isNotTrue")
+    testExtractSinglePostfixCondition(4, SqlStdOperatorTable.IS_FALSE, "('flag).isFalse")
+    testExtractSinglePostfixCondition(4, SqlStdOperatorTable.IS_NOT_FALSE, "('flag).isNotFalse")
+  }
+
+  @Test
+  def testExtractConditionWithFunctionCalls(): Unit = {
+    val inputRowType = typeFactory.createStructType(allFieldTypes, allFieldNames)
+    val builder = new RexProgramBuilder(inputRowType, rexBuilder)
+
+    // amount
+    val t0 = rexBuilder.makeInputRef(allFieldTypes.get(2), 2)
+    // id
+    val t1 = rexBuilder.makeInputRef(allFieldTypes.get(1), 1)
+    // 100
+    val t2 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(100L))
+
+    // sum(amount) > 100
+    val condition1 = builder.addExpr(
+      rexBuilder.makeCall(SqlStdOperatorTable.GREATER_THAN,
+        rexBuilder.makeCall(SqlStdOperatorTable.SUM, t0), t2))
+
+    // min(id) == 100
+    val condition2 = builder.addExpr(
+      rexBuilder.makeCall(SqlStdOperatorTable.EQUALS,
+        rexBuilder.makeCall(SqlStdOperatorTable.MIN, t1), t2))
+
+    builder.addCondition(builder.addExpr(
+      rexBuilder.makeCall(SqlStdOperatorTable.AND, condition1, condition2)))
+
+    val program = builder.getProgram
+    val relBuilder: RexBuilder = new RexBuilder(typeFactory)
+    val (convertedExpressions, unconvertedRexNodes) =
+      RexProgramExtractor.extractConjunctiveConditions(
+        program,
+        relBuilder,
+        functionCatalog)
+
+    val expected: Array[Expression] = Array(
+      ExpressionParser.parseExpression("sum(amount) > 100"),
+      ExpressionParser.parseExpression("min(id) == 100")
+    )
+    assertExpressionArrayEquals(expected, convertedExpressions)
+    assertEquals(0, unconvertedRexNodes.length)
+  }
+
+  @Test
+  def testExtractWithUnsupportedConditions(): Unit = {
+    val inputRowType = typeFactory.createStructType(allFieldTypes, allFieldNames)
+    val builder = new RexProgramBuilder(inputRowType, rexBuilder)
+
+    // amount
+    val t0 = rexBuilder.makeInputRef(allFieldTypes.get(2), 2)
+    // id
+    val t1 = rexBuilder.makeInputRef(allFieldTypes.get(1), 1)
+    // 100
+    val t2 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(100L))
+
+    // not supported yet: amount.cast(BIGINT)
+    val cast = builder.addExpr(rexBuilder.makeCast(allFieldTypes.get(1), t0))
+
+    // not supported yet: amount.cast(BIGINT) > 100
+    val condition1 = builder.addExpr(
+      rexBuilder.makeCall(SqlStdOperatorTable.GREATER_THAN, cast, t2))
+
+    // amount <= id
+    val condition2 = builder.addExpr(
+      rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN_OR_EQUAL, t0, t1))
+
+    // contains an unsupported condition: (amount.cast(BIGINT) > 100 OR amount <= id)
+    val condition3 = builder.addExpr(
+      rexBuilder.makeCall(SqlStdOperatorTable.OR, condition1, condition2))
+
+    // only condition2 can be translated
+    builder.addCondition(
+      rexBuilder.makeCall(SqlStdOperatorTable.AND, condition1, condition2, condition3))
+
+    val program = builder.getProgram
+    val relBuilder: RexBuilder = new RexBuilder(typeFactory)
+    val (convertedExpressions, unconvertedRexNodes) =
+      RexProgramExtractor.extractConjunctiveConditions(
+        program,
+        relBuilder,
+        functionCatalog)
+
+    val expected: Array[Expression] = Array(
+      ExpressionParser.parseExpression("amount <= id")
+    )
+    assertExpressionArrayEquals(expected, convertedExpressions)
+    assertEquals(2, unconvertedRexNodes.length)
+    assertEquals(">(CAST($2):BIGINT NOT NULL, 100)", unconvertedRexNodes(0).toString)
+    assertEquals("OR(>(CAST($2):BIGINT NOT NULL, 100), <=($2, $1))",
+      unconvertedRexNodes(1).toString)
+  }
+
+  private def testExtractSinglePostfixCondition(
+      fieldIndex: Int,
+      op: SqlPostfixOperator,
+      expr: String): Unit = {
+
+    // reset the RexBuilder first so the program builder uses the fresh instance
+    rexBuilder = new RexBuilder(typeFactory)
+    val inputRowType = typeFactory.createStructType(allFieldTypes, allFieldNames)
+    val builder = new RexProgramBuilder(inputRowType, rexBuilder)
+
+    // flag
+    val t0 = rexBuilder.makeInputRef(allFieldTypes.get(fieldIndex), fieldIndex)
+    builder.addCondition(builder.addExpr(rexBuilder.makeCall(op, t0)))
+
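+    // normalize = false, presumably so the postfix call is not simplified
+    // away during normalization (cf. IS_NOT_NULL being eliminated above)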
+    val program = builder.getProgram(false)
+    val relBuilder: RexBuilder = new RexBuilder(typeFactory)
+    val (convertedExpressions, unconvertedRexNodes) =
+      RexProgramExtractor.extractConjunctiveConditions(
+        program,
+        relBuilder,
+        functionCatalog)
+
+    assertEquals(1, convertedExpressions.length)
+    assertEquals(expr, convertedExpressions.head.toString)
+    assertEquals(0, unconvertedRexNodes.length)
+  }
+
+  private def assertExpressionArrayEquals(
+      expected: Array[Expression],
+      actual: Array[Expression]): Unit = {
+    val sortedExpected = expected.sortBy(e => e.toString)
+    val sortedActual = actual.sortBy(e => e.toString)
+
+    assertEquals(sortedExpected.length, sortedActual.length)
+    sortedExpected.zip(sortedActual).foreach {
+      case (l, r) => assertEquals(l.toString, r.toString)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramProjectExtractorTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramProjectExtractorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramProjectExtractorTest.scala
deleted file mode 100644
index cea9eee..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramProjectExtractorTest.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.util
-
-import java.math.BigDecimal
-
-import org.apache.calcite.adapter.java.JavaTypeFactory
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem}
-import org.apache.calcite.rex.{RexBuilder, RexProgram, RexProgramBuilder}
-import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, DOUBLE, INTEGER, VARCHAR}
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.flink.table.plan.util.RexProgramProjectExtractor._
-import org.junit.Assert.{assertArrayEquals, assertTrue}
-import org.junit.{Before, Test}
-
-import scala.collection.JavaConverters._
-
-/**
-  * This class is responsible for testing RexProgramProjectExtractor.
-  */
-class RexProgramProjectExtractorTest {
-  private var typeFactory: JavaTypeFactory = _
-  private var rexBuilder: RexBuilder = _
-  private var allFieldTypes: Seq[RelDataType] = _
-  private val allFieldNames = List("name", "id", "amount", "price")
-
-  @Before
-  def setUp(): Unit = {
-    typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT)
-    rexBuilder = new RexBuilder(typeFactory)
-    allFieldTypes = List(VARCHAR, BIGINT, INTEGER, DOUBLE).map(typeFactory.createSqlType(_))
-  }
-
-  @Test
-  def testExtractRefInputFields(): Unit = {
-    val usedFields = extractRefInputFields(buildRexProgram())
-    assertArrayEquals(usedFields, Array(2, 3, 1))
-  }
-
-  @Test
-  def testRewriteRexProgram(): Unit = {
-    val originRexProgram = buildRexProgram()
-    assertTrue(extractExprStrList(originRexProgram).sameElements(Array(
-      "$0",
-      "$1",
-      "$2",
-      "$3",
-      "*($t2, $t3)",
-      "100",
-      "<($t4, $t5)",
-      "6",
-      ">($t1, $t7)",
-      "AND($t6, $t8)")))
-    // use amount, id, price fields to create a new RexProgram
-    val usedFields = Array(2, 3, 1)
-    val types = usedFields.map(allFieldTypes(_)).toList.asJava
-    val names = usedFields.map(allFieldNames(_)).toList.asJava
-    val inputRowType = typeFactory.createStructType(types, names)
-    val newRexProgram = rewriteRexProgram(originRexProgram, inputRowType, usedFields, rexBuilder)
-    assertTrue(extractExprStrList(newRexProgram).sameElements(Array(
-      "$0",
-      "$1",
-      "$2",
-      "*($t0, $t1)",
-      "100",
-      "<($t3, $t4)",
-      "6",
-      ">($t2, $t6)",
-      "AND($t5, $t7)")))
-  }
-
-  private def buildRexProgram(): RexProgram = {
-    val types = allFieldTypes.asJava
-    val names = allFieldNames.asJava
-    val inputRowType = typeFactory.createStructType(types, names)
-    val builder = new RexProgramBuilder(inputRowType, rexBuilder)
-    val t0 = rexBuilder.makeInputRef(types.get(2), 2)
-    val t1 = rexBuilder.makeInputRef(types.get(1), 1)
-    val t2 = rexBuilder.makeInputRef(types.get(3), 3)
-    val t3 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.MULTIPLY, t0, t2))
-    val t4 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(100L))
-    val t5 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(6L))
-    // project: amount, amount * price
-    builder.addProject(t0, "amount")
-    builder.addProject(t3, "total")
-    // condition: amount * price < 100 and id > 6
-    val t6 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN, t3, t4))
-    val t7 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.GREATER_THAN, t1, t5))
-    val t8 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.AND, List(t6, t7).asJava))
-    builder.addCondition(t8)
-    builder.getProgram
-  }
-
-  /**
-    * extract all expression string list from input RexProgram expression lists
-    *
-    * @param rexProgram input RexProgram instance to analyze
-    * @return all expression string list of input RexProgram expression lists
-    */
-  private def extractExprStrList(rexProgram: RexProgram) = {
-    rexProgram.getExprList.asScala.map(_.toString)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramRewriterTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramRewriterTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramRewriterTest.scala
new file mode 100644
index 0000000..899eed2
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramRewriterTest.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.util
+
+import org.junit.Assert.assertTrue
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+class RexProgramRewriterTest extends RexProgramTestBase {
+
+  @Test
+  def testRewriteRexProgram(): Unit = {
+    val rexProgram = buildSimpleRexProgram()
+    assertTrue(extractExprStrList(rexProgram) == wrapRefArray(Array(
+      "$0",
+      "$1",
+      "$2",
+      "$3",
+      "$4",
+      "*($t2, $t3)",
+      "100",
+      "<($t5, $t6)",
+      "6",
+      ">($t1, $t8)",
+      "AND($t7, $t9)")))
+
+    // use amount, id, price fields to create a new RexProgram
+    val usedFields = Array(2, 3, 1)
+    val types = usedFields.map(allFieldTypes.get).toList.asJava
+    val names = usedFields.map(allFieldNames.get).toList.asJava
+    val inputRowType = typeFactory.createStructType(types, names)
+    val newRexProgram = RexProgramRewriter.rewriteWithFieldProjection(
+      rexProgram, inputRowType, rexBuilder, usedFields)
+    assertTrue(extractExprStrList(newRexProgram) == wrapRefArray(Array(
+      "$0",
+      "$1",
+      "$2",
+      "*($t0, $t1)",
+      "100",
+      "<($t3, $t4)",
+      "6",
+      ">($t2, $t6)",
+      "AND($t5, $t7)")))
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramTestBase.scala
new file mode 100644
index 0000000..6ef3d82
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramTestBase.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.util
+
+import java.math.BigDecimal
+import java.util
+
+import org.apache.calcite.adapter.java.JavaTypeFactory
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem}
+import org.apache.calcite.rex.{RexBuilder, RexProgram, RexProgramBuilder}
+import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, DOUBLE, INTEGER, VARCHAR, BOOLEAN}
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+abstract class RexProgramTestBase {
+
+  val typeFactory: JavaTypeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT)
+
+  val allFieldNames: util.List[String] = List("name", "id", "amount", "price", "flag").asJava
+
+  val allFieldTypes: util.List[RelDataType] =
+    List(VARCHAR, BIGINT, INTEGER, DOUBLE, BOOLEAN).map(typeFactory.createSqlType).asJava
+
+  var rexBuilder: RexBuilder = new RexBuilder(typeFactory)
+
+  /**
+    * Extracts the string representation of every expression in the
+    * expression list of the given RexProgram.
+    *
+    * @param rexProgram the RexProgram instance to analyze
+    * @return the string representation of each expression in the program's
+    *         expression list
+    */
+  protected def extractExprStrList(rexProgram: RexProgram): mutable.Buffer[String] = {
+    rexProgram.getExprList.asScala.map(_.toString)
+  }
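+
+  // e.g. for buildSimpleRexProgram() below, this yields:
+  //   $0, $1, $2, $3, $4, *($t2, $t3), 100, <($t5, $t6), 6, >($t1, $t8),
+  //   AND($t7, $t9)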
+
+  // select amount, amount * price as total where amount * price < 100 and id > 6
+  protected def buildSimpleRexProgram(): RexProgram = {
+    val inputRowType = typeFactory.createStructType(allFieldTypes, allFieldNames)
+    val builder = new RexProgramBuilder(inputRowType, rexBuilder)
+
+    val t0 = rexBuilder.makeInputRef(allFieldTypes.get(2), 2)
+    val t1 = rexBuilder.makeInputRef(allFieldTypes.get(1), 1)
+    val t2 = rexBuilder.makeInputRef(allFieldTypes.get(3), 3)
+    val t3 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.MULTIPLY, t0, t2))
+    val t4 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(100L))
+    val t5 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(6L))
+
+    // project: amount, amount * price as total
+    builder.addProject(t0, "amount")
+    builder.addProject(t3, "total")
+
+    // condition: amount * price < 100 and id > 6
+    val t6 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN, t3, t4))
+    val t7 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.GREATER_THAN, t1, t5))
+    val t8 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.AND, List(t6, t7).asJava))
+    builder.addCondition(t8)
+
+    builder.getProgram
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
index a720f02..2364f23 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
@@ -21,21 +21,11 @@ package org.apache.flink.table.utils
 import java.io.{File, FileOutputStream, OutputStreamWriter}
 import java.util
 
-import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.flink.table.sources.{BatchTableSource, CsvTableSource}
-import org.apache.calcite.tools.RuleSet
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment}
-import org.apache.flink.table.expressions._
-import org.apache.flink.table.sinks.TableSink
-import org.apache.flink.table.sources._
-import org.apache.flink.types.Row
-
-import scala.collection.JavaConverters._
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.sources.{BatchTableSource, CsvTableSource}
 
 object CommonTestData {
 
@@ -108,110 +98,4 @@ object CommonTestData {
 
   def getMockTableEnvironment: TableEnvironment = new MockTableEnvironment
 
-  def getFilterableTableSource = new TestFilterableTableSource
-}
-
-class MockTableEnvironment extends TableEnvironment(new TableConfig) {
-
-  override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = ???
-
-  override protected def checkValidTableName(name: String): Unit = ???
-
-  override def sql(query: String): Table = ???
-
-  override def registerTableSource(name: String, tableSource: TableSource[_]): Unit = ???
-
-  override protected def getBuiltInNormRuleSet: RuleSet = ???
-
-  override protected def getBuiltInOptRuleSet: RuleSet = ???
-}
-
-class TestFilterableTableSource
-    extends BatchTableSource[Row]
-    with StreamTableSource[Row]
-    with FilterableTableSource
-    with DefinedFieldNames {
-
-  import org.apache.flink.table.api.Types._
-
-  val fieldNames = Array("name", "id", "amount", "price")
-  val fieldTypes = Array[TypeInformation[_]](STRING, LONG, INT, DOUBLE)
-
-  private var filterLiteral: Literal = _
-  private var filterPredicates: Array[Expression] = Array.empty
-
-  /** Returns the data of the table as a [[DataSet]]. */
-  override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
-    execEnv.fromCollection[Row](generateDynamicCollection(33).asJava, getReturnType)
-  }
-
-  /** Returns the data of the table as a [[DataStream]]. */
-  def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
-    execEnv.fromCollection[Row](generateDynamicCollection(33).asJava, getReturnType)
-  }
-
-  private def generateDynamicCollection(num: Int): Seq[Row] = {
-
-    if (filterLiteral == null) {
-      throw new RuntimeException("filter expression was not set")
-    }
-
-    val filterValue = filterLiteral.value.asInstanceOf[Number].intValue()
-
-    def shouldCreateRow(value: Int): Boolean = {
-      value > filterValue
-    }
-
-    for {
-      cnt <- 0 until num
-      if shouldCreateRow(cnt)
-    } yield {
-        val row = new Row(fieldNames.length)
-        fieldNames.zipWithIndex.foreach { case (name, index) =>
-          name match {
-            case "name" =>
-              row.setField(index, s"Record_$cnt")
-            case "id" =>
-              row.setField(index, cnt.toLong)
-            case "amount" =>
-              row.setField(index, cnt.toInt)
-            case "price" =>
-              row.setField(index, cnt.toDouble)
-          }
-        }
-      row
-      }
-  }
-
-  /** Returns the [[TypeInformation]] for the return type. */
-  override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes, fieldNames)
-
-  /** Returns the names of the table fields. */
-  override def getFieldNames: Array[String] = fieldNames
-
-  /** Returns the indices of the table fields. */
-  override def getFieldIndices: Array[Int] = fieldNames.indices.toArray
-
-  override def getPredicate: Array[Expression] = filterPredicates
-
-  /** Return an unsupported predicates expression. */
-  override def setPredicate(predicates: Array[Expression]): Array[Expression] = {
-    predicates(0) match {
-      case gt: GreaterThan =>
-        gt.left match {
-          case f: ResolvedFieldReference =>
-            gt.right match {
-              case l: Literal =>
-                if (f.name.equals("amount")) {
-                  filterLiteral = l
-                  filterPredicates = Array(predicates(0))
-                  Array(predicates(1))
-                } else predicates
-              case _ => predicates
-            }
-          case _ => predicates
-        }
-      case _ => predicates
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
new file mode 100644
index 0000000..6a86ace
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils
+
+import org.apache.calcite.tools.RuleSet
+import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment}
+import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.sources.TableSource
+
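+/**
+  * A bare-bones [[TableEnvironment]] stub. Every operation is left
+  * unimplemented (???), which is enough for tests that only need an
+  * instance to exist.
+  */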
+class MockTableEnvironment extends TableEnvironment(new TableConfig) {
+
+  override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = ???
+
+  override protected def checkValidTableName(name: String): Unit = ???
+
+  override def sql(query: String): Table = ???
+
+  override def registerTableSource(name: String, tableSource: TableSource[_]): Unit = ???
+
+  override protected def getBuiltInNormRuleSet: RuleSet = ???
+
+  override protected def getBuiltInOptRuleSet: RuleSet = ???
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala
new file mode 100644
index 0000000..dcf2acd
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils
+
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.table.api.Types._
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.sources.{BatchTableSource, FilterableTableSource, StreamTableSource, TableSource}
+import org.apache.flink.types.Row
+import org.apache.flink.util.Preconditions
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/**
+  * A table source that can only handle simple comparisons against the
+  * field "amount": ">", "<", ">=", "<=", "=" and "<>" compared with an
+  * integer literal.
+  */
+class TestFilterableTableSource(
+    val recordNum: Int = 33)
+    extends BatchTableSource[Row]
+        with StreamTableSource[Row]
+        with FilterableTableSource[Row] {
+
+  var filterPushedDown: Boolean = false
+
+  val fieldNames: Array[String] = Array("name", "id", "amount", "price")
+
+  val fieldTypes: Array[TypeInformation[_]] = Array(STRING, LONG, INT, DOUBLE)
+
+  // all predicates with field "amount"
+  private val filterPredicates = new mutable.ArrayBuffer[Expression]
+
+  // all comparing values for field "amount"
+  private val filterValues = new mutable.ArrayBuffer[Int]
+
+  override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
+    execEnv.fromCollection[Row](generateDynamicCollection().asJava, getReturnType)
+  }
+
+  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
+    execEnv.fromCollection[Row](generateDynamicCollection().asJava, getReturnType)
+  }
+
+  override def explainSource(): String = {
+    if (filterPredicates.nonEmpty) {
+      s"filter=[${filterPredicates.reduce((l, r) => And(l, r)).toString}]"
+    } else {
+      ""
+    }
+  }
+
+  override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes, fieldNames)
+
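+  /**
+    * Consumes the supported comparisons on "amount" and removes them from
+    * the given list; any predicate left in the list is expected to be
+    * applied by the planner on top of the scan.
+    */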
+  override def applyPredicate(predicates: JList[Expression]): TableSource[Row] = {
+    val newSource = new TestFilterableTableSource(recordNum)
+    newSource.filterPushedDown = true
+
+    val iterator = predicates.iterator()
+    while (iterator.hasNext) {
+      iterator.next() match {
+        case expr: BinaryComparison =>
+          (expr.left, expr.right) match {
+            case (f: ResolvedFieldReference, v: Literal) if f.name.equals("amount") =>
+              newSource.filterPredicates += expr
+              newSource.filterValues += v.value.asInstanceOf[Number].intValue()
+              iterator.remove()
+            case (_, _) => // comparisons on other fields are not consumed
+          }
+        case _ => // non-comparison predicates are not consumed either
+      }
+    }
+
+    newSource
+  }
+
+  override def isFilterPushedDown: Boolean = filterPushedDown
+
+  private def generateDynamicCollection(): Seq[Row] = {
+    Preconditions.checkArgument(filterPredicates.length == filterValues.length)
+
+    for {
+      cnt <- 0 until recordNum
+      if shouldCreateRow(cnt)
+    } yield {
+      Row.of(
+        s"Record_$cnt",
+        cnt.toLong.asInstanceOf[Object],
+        cnt.toInt.asInstanceOf[Object],
+        cnt.toDouble.asInstanceOf[Object])
+    }
+  }
+
+  private def shouldCreateRow(value: Int): Boolean = {
+    filterPredicates.zip(filterValues).forall {
+      case (_: GreaterThan, v) =>
+        value > v
+      case (_: LessThan, v) =>
+        value < v
+      case (_: GreaterThanOrEqual, v) =>
+        value >= v
+      case (_: LessThanOrEqual, v) =>
+        value <= v
+      case (_: EqualTo, v) =>
+        value == v
+      case (_: NotEqualTo, v) =>
+        value != v
+      case (expr, _) =>
+        throw new RuntimeException(expr + " not supported!")
+    }
+  }
+}
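+
+// A minimal usage sketch (illustrative only; `tableEnv` and the query below
+// are assumptions, not part of this commit):
+//
+//   val source = new TestFilterableTableSource(recordNum = 33)
+//   tableEnv.registerTableSource("filterable", source)
+//   val result = tableEnv.sql(
+//     "SELECT id FROM filterable WHERE amount > 2 AND price < 100")
+//
+// During optimization, applyPredicate() consumes `amount > 2` so the source
+// only emits matching rows, while `price < 100` stays in the plan and is
+// evaluated after the scan.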
+