Posted to issues@flink.apache.org by tonycox <gi...@git.apache.org> on 2017/01/18 17:53:05 UTC

[GitHub] flink pull request #3166: [FLINK-3849] [WIP] Add FilterableTableSource inter...

GitHub user tonycox opened a pull request:

    https://github.com/apache/flink/pull/3166

    [FLINK-3849] [WIP] Add FilterableTableSource interface and Rules for pushing it

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [ ] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [ ] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed
    
    I have some questions:
    1) How can `TableScan` and `RelBuilder` be mocked in `#testRewriteRexProgramWithCondition`?
    2) How should the filter predicate be shown in `#explainTerms`?

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tonycox/flink filterableSource

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3166.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3166
    
----
commit 73448a0425cb53f6b9db0bef5200537e2703a8b0
Author: tonycox <an...@epam.com>
Date:   2017-01-13T17:08:47Z

    [FLINK-5481] Add type extraction from collection

commit 0d7e86f6e37ded863f2b930e2d8ebba9a8fc1c07
Author: tonycox <an...@epam.com>
Date:   2017-01-17T15:31:15Z

    search types by columns

commit 4f38c69c1a307d1910f8c4ad811fb32509248880
Author: tonycox <an...@epam.com>
Date:   2017-01-11T09:15:49Z

    [FLINK-3849] Add FilterableTableSource interface and Rules for pushing it

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3166: [FLINK-3849] Add FilterableTableSource interface a...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r101310511
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/util/RexProgramExpressionExtractor.scala ---
    @@ -0,0 +1,198 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.util
    +
    +import java.util
    +
    +import org.apache.calcite.rel.core.TableScan
    +import org.apache.calcite.rex._
    +import org.apache.calcite.sql.{SqlKind, SqlOperator}
    +import org.apache.calcite.tools.RelBuilder
    +import org.apache.flink.table.api.TableEnvironment
    +import org.apache.flink.table.expressions._
    +import org.apache.flink.table.sources.TableSource
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +object RexProgramExpressionExtractor {
    +
    +  /**
    +    * converts a rexProgram condition into expression
    +    *
    +    * @param rexProgram The RexProgram to analyze
    +    * @return converted expression
    +    */
    +  def extractExpression(rexProgram: RexProgram): Expression = {
    +
    +    val refInputToName = getInputsWithNames(rexProgram)
    +    val visitor = new ExpressionVisitor(refInputToName)
    +
    +    val condition = rexProgram.getCondition
    +    if (condition == null) {
    +      return null
    +    }
    +
    +    rexProgram.expandLocalRef(condition).accept(visitor)
    +    val parsedExpression = ExpressionParser.parseExpression(visitor.getStringPredicate)
    +
    +    parsedExpression
    +  }
    +
    +  /**
    +    * verify can the original expression be divided into `new` expression
    +    * and remainder part without loss of logical correctness
    +    *
    +    * @param original initial expression
    +    * @param lump part of original expression
    +    * @return whether or not to decouple parts of the origin expression
    +    */
    +  def verifyExpressions(original: Expression, lump: Expression): Boolean = {
    +    if (original == null & lump == null) {
    +      return false
    +    }
    +    if (original.children.isEmpty | !checkOperator(original)) {
    +      return false
    +    }
    +    val head = original.children.head
    +    val last = original.children.last
    +    if (head.checkEquals(lump)) {
    +      return checkOperator(original)
    +    }
    +    if (last.checkEquals(lump)) {
    +      return checkOperator(original)
    +    }
    +    verifyExpressions(head, lump) match {
    +      case true => true
    +      case _ => verifyExpressions(last, lump)
    +    }
    +  }
    +
    +  private def checkOperator(original: Expression): Boolean = {
    +    original match {
    +      case o: Or => false
    +      case _ => true
    +    }
    +  }
    +
    +  /**
    +    * Generates a new RexProgram based on new expression.
    +    *
    +    * @param rexProgram original RexProgram
    +    * @param scan input source
    +    * @param expression filter condition (fields must be resolved)
    +    * @param tableSource source to get names and type of table
    +    * @param relBuilder builder for converting expression to Rex
    +    */
    +  def rewriteRexProgram(
    +      rexProgram: RexProgram,
    +      scan: TableScan,
    +      expression: Expression,
    +      tableSource: TableSource[_])(implicit relBuilder: RelBuilder): RexProgram = {
    +
    +    if (expression != null) {
    +
    +      val names = TableEnvironment.getFieldNames(tableSource)
    +
    +      val nameToType = names
    +        .zip(TableEnvironment.getFieldTypes(tableSource)).toMap
    +
    +      relBuilder.push(scan)
    +
    +      val rule: PartialFunction[Expression, Expression] = {
    +        case u@UnresolvedFieldReference(name) =>
    +          ResolvedFieldReference(name, nameToType(name))
    +      }
    +
    +      val newProjectExpressions = rewriteProjects(rexProgram, names)
    +
    +      val resolvedExp = expression.postOrderTransform(rule)
    +
    +      RexProgram.create(
    +        rexProgram.getInputRowType,
    +        newProjectExpressions,
    +        resolvedExp.toRexNode,
    +        rexProgram.getOutputRowType,
    +        relBuilder.getRexBuilder)
    +    } else {
    +      rexProgram
    +    }
    +  }
    +
    +  private def rewriteProjects(
    +      rexProgram: RexProgram,
    +      names: Array[String]): util.List[RexNode] = {
    +
    +    val inputRewriter = new InputRewriter(names.indices.toArray)
    +    val newProject = rexProgram.getProjectList.map(
    +      exp => rexProgram.expandLocalRef(exp).accept(inputRewriter)
    +    ).toList.asJava
    +    newProject
    +  }
    +
    +  private def getInputsWithNames(rexProgram: RexProgram): Map[RexInputRef, String] = {
    +    val names = rexProgram.getInputRowType.getFieldNames
    +    rexProgram.getExprList.asScala.map {
    +      case i: RexInputRef =>
    +        i -> names(i.getIndex)
    +      case _ => null
    +    }.filter(_ != null)
    --- End diff --
    
    Filter on `RexInputRef` before performing the mapping?
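
One way to read this suggestion is to replace the map-to-`null`-then-filter pattern with `collect`, which selects and maps the matching elements in a single pass. The sketch below is only an illustration: `Node`, `InputRef`, `Call` and `InputNames` are hypothetical stand-ins, not Calcite's `RexNode` classes or code from the pull request.

```scala
// Stand-in types for illustration only; these are NOT Calcite classes.
sealed trait Node
final case class InputRef(index: Int) extends Node
final case class Call(op: String) extends Node

object InputNames {
  // collect keeps only the InputRef entries and maps them in one pass,
  // so no null placeholders are created and no second filter is needed.
  def inputsWithNames(exprs: Seq[Node], names: IndexedSeq[String]): Map[InputRef, String] =
    exprs.collect { case ref: InputRef => ref -> names(ref.index) }.toMap
}
```

With real Calcite types the same shape would presumably apply to `rexProgram.getExprList.asScala`, matching on `RexInputRef`.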



[GitHub] flink pull request #3166: [FLINK-3849] Add FilterableTableSource interface a...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r102497345
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala ---
    @@ -0,0 +1,95 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.datastream
    +
    +import org.apache.calcite.plan.RelOptRule._
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
    +import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, StreamTableSourceScan}
    +import org.apache.flink.table.plan.util.RexProgramExpressionExtractor._
    +import org.apache.flink.table.plan.schema.TableSourceTable
    +import org.apache.flink.table.sources.FilterableTableSource
    +
    +class PushFilterIntoStreamTableSourceScanRule extends RelOptRule(
    +  operand(classOf[DataStreamCalc],
    +    operand(classOf[StreamTableSourceScan], none)),
    +  "PushFilterIntoStreamTableSourceScanRule") {
    +
    +  override def matches(call: RelOptRuleCall) = {
    +    val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc]
    +    val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan]
    +    scan.tableSource match {
    +      case _: FilterableTableSource =>
    +        calc.calcProgram.getCondition != null
    +      case _ => false
    +    }
    +  }
    +
    +  override def onMatch(call: RelOptRuleCall): Unit = {
    +    val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc]
    +    val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan]
    +
    +    val filterableSource = scan.tableSource.asInstanceOf[FilterableTableSource]
    +
    +    val program = calc.calcProgram
    +    val tst = scan.getTable.unwrap(classOf[TableSourceTable[_]])
    +    val predicates = extractPredicateExpressions(
    +      program,
    +      call.builder().getRexBuilder,
    +      tst.tableEnv.getFunctionCatalog)
    +
    +    if (predicates.length != 0) {
    +      val remainingPredicate = filterableSource.setPredicate(predicates)
    --- End diff --
    
    Do not continue if `remainingPredicate == predicates`
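
A minimal sketch of such a guard follows, assuming the predicates are held in arrays as in the diff above. Note that `==` on Scala arrays compares references, so an element-wise comparison is needed. `PushDownGuard` and its method name are hypothetical, and predicates are modelled as plain strings for brevity.

```scala
object PushDownGuard {
  // True only when the source accepted at least one offered predicate.
  // Array == is reference equality in Scala, hence sameElements.
  def sourceAcceptedSomething(offered: Array[String], remaining: Array[String]): Boolean =
    !offered.sameElements(remaining)
}
```

The rule would then return early from `onMatch` when this check is false, leaving the plan unchanged.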



[GitHub] flink pull request #3166: [FLINK-3849] Add FilterableTableSource interface a...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r102486920
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala ---
    @@ -0,0 +1,95 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.dataSet
    +
    +import org.apache.calcite.plan.RelOptRule._
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
    +import org.apache.calcite.rex.RexProgram
    +import org.apache.flink.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc}
    +import org.apache.flink.table.plan.util.RexProgramExpressionExtractor._
    +import org.apache.flink.table.plan.schema.TableSourceTable
    +import org.apache.flink.table.sources.FilterableTableSource
    +
    +class PushFilterIntoBatchTableSourceScanRule extends RelOptRule(
    +  operand(classOf[DataSetCalc],
    +    operand(classOf[BatchTableSourceScan], none)),
    +  "PushFilterIntoBatchTableSourceScanRule") {
    +
    +  override def matches(call: RelOptRuleCall) = {
    +    val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
    +    val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
    +    scan.tableSource match {
    +      case _: FilterableTableSource =>
    +        calc.calcProgram.getCondition != null
    +      case _ => false
    +    }
    +  }
    +
    +  override def onMatch(call: RelOptRuleCall): Unit = {
    +    val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
    +    val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
    +
    +    val filterableSource = scan.tableSource.asInstanceOf[FilterableTableSource]
    +
    +    val program: RexProgram = calc.calcProgram
    +    val tst = scan.getTable.unwrap(classOf[TableSourceTable[_]])
    +    val predicate = extractPredicateExpressions(
    +      program,
    +      call.builder().getRexBuilder,
    +      tst.tableEnv.getFunctionCatalog)
    +
    +    if (predicate.length != 0) {
    +      val remainingPredicate = filterableSource.setPredicate(predicate)
    +
    +      if (verifyExpressions(predicate, remainingPredicate)) {
    --- End diff --
    
    The `FilterableTableSource` violates the method contract if this is not true. I would log a WARN message here.
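
As a rough illustration of the check plus warning, the sketch below assumes the contract is "every returned predicate must be one of those offered". `ContractCheck` is a hypothetical name, and `java.util.logging` is used only to keep the example self-contained; Flink itself would use its own logging setup.

```scala
import java.util.logging.Logger

object ContractCheck {
  private val log = Logger.getLogger("ContractCheck")

  // Assumed contract: the source may drop predicates it handles itself,
  // but must not hand back anything it was not offered.
  def remainingIsValid[A](offered: Seq[A], remaining: Seq[A]): Boolean = {
    val ok = remaining.forall(r => offered.contains(r))
    if (!ok) {
      log.warning("Table source returned predicates it was not offered; skipping filter push-down.")
    }
    ok
  }
}
```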



[GitHub] flink pull request #3166: [FLINK-3849] Add FilterableTableSource interface a...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r101303197
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala ---
    @@ -0,0 +1,72 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.dataSet
    +
    +import org.apache.calcite.plan.RelOptRule._
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
    +import org.apache.flink.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc}
    +import org.apache.flink.table.plan.rules.util.RexProgramExpressionExtractor._
    +import org.apache.flink.table.sources.FilterableTableSource
    +
    +class PushFilterIntoBatchTableSourceScanRule extends RelOptRule(
    +  operand(classOf[DataSetCalc],
    +    operand(classOf[BatchTableSourceScan], none)),
    +  "PushFilterIntoBatchTableSourceScanRule") {
    +
    +  override def matches(call: RelOptRuleCall) = {
    +    val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
    +    scan.tableSource match {
    +      case _: FilterableTableSource => true
    +      case _ => false
    +    }
    +  }
    +
    +  override def onMatch(call: RelOptRuleCall): Unit = {
    +    val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
    +    val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
    +
    +    val tableSource = scan.tableSource.asInstanceOf[FilterableTableSource]
    +
    +    val expression = extractExpression(calc.calcProgram)
    +    val unusedExpr = tableSource.setPredicate(expression)
    --- End diff --
    
    Only call tableSource if predicate is not `None`.
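
If the extracted predicate were modelled as an `Option`, the guard could be expressed with `flatMap`, as in the sketch below. `Filterable` and `OptionalPushDown` are hypothetical stand-ins (the `setPredicate` signature here is simplified and is not the interface from the pull request), and predicates are plain strings.

```scala
object OptionalPushDown {
  // Simplified stand-in for the source: takes a predicate and
  // returns the part it could not handle, if any.
  trait Filterable {
    def setPredicate(p: String): Option[String]
  }

  // The source is consulted only when a predicate is present.
  def pushIfPresent(source: Filterable, predicate: Option[String]): Option[String] =
    predicate.flatMap(p => source.setPredicate(p))
}
```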



[GitHub] flink pull request #3166: [FLINK-3849] Add FilterableTableSource interface a...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r102485722
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala ---
    @@ -0,0 +1,95 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.dataSet
    +
    +import org.apache.calcite.plan.RelOptRule._
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
    +import org.apache.calcite.rex.RexProgram
    +import org.apache.flink.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc}
    +import org.apache.flink.table.plan.util.RexProgramExpressionExtractor._
    +import org.apache.flink.table.plan.schema.TableSourceTable
    +import org.apache.flink.table.sources.FilterableTableSource
    +
    +class PushFilterIntoBatchTableSourceScanRule extends RelOptRule(
    +  operand(classOf[DataSetCalc],
    +    operand(classOf[BatchTableSourceScan], none)),
    +  "PushFilterIntoBatchTableSourceScanRule") {
    +
    +  override def matches(call: RelOptRuleCall) = {
    +    val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
    +    val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
    +    scan.tableSource match {
    +      case _: FilterableTableSource =>
    +        calc.calcProgram.getCondition != null
    +      case _ => false
    +    }
    +  }
    +
    +  override def onMatch(call: RelOptRuleCall): Unit = {
    +    val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
    +    val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
    +
    +    val filterableSource = scan.tableSource.asInstanceOf[FilterableTableSource]
    +
    +    val program: RexProgram = calc.calcProgram
    +    val tst = scan.getTable.unwrap(classOf[TableSourceTable[_]])
    +    val predicate = extractPredicateExpressions(
    +      program,
    +      call.builder().getRexBuilder,
    +      tst.tableEnv.getFunctionCatalog)
    +
    +    if (predicate.length != 0) {
    +      val remainingPredicate = filterableSource.setPredicate(predicate)
    +
    +      if (verifyExpressions(predicate, remainingPredicate)) {
    +
    +        val filterRexNode = getFilterExpressionAsRexNode(
    +          program.getInputRowType,
    +          scan,
    +          predicate.diff(remainingPredicate))(call.builder())
    +
    +        val newScan = new BatchTableSourceScan(
    +          scan.getCluster,
    +          scan.getTraitSet,
    +          scan.getTable,
    +          scan.tableSource,
    +          filterRexNode)
    +
    +        val newCalcProgram = rewriteRexProgram(
    --- End diff --
    
    We would need to add those conjunctive terms which could not be translated by `extractPredicateExpressions()`.
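
One possible way to keep track of those terms is to split the conjuncts at translation time, so that the untranslated ones can be put back into the remaining `Calc` condition together with whatever the source rejects. The sketch below assumes the translator signals an unsupported term by throwing; `ConjunctSplit` and `translate` are hypothetical names, not part of the pull request.

```scala
import scala.util.{Failure, Success, Try}

object ConjunctSplit {
  // Returns (translated terms, original conjuncts that could not be translated).
  // `translate` is a stand-in that throws on unsupported terms.
  def split[R, E](conjuncts: Seq[R])(translate: R => E): (Seq[E], Seq[R]) = {
    val attempts = conjuncts.map(c => c -> Try(translate(c)))
    val translated = attempts.collect { case (_, Success(e)) => e }
    val untranslated = attempts.collect { case (c, Failure(_)) => c }
    (translated, untranslated)
  }
}
```

Only the first component would be offered to the table source; the second stays in the rewritten program unconditionally.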



[GitHub] flink pull request #3166: [FLINK-3849] Add FilterableTableSource interface a...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r101297805
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala ---
    @@ -59,8 +59,16 @@ class BatchTableSourceScan(
       }
     
       override def explainTerms(pw: RelWriter): RelWriter = {
    +    val s = tableSource match {
    +      case source: FilterableTableSource =>
    +        source.getPredicate.getOrElse("").toString.replaceAll("\\'|\\\"|\\s", "")
    +      case _ => ""
    +    }
         super.explainTerms(pw)
           .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", "))
    +      // TODO should we have this? If yes how it should look like, as in DataCalc?
    --- End diff --
    
    Yes, the filter should be in the explain string of the table source.
    I think it would be good if it were formatted the same way as the filter in the calc.



[GitHub] flink pull request #3166: [FLINK-3849] Add FilterableTableSource interface a...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r102484272
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractor.scala ---
    @@ -0,0 +1,163 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.util
    +
    +import org.apache.calcite.rel.core.TableScan
    +import org.apache.calcite.rex._
    +import org.apache.calcite.plan.RelOptUtil
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.tools.RelBuilder
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.calcite.{FlinkTypeFactory, RexNodeWrapper}
    +import org.apache.flink.table.expressions._
    +import org.apache.flink.table.validate.FunctionCatalog
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.IndexedSeq
    +
    +object RexProgramExpressionExtractor {
    +
    +  /**
    +    * converts a rexProgram condition into independent CNF expressions
    +    *
    +    * @param rexProgram The RexProgram to analyze
    +    * @return converted expression
    +    */
    +  private[flink] def extractPredicateExpressions(
    +      rexProgram: RexProgram,
    +      rexBuilder: RexBuilder,
    +      catalog: FunctionCatalog): Array[Expression] = {
    +
    +    val fieldNames = getInputsWithNames(rexProgram)
    +
    +    val condition = rexProgram.getCondition
    +    if (condition == null) {
    +      return Array.empty
    +    }
    +    val call = rexProgram.expandLocalRef(condition)
    +    val cnf = RexUtil.toCnf(rexBuilder, call)
    +    val conjunctions = RelOptUtil.conjunctions(cnf)
    +    val expressions = conjunctions.asScala.map(
    +      RexNodeWrapper.wrap(_, catalog).toExpression(fieldNames)
    +    )
    +    expressions.toArray
    +  }
    +
    +  /**
    +    * verify should we apply remained expressions on
    --- End diff --
    
    Complete comment



[GitHub] flink pull request #3166: [FLINK-3849] Add FilterableTableSource interface a...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r101319389
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/util/RexProgramExpressionExtractor.scala ---
    @@ -0,0 +1,198 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.util
    +
    +import java.util
    +
    +import org.apache.calcite.rel.core.TableScan
    +import org.apache.calcite.rex._
    +import org.apache.calcite.sql.{SqlKind, SqlOperator}
    +import org.apache.calcite.tools.RelBuilder
    +import org.apache.flink.table.api.TableEnvironment
    +import org.apache.flink.table.expressions._
    +import org.apache.flink.table.sources.TableSource
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +object RexProgramExpressionExtractor {
    +
    +  /**
    +    * converts a rexProgram condition into expression
    +    *
    +    * @param rexProgram The RexProgram to analyze
    +    * @return converted expression
    +    */
    +  def extractExpression(rexProgram: RexProgram): Expression = {
    +
    +    val refInputToName = getInputsWithNames(rexProgram)
    +    val visitor = new ExpressionVisitor(refInputToName)
    +
    +    val condition = rexProgram.getCondition
    +    if (condition == null) {
    +      return null
    +    }
    +
    +    rexProgram.expandLocalRef(condition).accept(visitor)
    --- End diff --
    
    The condition should be converted into conjunctive normal form (CNF) before translating it. Calcite provides `RexUtil.toCnf()` to do that.
    
    The reason for CNF is that it makes the handling in `FilterableTableSource` much easier. We could also think about passing the conjunctive terms as a list to the `FilterableTableSource` and receiving a list of unsupported conjunctive terms in return. The returned list might be shorter, but its elements must not be changed.
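
To make the idea concrete, here is a toy sketch of CNF conversion and of splitting the result into its conjunctive terms. It is not Calcite's implementation: `Expr`, `Atom`, `And`, `Or` and `Cnf` are hypothetical stand-ins, negation is left out for brevity, and in the real code `RexUtil.toCnf()` and `RelOptUtil.conjunctions()` would do this work on `RexNode` trees.

```scala
// Toy boolean expression tree; a stand-in, not Calcite's RexNode.
sealed trait Expr
final case class Atom(name: String) extends Expr
final case class And(l: Expr, r: Expr) extends Expr
final case class Or(l: Expr, r: Expr) extends Expr

object Cnf {
  // Distributes OR over AND until no AND remains beneath an OR.
  def toCnf(e: Expr): Expr = e match {
    case And(l, r) => And(toCnf(l), toCnf(r))
    case Or(l, r)  => distribute(toCnf(l), toCnf(r))
    case atom      => atom
  }

  private def distribute(l: Expr, r: Expr): Expr = (l, r) match {
    case (And(a, b), _) => And(distribute(a, r), distribute(b, r))
    case (_, And(a, b)) => And(distribute(l, a), distribute(l, b))
    case _              => Or(l, r)
  }

  // Flattens the top-level ANDs into the list of conjunctive terms.
  def conjunctions(e: Expr): List[Expr] = e match {
    case And(l, r) => conjunctions(l) ++ conjunctions(r)
    case other     => List(other)
  }
}
```

Each element of the resulting list can be accepted or rejected by a source independently of the others, which is what makes the list-in, list-out contract workable.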



[GitHub] flink pull request #3166: [FLINK-3849] Add FilterableTableSource interface a...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r102507787
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala ---
    @@ -126,21 +156,49 @@ class TableSourceTest extends TableTestBase {
     
       @Test
       def testStreamProjectableSourceScanNoIdentityCalc(): Unit = {
    -    val (csvTable, tableName) = tableSource
    +    val (tableSource, tableName) = csvTable
         val util = streamTestUtil()
         val tEnv = util.tEnv
     
    -    tEnv.registerTableSource(tableName, csvTable)
    +    tEnv.registerTableSource(tableName, tableSource)
     
         val result = tEnv
           .scan(tableName)
           .select('id, 'score, 'first)
     
    -    val expected = sourceStreamTableNode(tableName, noCalcFields)
    +    val expected = projectableSourceStreamTableNode(tableName, noCalcFields)
         util.verifyTable(result, expected)
       }
     
       @Test
    +  def testStreamFilterableSourceScanPlanTableApi(): Unit = {
    +    val (tableSource, tableName) = filterableTableSource
    +    val util = streamTestUtil()
    +    val tEnv = util.tEnv
    +
    +    tEnv.registerTableSource(tableName, tableSource)
    +
    +    val result = tEnv
    +      .scan(tableName)
    +      .select('price, 'id, 'amount)
    +      .where("amount > 2 && price * 2 < 32")
    --- End diff --
    
    An example of an unsupported predicate would be `'id.cast(BasicTypeInfo.STRING_TYPE_INFO) === "abc"`. This throws an exception when translating it to an `Expression`.
    
    As said before, unsupported expressions should be handled gracefully: instead of failing, we should not offer this `RexNode` to the `FilterableTableSource` and evaluate it in the `DataSetCalc`.
    I would suggest using `CAST` as an example to implement the graceful handling, and adding support for it once the failure-free translation works.



[GitHub] flink pull request #3166: [FLINK-3849] Add FilterableTableSource interface a...

Posted by tonycox <gi...@git.apache.org>.
Github user tonycox closed the pull request at:

    https://github.com/apache/flink/pull/3166



[GitHub] flink issue #3166: [FLINK-3849] Add FilterableTableSource interface and Rule...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3166
  
    OK, thanks for the quick response! 
    I'll point the contributor of the partition pruning to this PR. 



[GitHub] flink pull request #3166: [FLINK-3849] Add FilterableTableSource interface a...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r101299952
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala ---
    @@ -58,8 +58,16 @@ class StreamTableSourceScan(
       }
     
       override def explainTerms(pw: RelWriter): RelWriter = {
    +    val s = tableSource match {
    +      case source: FilterableTableSource =>
    +        source.getPredicate.getOrElse("").toString.replaceAll("\\'|\\\"|\\s", "")
    +      case _ => ""
    +    }
         super.explainTerms(pw)
           .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", "))
    +      // TODO should we have this? If yes how it should look like, as in DataCalc?
    +      // (current example, s = "id>2")
    --- End diff --
    
    same as for `BatchTableSourceScan`



[GitHub] flink pull request #3166: [FLINK-3849] Add FilterableTableSource interface a...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r102491504
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractor.scala ---
    @@ -0,0 +1,163 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.util
    +
    +import org.apache.calcite.rel.core.TableScan
    +import org.apache.calcite.rex._
    +import org.apache.calcite.plan.RelOptUtil
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.tools.RelBuilder
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.calcite.{FlinkTypeFactory, RexNodeWrapper}
    +import org.apache.flink.table.expressions._
    +import org.apache.flink.table.validate.FunctionCatalog
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.IndexedSeq
    +
    +object RexProgramExpressionExtractor {
    +
    +  /**
    +    * converts a rexProgram condition into independent CNF expressions
    +    *
    +    * @param rexProgram The RexProgram to analyze
    +    * @return converted expression
    +    */
    +  private[flink] def extractPredicateExpressions(
    +      rexProgram: RexProgram,
    +      rexBuilder: RexBuilder,
    +      catalog: FunctionCatalog): Array[Expression] = {
    +
    +    val fieldNames = getInputsWithNames(rexProgram)
    +
    +    val condition = rexProgram.getCondition
    +    if (condition == null) {
    +      return Array.empty
    +    }
    +    val call = rexProgram.expandLocalRef(condition)
    +    val cnf = RexUtil.toCnf(rexBuilder, call)
    +    val conjunctions = RelOptUtil.conjunctions(cnf)
    +    val expressions = conjunctions.asScala.map(
    +      RexNodeWrapper.wrap(_, catalog).toExpression(fieldNames)
    +    )
    +    expressions.toArray
    +  }
    +
    +  /**
    +    * verify should we apply remained expressions on
    +    *
    +    * @param original initial expression
    +    * @param remained remained part of original expression
    +    * @return whether or not to decouple parts of the origin expression
    +    */
    +  private[flink] def verifyExpressions(
    +      original: Array[Expression],
    +      remained: Array[Expression]): Boolean =
    +    remained forall (original contains)
    +
    +  /**
    +    * Generates a new RexProgram based on new expression.
    +    *
    +    * @param rexProgram original RexProgram
    +    * @param scan input source
    +    * @param predicate filter condition (fields must be resolved)
    +    * @param relBuilder builder for converting expression to Rex
    +    */
    +  private[flink] def rewriteRexProgram(
    +      rexProgram: RexProgram,
    +      scan: TableScan,
    +      predicate: Array[Expression])(implicit relBuilder: RelBuilder): RexProgram = {
    +
    +    relBuilder.push(scan)
    +
    +    val inType = rexProgram.getInputRowType
    +    val resolvedExps = resolveFields(predicate, inType)
    +    val projs = rexProgram.getProjectList.map(rexProgram.expandLocalRef)
    +
    +    RexProgram.create(
    +      inType,
    +      projs,
    +      conjunct(resolvedExps).get.toRexNode,
    +      rexProgram.getOutputRowType,
    +      relBuilder.getRexBuilder)
    +  }
    +
    +  private[flink] def getFilterExpressionAsRexNode(
    +      inputTpe: RelDataType,
    +      scan: TableScan,
    +      exps: Array[Expression])(implicit relBuilder: RelBuilder): RexNode = {
    +    relBuilder.push(scan)
    +    val resolvedExps = resolveFields(exps, inputTpe)
    +    val fullExp = conjunct(resolvedExps)
    +    if (fullExp.isDefined) {
    +      fullExp.get.toRexNode
    +    } else {
    +      null
    +    }
    +  }
    +
    +  private def resolveFields(
    +      predicate: Array[Expression],
    +      inType: RelDataType): Array[Expression] = {
    +    val fieldTypes: Map[String, TypeInformation[_]] = inType.getFieldList
    +      .map(f => f.getName -> FlinkTypeFactory.toTypeInfo(f.getType))
    +      .toMap
    +    val rule: PartialFunction[Expression, Expression] = {
    +      case u@UnresolvedFieldReference(name) =>
    +        ResolvedFieldReference(name, fieldTypes(name))
    +    }
    +    predicate.map(_.postOrderTransform(rule))
    +  }
    +
    +  private def conjunct(exps: Array[Expression]): Option[Expression] = {
    --- End diff --
    
    This method could be more concise if we use `reduce` instead of recursive pair-wise conjunctions, i.e.,
    
    `AND(a, AND(b, AND(c, d)))` instead of `AND(AND(a, b), AND(c, d))`
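
A minimal sketch of the `reduce`-based variant, assuming hypothetical stand-ins `Expr`, `Field`, and `And` in place of Flink's `Expression` case classes. It uses `reduceRightOption` from the Scala standard library, which yields the right-nested shape shown above; a left reduce would nest the other way but be logically equivalent.

```scala
object ConjunctSketch {
  // Hypothetical stand-ins for Flink's Expression and And case classes.
  sealed trait Expr
  case class Field(name: String) extends Expr
  case class And(left: Expr, right: Expr) extends Expr

  // Right fold: Seq(a, b, c, d) becomes And(a, And(b, And(c, d))).
  // reduceRightOption returns None for an empty input, so the
  // "no predicate" case needs no special handling.
  def conjunct(exps: Seq[Expr]): Option[Expr] =
    exps.reduceRightOption[Expr]((e, acc) => And(e, acc))
}
```

A single-element input is returned as-is, without wrapping it in an `And`.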



[GitHub] flink pull request #3166: [FLINK-3849] Add FilterableTableSource interface a...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r102473702
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractor.scala ---
    @@ -0,0 +1,163 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.util
    +
    +import org.apache.calcite.rel.core.TableScan
    +import org.apache.calcite.rex._
    +import org.apache.calcite.plan.RelOptUtil
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.tools.RelBuilder
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.calcite.{FlinkTypeFactory, RexNodeWrapper}
    +import org.apache.flink.table.expressions._
    +import org.apache.flink.table.validate.FunctionCatalog
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.IndexedSeq
    +
    +object RexProgramExpressionExtractor {
    +
    +  /**
    +    * converts a rexProgram condition into independent CNF expressions
    +    *
    +    * @param rexProgram The RexProgram to analyze
    +    * @return converted expression
    +    */
    +  private[flink] def extractPredicateExpressions(
    +      rexProgram: RexProgram,
    +      rexBuilder: RexBuilder,
    +      catalog: FunctionCatalog): Array[Expression] = {
    +
    +    val fieldNames = getInputsWithNames(rexProgram)
    +
    +    val condition = rexProgram.getCondition
    +    if (condition == null) {
    +      return Array.empty
    +    }
    +    val call = rexProgram.expandLocalRef(condition)
    --- End diff --
    
    Please add a few comments



[GitHub] flink issue #3166: [FLINK-3849] Add FilterableTableSource interface and Rule...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3166
  
    I'm not sure about limiting the operator types @twalthr. If the TableSource connects, for example, to a relational database, complex predicates could also be evaluated in the source. There might also be data stores that can handle more complex data types. In the end, the TableSource should decide which predicates to accept or not. 



[GitHub] flink pull request #3166: [FLINK-3849] Add FilterableTableSource interface a...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r102473754
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractor.scala ---
    @@ -0,0 +1,163 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.util
    +
    +import org.apache.calcite.rel.core.TableScan
    +import org.apache.calcite.rex._
    +import org.apache.calcite.plan.RelOptUtil
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.tools.RelBuilder
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.calcite.{FlinkTypeFactory, RexNodeWrapper}
    +import org.apache.flink.table.expressions._
    +import org.apache.flink.table.validate.FunctionCatalog
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.IndexedSeq
    +
    +object RexProgramExpressionExtractor {
    --- End diff --
    
    Rename to `RexProgramPredicateExtractor`?



[GitHub] flink pull request #3166: [FLINK-3849] Add FilterableTableSource interface a...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r102485241
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractor.scala ---
    @@ -0,0 +1,163 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.util
    +
    +import org.apache.calcite.rel.core.TableScan
    +import org.apache.calcite.rex._
    +import org.apache.calcite.plan.RelOptUtil
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.tools.RelBuilder
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.calcite.{FlinkTypeFactory, RexNodeWrapper}
    +import org.apache.flink.table.expressions._
    +import org.apache.flink.table.validate.FunctionCatalog
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.IndexedSeq
    +
    +object RexProgramExpressionExtractor {
    +
    +  /**
    +    * converts a rexProgram condition into independent CNF expressions
    +    *
    +    * @param rexProgram The RexProgram to analyze
    +    * @return converted expression
    +    */
    +  private[flink] def extractPredicateExpressions(
    +      rexProgram: RexProgram,
    +      rexBuilder: RexBuilder,
    +      catalog: FunctionCatalog): Array[Expression] = {
    --- End diff --
    
    We should also return those `RexNodes` that we cannot translate, i.e., return (Array[Expression], Array[RexNode])
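
A rough sketch of what such a two-part result could look like, in plain Scala without Calcite or Flink on the classpath. `RawNode`, `Expr`, `translate`, and `extract` are hypothetical names: `RawNode` plays the role of a `RexNode`, `Expr` the role of an `Expression`, and the toy translator throws on anything but simple comparisons, much like a conversion that fails on `CAST`.

```scala
import scala.util.{Failure, Success, Try}

object ExtractSketch {
  // Hypothetical stand-ins: RawNode for a Calcite RexNode, Expr for a Flink Expression.
  case class RawNode(op: String, operand: String)
  case class Expr(text: String)

  // Toy translator that only knows comparisons and throws on everything else.
  def translate(node: RawNode): Expr = node.op match {
    case ">" | "<" | "=" => Expr(s"${node.operand} ${node.op} ?")
    case other => throw new UnsupportedOperationException(s"unsupported: $other")
  }

  // Translate what we can; keep the rest as raw nodes instead of failing.
  def extract(nodes: Seq[RawNode]): (Array[Expr], Array[RawNode]) = {
    val attempts = nodes.map(n => n -> Try(translate(n)))
    val translated = attempts.collect { case (_, Success(e)) => e }
    val untranslated = attempts.collect { case (n, Failure(_)) => n }
    (translated.toArray, untranslated.toArray)
  }
}
```

The first array can be offered to the table source, while the second stays in the calc node together with whatever the source hands back.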



[GitHub] flink pull request #3166: [FLINK-3849] Add FilterableTableSource interface a...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r102472465
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala ---
    @@ -53,13 +55,20 @@ class StreamTableSourceScan(
           cluster,
           traitSet,
           getTable,
    -      tableSource
    +      tableSource,
    +      filterCondition
         )
       }
     
       override def explainTerms(pw: RelWriter): RelWriter = {
    -    super.explainTerms(pw)
    +    val terms = super.explainTerms(pw)
           .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", "))
    +    if (filterCondition != null) {
    +      import scala.collection.JavaConverters._
    --- End diff --
    
    Please move import up



[GitHub] flink issue #3166: [FLINK-3849] Add FilterableTableSource interface and Rule...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/3166
  
    I quickly went through the code. IMHO, we should limit the expressions given to the FilterableTableSource to a minimum. E.g. if you look at the [HCatInputFormat](https://hive.apache.org/javadocs/hcat-r0.5.0/inputoutput.html#Filter+Operators) they only support very basic operations `'and', 'or', 'like', '()', '=', '<>' (not equal), '<', '>', '<=' and '>='`. I think that is also enough for our use case. We should also only support basic datatypes. Then we add a Filter after the TableSource in case the TableSource does not support every predicate. What do you think @fhueske?
    
    The expression translation should also be more robust. Just parsing the Calcite string is very error-prone; can you construct the Expression using the case classes instead?



[GitHub] flink pull request #3166: [FLINK-3849] Add FilterableTableSource interface a...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r101306528
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala ---
    @@ -0,0 +1,38 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sources
    +
    +import org.apache.flink.table.expressions.Expression
    +
    +/**
    +  * Adds support for filtering push-down to a [[TableSource]].
    +  * A [[TableSource]] extending this interface is able to filter the fields of the return table.
    +  *
    +  */
    +trait FilterableTableSource {
    +
    +  /** return an predicate expression that was set. */
    +  def getPredicate: Option[Expression]
    --- End diff --
    
    Do not use `Option` here. This interface might be implemented in Java.



[GitHub] flink issue #3166: [FLINK-3849] Add FilterableTableSource interface and Rule...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/3166
  
    @fhueske the problem is that we currently have no tool that converts RexNodes back to expressions. If we want to support all types of expressions, we need to implement a conversion method for all RexNodes (including scalar functions) or pass the RexNodes to the TableSource.



[GitHub] flink pull request #3166: [FLINK-3849] Add FilterableTableSource interface a...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r102492031
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala ---
    @@ -0,0 +1,38 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sources
    +
    +import org.apache.flink.table.expressions.Expression
    +
    +/**
    +  * Adds support for filtering push-down to a [[TableSource]].
    +  * A [[TableSource]] extending this interface is able to filter the fields of the return table.
    +  *
    +  */
    +trait FilterableTableSource {
    +
    +  /** return an predicate expression that was set. */
    +  def getPredicate: Array[Expression]
    +
    +  /**
    +    * @param predicate a filter expression that will be applied to fields to return.
    --- End diff --
    
    The method docs should be more extensive. We have to explain that the expressions in the array are conjunctive terms which have to be accepted completely or not at all. All non-accepted terms have to be returned unmodified.
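
One possible way to phrase and exercise that contract, as a self-contained sketch. Everything here is hypothetical: `Expr` stands in for Flink's `Expression`, `FilterableSource` and `IdOnlySource` are illustrative names rather than the trait in this PR, and `verify` is a simplified planner-side check that the source returned only terms it was offered.

```scala
object FilterContractSketch {
  // Hypothetical stand-in for Flink's Expression.
  case class Expr(text: String)

  // Hypothetical Java-friendly shape: arrays instead of Option or Scala collections.
  trait FilterableSource {
    /**
      * Offers conjunctive terms to the source. Each term is either accepted
      * as a whole or returned unmodified; the result holds the rejected terms.
      */
    def setPredicate(terms: Array[Expr]): Array[Expr]
  }

  // Example source that can only evaluate predicates on the "id" field.
  class IdOnlySource extends FilterableSource {
    private var accepted: Array[Expr] = Array.empty
    def getPredicate: Array[Expr] = accepted
    override def setPredicate(terms: Array[Expr]): Array[Expr] = {
      val (ok, rejected) = terms.partition(_.text.startsWith("id"))
      accepted = ok
      rejected
    }
  }

  // Planner-side sanity check: every returned term must be one of the offered terms.
  def verify(offered: Array[Expr], remaining: Array[Expr]): Boolean =
    remaining.forall(r => offered.contains(r))
}
```

If the check fails, the planner cannot tell which part of the original predicate is still unevaluated, so it would have to keep the full filter.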



[GitHub] flink pull request #3166: [FLINK-3849] Add FilterableTableSource interface a...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r102471926
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala ---
    @@ -54,20 +55,27 @@ class BatchTableSourceScan(
           cluster,
           traitSet,
           getTable,
    -      tableSource
    +      tableSource,
    +      filterCondition
         )
       }
     
       override def explainTerms(pw: RelWriter): RelWriter = {
    -    super.explainTerms(pw)
    +    val terms = super.explainTerms(pw)
           .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", "))
    +      if (filterCondition != null) {
    +        import scala.collection.JavaConverters._
    --- End diff --
    
    Please move the import up



[GitHub] flink issue #3166: [FLINK-3849] Add FilterableTableSource interface and Rule...

Posted by tonycox <gi...@git.apache.org>.
Github user tonycox commented on the issue:

    https://github.com/apache/flink/pull/3166
  
    Hi @fhueske, I can't continue on this PR; I don't have enough time for now. If you need an implementation of it immediately, I will unassign myself.



[GitHub] flink pull request #3166: [FLINK-3849] Add FilterableTableSource interface a...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r102506293
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractorTest.scala ---
    @@ -0,0 +1,182 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.util
    +
    +import java.math.BigDecimal
    +
    +import org.apache.calcite.adapter.java.JavaTypeFactory
    +import org.apache.calcite.plan._
    +import org.apache.calcite.plan.volcano.VolcanoPlanner
    +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem}
    +import org.apache.calcite.rel.core.TableScan
    +import org.apache.calcite.rex.{RexBuilder, RexProgram, RexProgramBuilder}
    +import org.apache.calcite.sql.`type`.SqlTypeName._
    +import org.apache.calcite.sql.fun.SqlStdOperatorTable
    +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
    +import org.apache.flink.table.expressions.{Expression, ExpressionParser}
    +import org.apache.flink.table.plan.util.RexProgramExpressionExtractor._
    +import org.apache.flink.table.plan.schema.CompositeRelDataType
    +import org.apache.flink.table.utils.CommonTestData
    +import org.junit.Test
    +import org.junit.Assert._
    +
    +import scala.collection.JavaConverters._
    +
    +class RexProgramExpressionExtractorTest {
    --- End diff --
    
    I think it would be good to add a few more corner cases to the tests such as unsupported RexNodes or functions, single predicates, etc.


---

[GitHub] flink pull request #3166: [FLINK-3849] Add FilterableTableSource interface a...

Posted by godfreyhe <gi...@git.apache.org>.
Github user godfreyhe commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r102886003
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala ---
    @@ -0,0 +1,95 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.datastream
    +
    +import org.apache.calcite.plan.RelOptRule._
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
    +import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, StreamTableSourceScan}
    +import org.apache.flink.table.plan.util.RexProgramExpressionExtractor._
    +import org.apache.flink.table.plan.schema.TableSourceTable
    +import org.apache.flink.table.sources.FilterableTableSource
    +
    +class PushFilterIntoStreamTableSourceScanRule extends RelOptRule(
    +  operand(classOf[DataStreamCalc],
    +    operand(classOf[StreamTableSourceScan], none)),
    +  "PushFilterIntoStreamTableSourceScanRule") {
    +
    +  override def matches(call: RelOptRuleCall) = {
    +    val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc]
    +    val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan]
    +    scan.tableSource match {
    +      case _: FilterableTableSource =>
    +        calc.calcProgram.getCondition != null
    +      case _ => false
    +    }
    +  }
    +
    +  override def onMatch(call: RelOptRuleCall): Unit = {
    +    val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc]
    +    val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan]
    +
    +    val filterableSource = scan.tableSource.asInstanceOf[FilterableTableSource]
    +
    +    val program = calc.calcProgram
    +    val tst = scan.getTable.unwrap(classOf[TableSourceTable[_]])
    +    val predicates = extractPredicateExpressions(
    +      program,
    +      call.builder().getRexBuilder,
    +      tst.tableEnv.getFunctionCatalog)
    +
    +    if (predicates.length != 0) {
    +      val remainingPredicate = filterableSource.setPredicate(predicates)
    --- End diff --
    
    If `remainingPredicate` is empty, we should also remove the calc node.
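
The suggestion above can be sketched with a toy plan model. This is only an illustration under stated assumptions: `Plan`, `Scan`, `Calc`, and `pushFilter` are hypothetical stand-ins, not Flink or Calcite classes, and the extra check that the calc performs no projection is an assumption added here (a calc node may also project fields, in which case it cannot simply be dropped).

```scala
object CalcRemovalSketch {
  // Hypothetical, simplified plan nodes (not Flink's DataStreamCalc / StreamTableSourceScan).
  sealed trait Plan
  final case class Scan(pushedPredicates: Seq[String]) extends Plan
  final case class Calc(
      condition: Option[String],
      identityProjection: Boolean,
      input: Plan) extends Plan

  // predicates: all conjunctive terms extracted from the calc condition.
  // remaining:  the terms the source reported it could not evaluate.
  def pushFilter(
      predicates: Seq[String],
      remaining: Seq[String],
      identityProjection: Boolean): Plan = {
    val pushed = predicates.filterNot(p => remaining.contains(p))
    val scan = Scan(pushed)
    if (remaining.isEmpty && identityProjection) {
      // Nothing left to filter and nothing to project: the calc is redundant.
      scan
    } else {
      val condition = if (remaining.isEmpty) None else Some(remaining.mkString(" && "))
      Calc(condition, identityProjection, scan)
    }
  }
}
```

In this model the calc survives whenever a predicate remains or a projection still has to be applied; only the fully pushed-down, projection-free case collapses to the bare scan.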


---

[GitHub] flink pull request #3166: [FLINK-3849] Add FilterableTableSource interface a...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r102476247
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RexNodeWrapper.scala ---
    @@ -0,0 +1,106 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.calcite
    +
    +import org.apache.calcite.rex._
    +import org.apache.calcite.sql._
    +import org.apache.flink.table.api.TableException
    +import org.apache.flink.table.expressions.{Expression, Literal, ResolvedFieldReference}
    +import org.apache.flink.table.validate.FunctionCatalog
    +import org.apache.flink.table.calcite.RexNodeWrapper._
    +
    +abstract class RexNodeWrapper(rex: RexNode) {
    +  def get: RexNode = rex
    +  def toExpression(names: Map[RexInputRef, String]): Expression
    +}
    +
    +case class RexLiteralWrapper(literal: RexLiteral) extends RexNodeWrapper(literal) {
    +  override def toExpression(names: Map[RexInputRef, String]): Expression = {
    +    val typeInfo = FlinkTypeFactory.toTypeInfo(literal.getType)
    +    Literal(literal.getValue, typeInfo)
    +  }
    +}
    +
    +case class RexInputWrapper(input: RexInputRef) extends RexNodeWrapper(input) {
    +  override def toExpression(names: Map[RexInputRef, String]): Expression = {
    +    val typeInfo = FlinkTypeFactory.toTypeInfo(input.getType)
    +    ResolvedFieldReference(names(input), typeInfo)
    +  }
    +}
    +
    +case class RexCallWrapper(
    +    call: RexCall,
    +    operands: Seq[RexNodeWrapper]) extends RexNodeWrapper(call) {
    +
    +  override def toExpression(names: Map[RexInputRef, String]): Expression = {
    +    val ops = operands.map(_.toExpression(names))
    +    call.op match {
    +      case function: SqlFunction =>
    +        lookupFunction(replace(function.getName), ops)
    +      case postfix: SqlPostfixOperator =>
    +        lookupFunction(replace(postfix.getName), ops)
    +      case operator@_ =>
    +        val name = replace(s"${operator.kind}")
    +        lookupFunction(name, ops)
    +    }
    +  }
    +
    +  def replace(str: String): String = {
    +    str.replaceAll("\\s|_", "")
    +  }
    +}
    +
    +object RexNodeWrapper {
    +
    +  private var catalog: Option[FunctionCatalog] = None
    +
    +  def wrap(rex: RexNode, functionCatalog: FunctionCatalog): RexNodeWrapper = {
    +    catalog = Option(functionCatalog)
    +    rex.accept(new WrapperVisitor)
    +  }
    +
    +  private[table] def lookupFunction(name: String, operands: Seq[Expression]): Expression = {
    +    catalog.getOrElse(throw TableException("FunctionCatalog was not defined"))
    +      .lookupFunction(name, operands)
    +  }
    +}
    +
    +class WrapperVisitor extends RexVisitorImpl[RexNodeWrapper](true) {
    --- End diff --
    
    We have to make sure that we do not miss anything here.
    IMO, we should try to translate as much as possible, but if something is not possible, we should make sure that we recognize that and do not offer this term to the `FilterableTableSource`.
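
One way to read this comment is as a partial translation: each conjunctive term is translated independently, and a term that cannot be translated is kept back rather than offered to the source. The sketch below models that idea with hypothetical types (`Rex`, `Cmp`, `Unsupported`, `Expr`); it does not use the real `RexNode` or `Expression` classes, and the set of supported operators is an arbitrary assumption.

```scala
object PartialTranslationSketch {
  // Hypothetical stand-ins for Calcite's RexNode and Flink's Expression.
  sealed trait Rex
  final case class Cmp(op: String, field: String, value: Int) extends Rex
  final case class Unsupported(description: String) extends Rex

  final case class Expr(text: String)

  // Returns Some(expression) only for terms we know how to translate.
  def translate(rex: Rex): Option[Expr] = rex match {
    case Cmp(op, field, value) if Set(">", "<", "=").contains(op) =>
      Some(Expr(s"$field $op $value"))
    case _ =>
      None // recognized as untranslatable instead of failing or guessing
  }

  // Splits conjunctive terms into (offered to the source, kept in the calc).
  def split(conjuncts: Seq[Rex]): (Seq[Expr], Seq[Rex]) = {
    val translated = conjuncts.map(r => r -> translate(r))
    val offered = translated.collect { case (_, Some(e)) => e }
    val kept = translated.collect { case (r, None) => r }
    (offered, kept)
  }
}
```

Returning `Option` from `translate` makes the "could not translate" case explicit, so an unsupported term cannot silently reach the `FilterableTableSource`.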


---

[GitHub] flink issue #3166: [FLINK-3849] Add FilterableTableSource interface and Rule...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3166
  
    Hi @tonycox, the follow-up PR #3520 to this one was merged and includes the changes of this PR.
    Can you close this PR? 
    
    Thanks, Fabian


---

[GitHub] flink pull request #3166: [FLINK-3849] Add FilterableTableSource interface a...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r102487221
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractor.scala ---
    @@ -0,0 +1,163 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.util
    +
    +import org.apache.calcite.rel.core.TableScan
    +import org.apache.calcite.rex._
    +import org.apache.calcite.plan.RelOptUtil
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.tools.RelBuilder
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.calcite.{FlinkTypeFactory, RexNodeWrapper}
    +import org.apache.flink.table.expressions._
    +import org.apache.flink.table.validate.FunctionCatalog
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.IndexedSeq
    +
    +object RexProgramExpressionExtractor {
    +
    +  /**
    +    * converts a rexProgram condition into independent CNF expressions
    +    *
    +    * @param rexProgram The RexProgram to analyze
    +    * @return converted expression
    +    */
    +  private[flink] def extractPredicateExpressions(
    +      rexProgram: RexProgram,
    +      rexBuilder: RexBuilder,
    +      catalog: FunctionCatalog): Array[Expression] = {
    +
    +    val fieldNames = getInputsWithNames(rexProgram)
    +
    +    val condition = rexProgram.getCondition
    +    if (condition == null) {
    +      return Array.empty
    +    }
    +    val call = rexProgram.expandLocalRef(condition)
    +    val cnf = RexUtil.toCnf(rexBuilder, call)
    +    val conjunctions = RelOptUtil.conjunctions(cnf)
    +    val expressions = conjunctions.asScala.map(
    +      RexNodeWrapper.wrap(_, catalog).toExpression(fieldNames)
    +    )
    +    expressions.toArray
    +  }
    +
    +  /**
    +    * verify should we apply remained expressions on
    +    *
    +    * @param original initial expression
    +    * @param remained remained part of original expression
    +    * @return whether or not to decouple parts of the origin expression
    +    */
    +  private[flink] def verifyExpressions(
    +      original: Array[Expression],
    +      remained: Array[Expression]): Boolean =
    +    remained forall (original contains)
    +
    +  /**
    +    * Generates a new RexProgram based on new expression.
    +    *
    +    * @param rexProgram original RexProgram
    +    * @param scan input source
    +    * @param predicate filter condition (fields must be resolved)
    +    * @param relBuilder builder for converting expression to Rex
    +    */
    +  private[flink] def rewriteRexProgram(
    +      rexProgram: RexProgram,
    +      scan: TableScan,
    +      predicate: Array[Expression])(implicit relBuilder: RelBuilder): RexProgram = {
    --- End diff --
    
    We need to inject all conjunctive `RexNode` terms which could not be translated into `Expression` here as well.
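
As a rough sketch of what this implies for the rewritten condition: the new filter is the conjunction of the predicates the source handed back and the terms that never reached the source because they could not be translated. `Cond`, `Term`, `And`, and `newCondition` are hypothetical names used only for illustration; the real code would build a `RexNode` through the `RelBuilder`.

```scala
object RewriteConditionSketch {
  // Hypothetical condition model (not Calcite's RexNode).
  sealed trait Cond
  final case class Term(text: String) extends Cond
  final case class And(terms: Seq[Cond]) extends Cond

  // remainingFromSource: predicates the source could not apply itself.
  // untranslated:        conjunctive terms that had no Expression equivalent.
  def newCondition(
      remainingFromSource: Seq[Term],
      untranslated: Seq[Term]): Option[Cond] = {
    (remainingFromSource ++ untranslated) match {
      case Seq()       => None          // no filter left at all
      case Seq(single) => Some(single)  // a single term needs no AND wrapper
      case many        => Some(And(many))
    }
  }
}
```

If the untranslated terms were left out of the rewritten program, rows that they should have rejected would pass through unfiltered.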


---

[GitHub] flink pull request #3166: [FLINK-3849] Add FilterableTableSource interface a...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r101394224
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/util/RexProgramExpressionExtractor.scala ---
    @@ -0,0 +1,198 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.util
    +
    +import java.util
    +
    +import org.apache.calcite.rel.core.TableScan
    +import org.apache.calcite.rex._
    +import org.apache.calcite.sql.{SqlKind, SqlOperator}
    +import org.apache.calcite.tools.RelBuilder
    +import org.apache.flink.table.api.TableEnvironment
    +import org.apache.flink.table.expressions._
    +import org.apache.flink.table.sources.TableSource
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +object RexProgramExpressionExtractor {
    +
    +  /**
    +    * converts a rexProgram condition into expression
    +    *
    +    * @param rexProgram The RexProgram to analyze
    +    * @return converted expression
    +    */
    +  def extractExpression(rexProgram: RexProgram): Expression = {
    +
    +    val refInputToName = getInputsWithNames(rexProgram)
    +    val visitor = new ExpressionVisitor(refInputToName)
    +
    +    val condition = rexProgram.getCondition
    +    if (condition == null) {
    +      return null
    +    }
    +
    +    rexProgram.expandLocalRef(condition).accept(visitor)
    +    val parsedExpression = ExpressionParser.parseExpression(visitor.getStringPredicate)
    +
    +    parsedExpression
    +  }
    +
    +  /**
    +    * verify can the original expression be divided into `new` expression
    +    * and remainder part without loss of logical correctness
    +    *
    +    * @param original initial expression
    +    * @param lump part of original expression
    +    * @return whether or not to decouple parts of the origin expression
    +    */
    +  def verifyExpressions(original: Expression, lump: Expression): Boolean = {
    +    if (original == null & lump == null) {
    +      return false
    +    }
    +    if (original.children.isEmpty | !checkOperator(original)) {
    +      return false
    +    }
    +    val head = original.children.head
    +    val last = original.children.last
    +    if (head.checkEquals(lump)) {
    +      return checkOperator(original)
    +    }
    +    if (last.checkEquals(lump)) {
    +      return checkOperator(original)
    +    }
    +    verifyExpressions(head, lump) match {
    +      case true => true
    +      case _ => verifyExpressions(last, lump)
    +    }
    +  }
    +
    +  private def checkOperator(original: Expression): Boolean = {
    +    original match {
    +      case o: Or => false
    +      case _ => true
    +    }
    +  }
    +
    +  /**
    +    * Generates a new RexProgram based on new expression.
    +    *
    +    * @param rexProgram original RexProgram
    +    * @param scan input source
    +    * @param expression filter condition (fields must be resolved)
    +    * @param tableSource source to get names and type of table
    +    * @param relBuilder builder for converting expression to Rex
    +    */
    +  def rewriteRexProgram(
    +      rexProgram: RexProgram,
    +      scan: TableScan,
    +      expression: Expression,
    +      tableSource: TableSource[_])(implicit relBuilder: RelBuilder): RexProgram = {
    --- End diff --
    
    The `tableSource` parameter can be removed.


---

[GitHub] flink pull request #3166: [FLINK-3849] Add FilterableTableSource interface a...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r102470146
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RexNodeWrapper.scala ---
    @@ -0,0 +1,106 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.calcite
    +
    +import org.apache.calcite.rex._
    +import org.apache.calcite.sql._
    +import org.apache.flink.table.api.TableException
    +import org.apache.flink.table.expressions.{Expression, Literal, ResolvedFieldReference}
    +import org.apache.flink.table.validate.FunctionCatalog
    +import org.apache.flink.table.calcite.RexNodeWrapper._
    +
    +abstract class RexNodeWrapper(rex: RexNode) {
    --- End diff --
    
    Please add some documentation about the purpose of this class.


---

[GitHub] flink pull request #3166: [FLINK-3849] Add FilterableTableSource interface a...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r101397995
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala ---
    @@ -98,4 +106,134 @@ object CommonTestData {
           this(null, null)
         }
       }
    +
    +  def getMockTableEnvironment: TableEnvironment = new MockTableEnvironment
    +
    +  def getFilterableTableSource(
    +    fieldNames: Array[String] = Array[String](
    +      "name", "id", "amount", "price"),
    +    fieldTypes: Array[TypeInformation[_]] = Array(
    +      BasicTypeInfo.STRING_TYPE_INFO,
    +      BasicTypeInfo.LONG_TYPE_INFO,
    +      BasicTypeInfo.INT_TYPE_INFO,
    +      BasicTypeInfo.DOUBLE_TYPE_INFO)) = new TestFilterableTableSource(fieldNames, fieldTypes)
    +}
    +
    +class MockTableEnvironment extends TableEnvironment(new TableConfig) {
    +
    +  override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = ???
    +
    +  override protected def checkValidTableName(name: String): Unit = ???
    +
    +  override protected def getBuiltInRuleSet: RuleSet = ???
    +
    +  override def sql(query: String): Table = ???
    +
    +  override def registerTableSource(name: String, tableSource: TableSource[_]): Unit = ???
    +}
    +
    +class TestFilterableTableSource(
    +    fieldNames: Array[String],
    +    fieldTypes: Array[TypeInformation[_]])
    +  extends BatchTableSource[Row]
    +    with StreamTableSource[Row]
    +    with FilterableTableSource
    +    with DefinedFieldNames {
    +
    +  private var filterPredicate: Option[Expression] = None
    +
    +  /** Returns the data of the table as a [[DataSet]]. */
    +  override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
    +    execEnv.fromCollection[Row](
    +      generateDynamicCollection(33, fieldNames, filterPredicate).asJava, getReturnType)
    +  }
    +
    +  /** Returns the data of the table as a [[DataStream]]. */
    +  def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    +    execEnv.fromCollection[Row](
    +      generateDynamicCollection(33, fieldNames, filterPredicate).asJava, getReturnType)
    +  }
    +
    +  private def generateDynamicCollection(
    +    num: Int,
    +    fieldNames: Array[String],
    +    predicate: Option[Expression]): Seq[Row] = {
    +
    +    if (predicate.isEmpty) {
    +      throw new RuntimeException("filter expression was not set")
    +    }
    +
    +    val literal = predicate.get.children.last
    +      .asInstanceOf[Literal]
    +      .value.asInstanceOf[Int]
    +
    +    def shouldCreateRow(value: Int): Boolean = {
    +      value > literal
    +    }
    +
    +    def createRow(row: Row, name: String, pos: Int, value: Int): Unit = {
    --- End diff --
    
    With a hard-coded schema, this method would not be necessary.


---

[GitHub] flink pull request #3166: [FLINK-3849] Add FilterableTableSource interface a...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r101302395
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/util/RexProgramExpressionExtractor.scala ---
    @@ -0,0 +1,198 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.util
    +
    +import java.util
    +
    +import org.apache.calcite.rel.core.TableScan
    +import org.apache.calcite.rex._
    +import org.apache.calcite.sql.{SqlKind, SqlOperator}
    +import org.apache.calcite.tools.RelBuilder
    +import org.apache.flink.table.api.TableEnvironment
    +import org.apache.flink.table.expressions._
    +import org.apache.flink.table.sources.TableSource
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +object RexProgramExpressionExtractor {
    +
    +  /**
    +    * converts a rexProgram condition into expression
    +    *
    +    * @param rexProgram The RexProgram to analyze
    +    * @return converted expression
    +    */
    +  def extractExpression(rexProgram: RexProgram): Expression = {
    --- End diff --
    
    Rename to `extractPredicateExpression` and return `Option[Expression]`?
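
A minimal sketch of the suggested signature, assuming a simplified `Program` type whose `condition` field may be `null` (standing in for the nullable result of `RexProgram.getCondition`). The `String` payload and the `trim` step are placeholders for the real translation into an `Expression`.

```scala
object OptionReturnSketch {
  // Hypothetical stand-in for RexProgram; `condition` is null when there is no filter.
  final case class Program(condition: String)

  // Option(...) maps null to None, so callers never see a null result.
  def extractPredicateExpression(program: Program): Option[String] =
    Option(program.condition).map(_.trim)
}
```

Callers can then pattern match on `Some` and `None` instead of guarding against `null`, for example `extractPredicateExpression(p).foreach(pushDown)` for some caller-defined `pushDown`.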


---

[GitHub] flink pull request #3166: [FLINK-3849] Add FilterableTableSource interface a...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r101393867
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/util/RexProgramExpressionExtractor.scala ---
    @@ -0,0 +1,198 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.util
    +
    +import java.util
    +
    +import org.apache.calcite.rel.core.TableScan
    +import org.apache.calcite.rex._
    +import org.apache.calcite.sql.{SqlKind, SqlOperator}
    +import org.apache.calcite.tools.RelBuilder
    +import org.apache.flink.table.api.TableEnvironment
    +import org.apache.flink.table.expressions._
    +import org.apache.flink.table.sources.TableSource
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +object RexProgramExpressionExtractor {
    +
    +  /**
    +    * converts a rexProgram condition into expression
    +    *
    +    * @param rexProgram The RexProgram to analyze
    +    * @return converted expression
    +    */
    +  def extractExpression(rexProgram: RexProgram): Expression = {
    +
    +    val refInputToName = getInputsWithNames(rexProgram)
    +    val visitor = new ExpressionVisitor(refInputToName)
    +
    +    val condition = rexProgram.getCondition
    +    if (condition == null) {
    +      return null
    +    }
    +
    +    rexProgram.expandLocalRef(condition).accept(visitor)
    +    val parsedExpression = ExpressionParser.parseExpression(visitor.getStringPredicate)
    +
    +    parsedExpression
    +  }
    +
    +  /**
    +    * verify can the original expression be divided into `new` expression
    +    * and remainder part without loss of logical correctness
    +    *
    +    * @param original initial expression
    +    * @param lump part of original expression
    +    * @return whether or not to decouple parts of the origin expression
    +    */
    +  def verifyExpressions(original: Expression, lump: Expression): Boolean = {
    +    if (original == null & lump == null) {
    +      return false
    +    }
    +    if (original.children.isEmpty | !checkOperator(original)) {
    +      return false
    +    }
    +    val head = original.children.head
    +    val last = original.children.last
    +    if (head.checkEquals(lump)) {
    +      return checkOperator(original)
    +    }
    +    if (last.checkEquals(lump)) {
    +      return checkOperator(original)
    +    }
    +    verifyExpressions(head, lump) match {
    +      case true => true
    +      case _ => verifyExpressions(last, lump)
    +    }
    +  }
    +
    +  private def checkOperator(original: Expression): Boolean = {
    +    original match {
    +      case o: Or => false
    +      case _ => true
    +    }
    +  }
    +
    +  /**
    +    * Generates a new RexProgram based on new expression.
    +    *
    +    * @param rexProgram original RexProgram
    +    * @param scan input source
    +    * @param expression filter condition (fields must be resolved)
    +    * @param tableSource source to get names and type of table
    +    * @param relBuilder builder for converting expression to Rex
    +    */
    +  def rewriteRexProgram(
    +      rexProgram: RexProgram,
    +      scan: TableScan,
    +      expression: Expression,
    +      tableSource: TableSource[_])(implicit relBuilder: RelBuilder): RexProgram = {
    +
    +    if (expression != null) {
    +
    +      val names = TableEnvironment.getFieldNames(tableSource)
    +
    +      val nameToType = names
    +        .zip(TableEnvironment.getFieldTypes(tableSource)).toMap
    +
    +      relBuilder.push(scan)
    +
    +      val rule: PartialFunction[Expression, Expression] = {
    +        case u@UnresolvedFieldReference(name) =>
    +          ResolvedFieldReference(name, nameToType(name))
    +      }
    +
    +      val newProjectExpressions = rewriteProjects(rexProgram, names)
    --- End diff --
    
    `val projs = rexProgram.getProjectList.map(rexProgram.expandLocalRef)` should be sufficient here. Since we only apply a filter, the input schema does not change.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3166: [FLINK-3849] Add FilterableTableSource interface and Rule...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3166
  
    Hi @tonycox, what are your plans regarding this PR?
    I'm asking because we would like to put another feature ([FLINK-5859](https://issues.apache.org/jira/browse/FLINK-5859): Partition Pruning TableSource) on top of `FilterableTableSource`.
    
    Thanks, Fabian



[GitHub] flink issue #3166: [FLINK-3849] Add FilterableTableSource interface and Rule...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3166
  
    Of course, scalar functions cannot be executed by the source.
    
    However, sources need to group the filter conditions into supported and unsupported expressions in any case. "Unsupported" would also include "I don't know this expression". So I think there would be no additional implementation overhead on the TableSource side, but potential performance gains when pushing everything down.
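
A minimal sketch of the grouping described above, assuming the predicate arrives as a list of conjuncts. The `Expr` types, `isSupported`, and `splitPredicates` are hypothetical stand-ins for illustration, not the Table API classes or the interface proposed in this PR. Anything the source does not recognize falls through to the "unsupported" side and stays in the Calc.

```scala
object PredicateSplitSketch {
  // Hypothetical mini expression tree standing in for Table API expressions.
  sealed trait Expr
  final case class Field(name: String) extends Expr
  final case class Literal(value: Int) extends Expr
  final case class GreaterThan(left: Expr, right: Expr) extends Expr
  final case class EqualTo(left: Expr, right: Expr) extends Expr
  final case class ScalarCall(function: String, args: List[Expr]) extends Expr

  // A hypothetical source that only understands "field <op> literal" comparisons.
  def isSupported(e: Expr): Boolean = e match {
    case GreaterThan(Field(_), Literal(_)) => true
    case EqualTo(Field(_), Literal(_))     => true
    case _                                 => false // also covers "I don't know this expression"
  }

  // Returns (conjuncts the source evaluates, conjuncts that remain in the Calc).
  def splitPredicates(conjuncts: List[Expr]): (List[Expr], List[Expr]) =
    conjuncts.partition(isSupported)

  // Example input: one plain comparison and one comparison on a scalar function call.
  val example: List[Expr] = List(
    GreaterThan(Field("amount"), Literal(10)),
    EqualTo(ScalarCall("hash", List(Field("id"))), Literal(3))
  )
}
```

With `example`, the first conjunct would be pushed into the source and the second, which wraps a scalar function call, would remain as the residual predicate.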



[GitHub] flink pull request #3166: [FLINK-3849] Add FilterableTableSource interface a...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r101316851
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/util/RexProgramExpressionExtractor.scala ---
    @@ -0,0 +1,198 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.util
    +
    +import java.util
    +
    +import org.apache.calcite.rel.core.TableScan
    +import org.apache.calcite.rex._
    +import org.apache.calcite.sql.{SqlKind, SqlOperator}
    +import org.apache.calcite.tools.RelBuilder
    +import org.apache.flink.table.api.TableEnvironment
    +import org.apache.flink.table.expressions._
    +import org.apache.flink.table.sources.TableSource
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +object RexProgramExpressionExtractor {
    +
    +  /**
    +    * converts a rexProgram condition into expression
    +    *
    +    * @param rexProgram The RexProgram to analyze
    +    * @return converted expression
    +    */
    +  def extractExpression(rexProgram: RexProgram): Expression = {
    +
    +    val refInputToName = getInputsWithNames(rexProgram)
    +    val visitor = new ExpressionVisitor(refInputToName)
    +
    +    val condition = rexProgram.getCondition
    +    if (condition == null) {
    +      return null
    +    }
    +
    +    rexProgram.expandLocalRef(condition).accept(visitor)
    +    val parsedExpression = ExpressionParser.parseExpression(visitor.getStringPredicate)
    --- End diff --
    
    Converting by generating and parsing strings is not very reliable.
    
    We should rather map `RexNode` directly to `Expression`. 
    @twalthr suggested adding the translation logic to the corresponding `Expression`, next to the `toRexNode()` method. We would need to find a "dictionary" to identify the relevant `Expression` for a `RexNode`, though.
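
A rough sketch of what a direct, dictionary-driven mapping could look like, without going through strings. Everything here is hypothetical: `Rex`, `InputRef`, `RexLit`, and `RexCall` merely imitate the shape of Calcite's `RexNode` tree, the `Expr` types imitate Table API expressions, and the operator names used as dictionary keys are assumptions. Unknown operators yield `None` rather than a parse error.

```scala
object RexToExpressionSketch {
  // Hypothetical stand-ins for Calcite's RexNode hierarchy.
  sealed trait Rex
  final case class InputRef(index: Int) extends Rex
  final case class RexLit(value: Int) extends Rex
  final case class RexCall(kind: String, operands: List[Rex]) extends Rex

  // Hypothetical stand-ins for Table API expressions.
  sealed trait Expr
  final case class Field(name: String) extends Expr
  final case class Lit(value: Int) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr
  final case class GreaterThan(left: Expr, right: Expr) extends Expr
  final case class EqualTo(left: Expr, right: Expr) extends Expr

  // The "dictionary": operator kind -> constructor of the matching expression.
  val binaryOps: Map[String, (Expr, Expr) => Expr] = Map(
    "AND"          -> ((l: Expr, r: Expr) => And(l, r)),
    "GREATER_THAN" -> ((l: Expr, r: Expr) => GreaterThan(l, r)),
    "EQUALS"       -> ((l: Expr, r: Expr) => EqualTo(l, r))
  )

  // Translates a node tree; returns None as soon as any part is not translatable.
  def toExpression(rex: Rex, fieldNames: IndexedSeq[String]): Option[Expr] = rex match {
    case InputRef(i) if fieldNames.isDefinedAt(i) => Some(Field(fieldNames(i)))
    case RexLit(v)                                => Some(Lit(v))
    case RexCall(kind, List(l, r)) =>
      for {
        build <- binaryOps.get(kind)
        left  <- toExpression(l, fieldNames)
        right <- toExpression(r, fieldNames)
      } yield build(left, right)
    case _ => None
  }
}
```

Returning `Option` keeps the "cannot translate" case explicit, so a rule could simply skip the push-down for that condition instead of failing.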



[GitHub] flink issue #3166: [FLINK-3849] Add FilterableTableSource interface and Rule...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/3166
  
    I think it would be great if the mapping between Table API Expressions and RexNodes could happen within each Expression class, as the opposite of `toRexNode`, instead of in one large util class that does the conversion.



[GitHub] flink issue #3166: [FLINK-3849] Add FilterableTableSource interface and Rule...

Posted by tonycox <gi...@git.apache.org>.
Github user tonycox commented on the issue:

    https://github.com/apache/flink/pull/3166
  
    @fhueske could you look at this approach to converting `RexNode` to `Expression`? It's still in a WIP state, but I need your feedback.



[GitHub] flink pull request #3166: [FLINK-3849] Add FilterableTableSource interface a...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r101305739
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala ---
    @@ -0,0 +1,72 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.datastream
    +
    +import org.apache.calcite.plan.RelOptRule._
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
    +import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, StreamTableSourceScan}
    +import org.apache.flink.table.plan.rules.util.RexProgramExpressionExtractor._
    +import org.apache.flink.table.sources.FilterableTableSource
    +
    +class PushFilterIntoStreamTableSourceScanRule extends RelOptRule(
    --- End diff --
    
    Same comments as for the `PushFilterIntoBatchTableSourceScanRule`.



[GitHub] flink issue #3166: [FLINK-3849] Add FilterableTableSource interface and Rule...

Posted by tonycox <gi...@git.apache.org>.
Github user tonycox commented on the issue:

    https://github.com/apache/flink/pull/3166
  
    Hi @fhueske, 
    what do you think about this PR?



[GitHub] flink issue #3166: [FLINK-3849] Add FilterableTableSource interface and Rule...

Posted by tonycox <gi...@git.apache.org>.
Github user tonycox commented on the issue:

    https://github.com/apache/flink/pull/3166
  
    However, I think we need a tool for converting RexNodes to Table API Expressions.



[GitHub] flink pull request #3166: [FLINK-3849] Add FilterableTableSource interface a...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r101302191
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala ---
    @@ -0,0 +1,72 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.dataSet
    +
    +import org.apache.calcite.plan.RelOptRule._
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
    +import org.apache.flink.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc}
    +import org.apache.flink.table.plan.rules.util.RexProgramExpressionExtractor._
    +import org.apache.flink.table.sources.FilterableTableSource
    +
    +class PushFilterIntoBatchTableSourceScanRule extends RelOptRule(
    +  operand(classOf[DataSetCalc],
    +    operand(classOf[BatchTableSourceScan], none)),
    +  "PushFilterIntoBatchTableSourceScanRule") {
    +
    +  override def matches(call: RelOptRuleCall) = {
    +    val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
    +    scan.tableSource match {
    +      case _: FilterableTableSource => true
    +      case _ => false
    +    }
    +  }
    +
    +  override def onMatch(call: RelOptRuleCall): Unit = {
    +    val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
    +    val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
    +
    +    val tableSource = scan.tableSource.asInstanceOf[FilterableTableSource]
    +
    +    val expression = extractExpression(calc.calcProgram)
    +    val unusedExpr = tableSource.setPredicate(expression)
    --- End diff --
    
    rename to `remainingPredicate`



[GitHub] flink pull request #3166: [FLINK-3849] Add FilterableTableSource interface a...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r102912230
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala ---
    @@ -0,0 +1,95 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.datastream
    +
    +import org.apache.calcite.plan.RelOptRule._
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
    +import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, StreamTableSourceScan}
    +import org.apache.flink.table.plan.util.RexProgramExpressionExtractor._
    +import org.apache.flink.table.plan.schema.TableSourceTable
    +import org.apache.flink.table.sources.FilterableTableSource
    +
    +class PushFilterIntoStreamTableSourceScanRule extends RelOptRule(
    +  operand(classOf[DataStreamCalc],
    +    operand(classOf[StreamTableSourceScan], none)),
    +  "PushFilterIntoStreamTableSourceScanRule") {
    +
    +  override def matches(call: RelOptRuleCall) = {
    +    val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc]
    +    val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan]
    +    scan.tableSource match {
    +      case _: FilterableTableSource =>
    +        calc.calcProgram.getCondition != null
    +      case _ => false
    +    }
    +  }
    +
    +  override def onMatch(call: RelOptRuleCall): Unit = {
    +    val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc]
    +    val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan]
    +
    +    val filterableSource = scan.tableSource.asInstanceOf[FilterableTableSource]
    +
    +    val program = calc.calcProgram
    +    val tst = scan.getTable.unwrap(classOf[TableSourceTable[_]])
    +    val predicates = extractPredicateExpressions(
    +      program,
    +      call.builder().getRexBuilder,
    +      tst.tableEnv.getFunctionCatalog)
    +
    +    if (predicates.length != 0) {
    +      val remainingPredicate = filterableSource.setPredicate(predicates)
    --- End diff --
    
    Right, but only if it does not do any projection.



[GitHub] flink pull request #3166: [FLINK-3849] Add FilterableTableSource interface a...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r101303533
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala ---
    @@ -0,0 +1,72 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.dataSet
    +
    +import org.apache.calcite.plan.RelOptRule._
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
    +import org.apache.flink.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc}
    +import org.apache.flink.table.plan.rules.util.RexProgramExpressionExtractor._
    +import org.apache.flink.table.sources.FilterableTableSource
    +
    +class PushFilterIntoBatchTableSourceScanRule extends RelOptRule(
    +  operand(classOf[DataSetCalc],
    +    operand(classOf[BatchTableSourceScan], none)),
    +  "PushFilterIntoBatchTableSourceScanRule") {
    +
    +  override def matches(call: RelOptRuleCall) = {
    +    val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
    +    scan.tableSource match {
    +      case _: FilterableTableSource => true
    +      case _ => false
    +    }
    +  }
    +
    +  override def onMatch(call: RelOptRuleCall): Unit = {
    +    val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
    +    val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
    +
    +    val tableSource = scan.tableSource.asInstanceOf[FilterableTableSource]
    +
    +    val expression = extractExpression(calc.calcProgram)
    +    val unusedExpr = tableSource.setPredicate(expression)
    +
    +    if (verifyExpressions(expression, unusedExpr)) {
    --- End diff --
    
    Why do we need this check?
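
For context, one plausible reading of `verifyExpressions` (judging only from its Scaladoc and from `checkOperator` rejecting `Or`) is that it guards against splitting a disjunction. The sketch below, with a hypothetical `Row` type and made-up predicates `p` and `q`, shows why such a split would be unsafe while splitting a conjunction is not. It is an illustration of the general principle, not the PR's logic.

```scala
object SplitSafetySketch {
  // Hypothetical two-column row, used only for this illustration.
  final case class Row(a: Int, b: Int)

  val rows: List[Row] = List(Row(1, 1), Row(1, 0), Row(0, 1), Row(0, 0))

  val p: Row => Boolean = _.a == 1 // the part pushed into the source
  val q: Row => Boolean = _.b == 1 // the part left in the Calc

  // AND: evaluating p in the source and q afterwards equals evaluating p && q.
  val andFull: List[Row]  = rows.filter(r => p(r) && q(r))
  val andSplit: List[Row] = rows.filter(p).filter(q)

  // OR: pushing only p into the source drops rows that satisfy q alone,
  // even if the full disjunction is evaluated again afterwards.
  val orFull: List[Row]  = rows.filter(r => p(r) || q(r))
  val orSplit: List[Row] = rows.filter(p).filter(r => p(r) || q(r))
}
```

Here `andFull` and `andSplit` agree, whereas `orSplit` is missing `Row(0, 1)`, which `orFull` contains.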



[GitHub] flink issue #3166: [FLINK-3849] Add FilterableTableSource interface and Rule...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/3166
  
    Thanks for working on this @tonycox. It seems that this PR contains commits that are not related to this issue (changes in TypeExtractor?). Can you remove those?



[GitHub] flink pull request #3166: [FLINK-3849] Add FilterableTableSource interface a...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r101394185
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/util/RexProgramExpressionExtractor.scala ---
    @@ -0,0 +1,198 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.util
    +
    +import java.util
    +
    +import org.apache.calcite.rel.core.TableScan
    +import org.apache.calcite.rex._
    +import org.apache.calcite.sql.{SqlKind, SqlOperator}
    +import org.apache.calcite.tools.RelBuilder
    +import org.apache.flink.table.api.TableEnvironment
    +import org.apache.flink.table.expressions._
    +import org.apache.flink.table.sources.TableSource
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +object RexProgramExpressionExtractor {
    +
    +  /**
    +    * converts a rexProgram condition into expression
    +    *
    +    * @param rexProgram The RexProgram to analyze
    +    * @return converted expression
    +    */
    +  def extractExpression(rexProgram: RexProgram): Expression = {
    +
    +    val refInputToName = getInputsWithNames(rexProgram)
    +    val visitor = new ExpressionVisitor(refInputToName)
    +
    +    val condition = rexProgram.getCondition
    +    if (condition == null) {
    +      return null
    +    }
    +
    +    rexProgram.expandLocalRef(condition).accept(visitor)
    +    val parsedExpression = ExpressionParser.parseExpression(visitor.getStringPredicate)
    +
    +    parsedExpression
    +  }
    +
    +  /**
    +    * verify can the original expression be divided into `new` expression
    +    * and remainder part without loss of logical correctness
    +    *
    +    * @param original initial expression
    +    * @param lump part of original expression
    +    * @return whether or not to decouple parts of the origin expression
    +    */
    +  def verifyExpressions(original: Expression, lump: Expression): Boolean = {
    +    if (original == null & lump == null) {
    +      return false
    +    }
    +    if (original.children.isEmpty | !checkOperator(original)) {
    +      return false
    +    }
    +    val head = original.children.head
    +    val last = original.children.last
    +    if (head.checkEquals(lump)) {
    +      return checkOperator(original)
    +    }
    +    if (last.checkEquals(lump)) {
    +      return checkOperator(original)
    +    }
    +    verifyExpressions(head, lump) match {
    +      case true => true
    +      case _ => verifyExpressions(last, lump)
    +    }
    +  }
    +
    +  private def checkOperator(original: Expression): Boolean = {
    +    original match {
    +      case o: Or => false
    +      case _ => true
    +    }
    +  }
    +
    +  /**
    +    * Generates a new RexProgram based on new expression.
    +    *
    +    * @param rexProgram original RexProgram
    +    * @param scan input source
    +    * @param expression filter condition (fields must be resolved)
    +    * @param tableSource source to get names and type of table
    +    * @param relBuilder builder for converting expression to Rex
    +    */
    +  def rewriteRexProgram(
    +      rexProgram: RexProgram,
    +      scan: TableScan,
    +      expression: Expression,
    +      tableSource: TableSource[_])(implicit relBuilder: RelBuilder): RexProgram = {
    +
    +    if (expression != null) {
    +
    +      val names = TableEnvironment.getFieldNames(tableSource)
    --- End diff --
    
    We can also get the name-to-type mapping without the `tableSource`, as follows:
    
    ```
    val inType = rexProgram.getInputRowType
    val fieldTypes: Map[String, TypeInformation[_]] = inType.getFieldList
            .map(f => f.getName -> FlinkTypeFactory.toTypeInfo(f.getType))
            .toMap
    ```

