Posted to issues@flink.apache.org by tonycox <gi...@git.apache.org> on 2017/02/06 12:57:27 UTC

[GitHub] flink pull request #3269: [FLINK-5698] Add NestedFieldsProjectableTableSourc...

GitHub user tonycox opened a pull request:

    https://github.com/apache/flink/pull/3269

    [FLINK-5698] Add NestedFieldsProjectableTableSource trait

    - add extraction of RexFieldAccess method
    - complete PushProjectIntoBatchTableSourceScanRule with new interface
    
    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [ ] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [ ] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed
    
    Should we test NestedFieldsProjectableTableSource in a unit test?

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tonycox/flink nestingTableSource

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3269.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3269
    
----
commit fa8b9f2b62066c832bd36e17db4499c875d2d25f
Author: tonycox <an...@epam.com>
Date:   2017-02-06T12:32:45Z

    [FLINK-5698] Add NestedFieldsProjectableTableSource trait
    
    - add extraction of RexFieldAccess method
    - complete PushProjectIntoBatchTableSourceScanRule with new interface

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3269: [FLINK-5698] Add NestedFieldsProjectableTableSourc...

Posted by tonycox <gi...@git.apache.org>.
Github user tonycox commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3269#discussion_r106208275
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/util/RexProgramProjectExtractor.scala ---
    @@ -84,6 +108,49 @@ object RexProgramProjectExtractor {
     }
     
     /**
    +  * A RexVisitor to extract used nested input fields
    +  */
    +class RefFieldAccessorVisitor(
    +    names: List[String],
    +    usedFields: Array[Int]) extends RexVisitorImpl[Unit](true) {
    +
    +  private val projectedFields = new util.ArrayList[Array[String]]
    +
    +  names.foreach { n =>
    +    projectedFields.add(Array.empty)
    +  }
    +
    +  private val order: Map[Int, Int] = names.indices.zip(usedFields).map(_.swap).toMap
    --- End diff --
    
    exactly



[GitHub] flink pull request #3269: [FLINK-5698] Add NestedFieldsProjectableTableSourc...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3269#discussion_r101887738
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/util/RexProgramProjectExtractor.scala ---
    @@ -84,6 +105,39 @@ object RexProgramProjectExtractor {
     }
     
     /**
    +  * A RexVisitor to extract used nested input fields
    +  */
    +class RefFieldAccessorVisitor(
    +    usedFields: Array[Int],
    +    names: List[String])
    +  extends RexVisitorImpl[Unit](true) {
    +
    +  private val group = usedFields.toList
    +  private var nestedFields = mutable.LinkedHashSet[String]()
    +
    +  def getNestedFields: Array[String] = nestedFields.toArray
    +
    +  override def visitFieldAccess(fieldAccess: RexFieldAccess): Unit = {
    +    fieldAccess.getReferenceExpr match {
    +      case ref: RexInputRef =>
    +        nestedFields += s"${names(ref.getIndex)}.${fieldAccess.getField.getName}"
    --- End diff --
    
    Yes, the parent of a `RexFieldAccess` can also be a `RexFieldAccess`. We should take care of that.
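A minimal sketch of handling chained accesses, assuming a simplified, hypothetical expression model (`Expr`, `InputRef`, `FieldAccess`) in place of Calcite's `RexInputRef` and `RexFieldAccess`. The helper recurses through the reference expression until it reaches the input reference, so `student.school.city` yields top-level index 1 and the relative path `school.city` instead of stopping after one level.

```scala
// Hypothetical, simplified stand-ins for Calcite's RexInputRef and RexFieldAccess.
sealed trait Expr
final case class InputRef(index: Int) extends Expr
final case class FieldAccess(ref: Expr, fieldName: String) extends Expr

object NestedPath {

  /** Splits a (possibly chained) access into the top-level field index
    * and the list of nested field names below it.
    */
  def topLevelAndRelative(expr: Expr): (Int, List[String]) = expr match {
    case InputRef(i) => (i, Nil)
    case FieldAccess(inner, f) =>
      // Resolve the parent first, then append this level's field name.
      val (i, path) = topLevelAndRelative(inner)
      (i, path :+ f)
  }

  /** Full dotted path, e.g. "student.school.city". */
  def fullPath(expr: Expr, names: IndexedSeq[String]): String = {
    val (i, path) = topLevelAndRelative(expr)
    (names(i) :: path).mkString(".")
  }
}
```

In a real visitor, only the outermost access of a chain would be recorded this way, and its inner accesses would not be recorded separately; that bookkeeping is left out of the sketch.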



[GitHub] flink issue #3269: [FLINK-5698] Add NestedFieldsProjectableTableSource trait

Posted by tonycox <gi...@git.apache.org>.
Github user tonycox commented on the issue:

    https://github.com/apache/flink/pull/3269
  
    @fhueske, what do you think about this PR?



[GitHub] flink pull request #3269: [FLINK-5698] Add NestedFieldsProjectableTableSourc...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3269#discussion_r103958345
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/NestedFieldsProjectableTableSource.scala ---
    @@ -0,0 +1,55 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sources
    +
    +/**
    +  * Adds support for projection push-down to a [[TableSource]] with nested fields.
    +  * A [[TableSource]] extending this interface is able
    +  * to project the nested fields of the return table.
    +  *
    +  * @tparam T The return type of the [[NestedFieldsProjectableTableSource]].
    +  */
    +trait NestedFieldsProjectableTableSource[T] extends ProjectableTableSource[T] {
    +
    +  /**
    +    * Creates a copy of the [[NestedFieldsProjectableTableSource]]
    +    * that projects its output on the specified nested fields.
    +    *
    +    * @param fields The indexes of the fields to return.
    +    * @param nestedFields Holds the nested fields; must have the same length as the fields array.
    +    *
    +    * e.g.
    +    * tableSchema = {
    +    *       id,
    +    *       student<school<city, tuition>, age, name>,
    +    *       teacher<age, name>
    +    *       }
    +    *
    +    * select (id, student.school.city, student.age, teacher)
    +    *
    +    * fields = [0, 1, 2]
    +    * nestedFields = [[], ["school.city", "age"], ["*"]]
    --- End diff --
    
    That would also be OK with me, but the documentation would need to be adapted.



[GitHub] flink pull request #3269: [FLINK-5698] Add NestedFieldsProjectableTableSourc...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3269#discussion_r101739816
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/NestedFieldsProjectableTableSource.scala ---
    @@ -0,0 +1,39 @@
    +package org.apache.flink.table.sources
    +
    +/**
    +  * Adds support for projection push-down to a [[TableSource]] with nested fields.
    +  * A [[TableSource]] extending this interface is able
    +  * to project the nested fields of the return table.
    +  *
    +  * @tparam T The return type of the [[NestedFieldsProjectableTableSource]].
    +  */
    +trait NestedFieldsProjectableTableSource[T] extends ProjectableTableSource[T] {
    +
    +  /**
    +    * Creates a copy of the [[NestedFieldsProjectableTableSource]]
    +    * that projects its output on the specified nested fields.
    +    *
    +    * @param fields The indexes of the fields to return.
    +    * @return A copy of the [[NestedFieldsProjectableTableSource]] that projects its output.
    +    */
    +  def projectNestedFields(fields: Array[String]): NestedFieldsProjectableTableSource[T]
    --- End diff --
    
    @tonycox, @wuchong: What do you think about changing the signature to something like:
    
    ```
    def projectNestedFields(fields: Array[Int], nestedFields: Array[Array[String]])
    ```
    
    It would be more aligned with the existing `ProjectableTableSource` interface.
    Both arrays would need to have the same length, and the String array would hold the nested fields.
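To make the proposed contract concrete, here is a toy, self-contained sketch. The trait `NestedProjectable` and the class `ToySource` are hypothetical and deliberately independent of Flink's actual `TableSource` interfaces; the point is only the shape of the arguments: `fields` and `nestedFields` are parallel arrays, and the implementation checks their lengths up front.

```scala
// Hypothetical trait mirroring the proposed signature (not Flink's actual API).
trait NestedProjectable[T] {
  def projectNestedFields(
      fields: Array[Int],
      nestedFields: Array[Array[String]]): NestedProjectable[T]
}

// Toy source that only records which (nested) fields it was asked to produce.
final class ToySource(
    val fieldNames: Array[String],
    val projection: Seq[(String, Seq[String])] = Seq.empty)
  extends NestedProjectable[Any] {

  override def projectNestedFields(
      fields: Array[Int],
      nestedFields: Array[Array[String]]): ToySource = {
    // The arrays are parallel: nestedFields(i) describes fields(i).
    require(fields.length == nestedFields.length,
      s"fields (${fields.length}) and nestedFields (${nestedFields.length}) must have the same length")
    val projected = fields.toSeq.zip(nestedFields.toSeq).map {
      case (idx, nested) => (fieldNames(idx), nested.toSeq)
    }
    // Return a copy instead of mutating this source, as the trait's Scaladoc asks.
    new ToySource(fieldNames, projected)
  }
}
```

Returning a copy keeps the original source reusable by the optimizer when a rule is applied to several alternative plans.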



[GitHub] flink pull request #3269: [FLINK-5698] Add NestedFieldsProjectableTableSourc...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3269#discussion_r103959655
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/util/RexProgramProjectExtractorTest.scala ---
    @@ -55,6 +56,34 @@ class RexProgramProjectExtractorTest {
       }
     
       @Test
    +  def testExtractRefNestedInputFields(): Unit = {
    +    val rexProgram = buildRexProgramWithNesting()
    +    val usedFields = extractRefInputFields(rexProgram)
    +    val usedNestedFields = extractRefNestedInputFields(rexProgram, usedFields)
    +    val expected = Array[Array[String]](Array("amount"), Array("*"))
    +    assertThat(usedNestedFields, is(expected))
    +  }
    +
    +  @Test
    +  def testExtractRefNestedInputFieldsWithNoNesting(): Unit = {
    +    val rexProgram = buildRexProgram()
    +    val usedFields = extractRefInputFields(rexProgram)
    +    val usedNestedFields = extractRefNestedInputFields(rexProgram, usedFields)
    +    val expected = Array[Array[String]](Array("*"), Array("*"), Array("*"))
    +    assertThat(usedNestedFields, is(expected))
    +  }
    +
    +  @Test
    +  def testExtractDeepRefNestedInputFields(): Unit = {
    +    val rexProgram = buildRexProgramWithDeepNesting()
    +    val usedFields = extractRefInputFields(rexProgram)
    +    val usedNestedFields = extractRefNestedInputFields(rexProgram, usedFields)
    +    val expected = Array[Array[String]](Array("amount"), Array("passport.status"))
    --- End diff --
    
    It would be good to have a test where the Array with the nested fields contains more than one entry.



[GitHub] flink pull request #3269: [FLINK-5698] Add NestedFieldsProjectableTableSourc...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3269#discussion_r101981540
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/NestedFieldsProjectableTableSource.scala ---
    @@ -0,0 +1,39 @@
    +package org.apache.flink.table.sources
    +
    +/**
    +  * Adds support for projection push-down to a [[TableSource]] with nested fields.
    +  * A [[TableSource]] extending this interface is able
    +  * to project the nested fields of the return table.
    +  *
    +  * @tparam T The return type of the [[NestedFieldsProjectableTableSource]].
    +  */
    +trait NestedFieldsProjectableTableSource[T] extends ProjectableTableSource[T] {
    +
    +  /**
    +    * Creates a copy of the [[NestedFieldsProjectableTableSource]]
    +    * that projects its output on the specified nested fields.
    +    *
    +    * @param fields The indexes of the fields to return.
    +    * @return A copy of the [[NestedFieldsProjectableTableSource]] that projects its output.
    +    */
    +  def projectNestedFields(fields: Array[String]): NestedFieldsProjectableTableSource[T]
    --- End diff --
    
    Yes, either this way, or we use wildcards to select the whole object and all nested fields, i.e., your example would be done with
    
    `nestedFields = [[], ["school.city", "age"], ["*"]]`
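A small sketch of how a planner rule might derive the two arrays from a list of selected dotted paths under the wildcard convention in the example above: a composite field selected as a whole maps to `["*"]`, an atomic field to `[]`, and partial selections to paths relative to the top-level field. The object `WildcardProjection`, its `toProjection` function, and the `composite` set are illustrative assumptions, not code from this PR.

```scala
import scala.collection.mutable

object WildcardProjection {

  /** Derives (fields, nestedFields) from dotted selection paths.
    *
    * @param topLevel  top-level field names, in schema order
    * @param composite top-level fields that have nested fields
    * @param selected  selected paths, e.g. "student.school.city"
    */
  def toProjection(
      topLevel: IndexedSeq[String],
      composite: Set[String],
      selected: Seq[String]): (Array[Int], Array[Array[String]]) = {
    // Group the remainder of each path by its top-level field, keeping first-seen order.
    val grouped = mutable.LinkedHashMap.empty[String, Vector[String]]
    selected.foreach { path =>
      val parts = path.split('.')
      // An empty remainder means the whole top-level field was selected.
      grouped(parts.head) = grouped.getOrElse(parts.head, Vector.empty) :+ parts.tail.mkString(".")
    }
    val entries = grouped.toSeq
    val fields = entries.map { case (name, _) =>
      val idx = topLevel.indexOf(name)
      require(idx >= 0, s"unknown top-level field: $name")
      idx
    }.toArray
    val nestedFields = entries.map { case (name, rests) =>
      if (!rests.contains("")) rests.toArray // only some nested fields are used
      else if (composite(name)) Array("*")   // whole composite field: wildcard
      else Array.empty[String]               // atomic field: nothing nested
    }.toArray
    (fields, nestedFields)
  }
}
```

For the schema discussed above, selecting `id, student.school.city, student.age, teacher` produces `fields = [0, 1, 2]` and `nestedFields = [[], ["school.city", "age"], ["*"]]`.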



[GitHub] flink pull request #3269: [FLINK-5698] Add NestedFieldsProjectableTableSourc...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3269



[GitHub] flink pull request #3269: [FLINK-5698] Add NestedFieldsProjectableTableSourc...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3269#discussion_r103912590
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/NestedFieldsProjectableTableSource.scala ---
    @@ -0,0 +1,55 @@
    +package org.apache.flink.table.sources
    +
    +/**
    +  * Adds support for projection push-down to a [[TableSource]] with nested fields.
    +  * A [[TableSource]] extending this interface is able
    +  * to project the nested fields of the return table.
    +  *
    +  * @tparam T The return type of the [[NestedFieldsProjectableTableSource]].
    +  */
    +trait NestedFieldsProjectableTableSource[T] extends ProjectableTableSource[T] {
    --- End diff --
    
    I think it's not necessary to extend from `ProjectableTableSource`.



[GitHub] flink issue #3269: [FLINK-5698] Add NestedFieldsProjectableTableSource trait

Posted by tonycox <gi...@git.apache.org>.
Github user tonycox commented on the issue:

    https://github.com/apache/flink/pull/3269
  
    Hi @wuchong,
    Thank you for the excellent advice.



[GitHub] flink issue #3269: [FLINK-5698] Add NestedFieldsProjectableTableSource trait

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on the issue:

    https://github.com/apache/flink/pull/3269
  
    Hi @tonycox, thanks for this PR.
    
    It seems that you are using column names to represent the (nested) fields that are projected. I'm afraid this can't work when composite columns contain nested fields with the same name. For example, with the table schema `student<name, age>, teacher<name, age>` and the projected fields `name, age`, we can't determine whether `name` is `student.name` or `teacher.name`.
    
    IMO, we can use fully qualified column names with a `.` separator to represent the (nested) fields. For the above example, the projected fields could be `student.name, teacher.age`. We can then use these names to do nested field projection push-down.
    
    What do you think?
    
    Thanks,
    Jark Wu
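To illustrate the ambiguity, the sketch below flattens a small schema into leaf paths using a hypothetical `Field` model (not Flink's type system). The short name `name` matches two leaves, while each fully qualified, `.`-separated path identifies exactly one.

```scala
// Hypothetical schema model: a field is a name plus optional nested fields.
final case class Field(name: String, children: List[Field] = Nil)

object QualifiedNames {

  /** All leaf paths of the schema, fully qualified with '.' separators. */
  def leafPaths(schema: List[Field], prefix: String = ""): List[String] =
    schema.flatMap { f =>
      val path = if (prefix.isEmpty) f.name else s"$prefix.${f.name}"
      if (f.children.isEmpty) List(path) else leafPaths(f.children, path)
    }

  /** Leaf paths whose last segment equals the given short name. */
  def resolveShortName(schema: List[Field], shortName: String): List[String] =
    leafPaths(schema).filter(_.split('.').last == shortName)
}
```

With `student<name, age>, teacher<name, age>`, `resolveShortName(schema, "name")` returns both `student.name` and `teacher.name`, which is exactly the ambiguity described above.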



[GitHub] flink pull request #3269: [FLINK-5698] Add NestedFieldsProjectableTableSourc...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3269#discussion_r103918266
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/util/RexProgramProjectExtractor.scala ---
    @@ -84,6 +108,49 @@ object RexProgramProjectExtractor {
     }
     
     /**
    +  * A RexVisitor to extract used nested input fields
    +  */
    +class RefFieldAccessorVisitor(
    +    names: List[String],
    +    usedFields: Array[Int]) extends RexVisitorImpl[Unit](true) {
    +
    +  private val projectedFields = new util.ArrayList[Array[String]]
    +
    +  names.foreach { n =>
    +    projectedFields.add(Array.empty)
    +  }
    +
    +  private val order: Map[Int, Int] = names.indices.zip(usedFields).map(_.swap).toMap
    --- End diff --
    
    `names` and `usedFields` might not have the same length.
    The result of `zip` has the length of the smaller of both lists which is not intended here, right?
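The truncation is easy to reproduce: `zip` silently stops at the shorter collection. A minimal sketch with made-up inputs, placing the construction from the diff next to a variant that fails fast on a length mismatch; `ZipOrder` and `orderChecked` are hypothetical names, not code from the PR.

```scala
object ZipOrder {

  // Construction from the diff: maps a used field index to its position.
  def order(names: List[String], usedFields: Array[Int]): Map[Int, Int] =
    names.indices.zip(usedFields).map(_.swap).toMap

  // Variant that rejects mismatched lengths instead of silently truncating.
  def orderChecked(names: List[String], usedFields: Array[Int]): Map[Int, Int] = {
    require(names.length == usedFields.length,
      s"names (${names.length}) and usedFields (${usedFields.length}) differ in length")
    usedFields.zipWithIndex.toMap
  }
}
```

For example, `ZipOrder.order(List("a", "b"), Array(4, 7, 9))` yields `Map(4 -> 0, 7 -> 1)`: used field 9 is silently dropped, whereas `orderChecked` throws an `IllegalArgumentException`.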



[GitHub] flink pull request #3269: [FLINK-5698] Add NestedFieldsProjectableTableSourc...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3269#discussion_r101755176
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/util/RexProgramProjectExtractor.scala ---
    @@ -84,6 +105,39 @@ object RexProgramProjectExtractor {
     }
     
     /**
    +  * A RexVisitor to extract used nested input fields
    +  */
    +class RefFieldAccessorVisitor(
    +    usedFields: Array[Int],
    +    names: List[String])
    +  extends RexVisitorImpl[Unit](true) {
    +
    +  private val group = usedFields.toList
    +  private var nestedFields = mutable.LinkedHashSet[String]()
    +
    +  def getNestedFields: Array[String] = nestedFields.toArray
    +
    +  override def visitFieldAccess(fieldAccess: RexFieldAccess): Unit = {
    +    fieldAccess.getReferenceExpr match {
    +      case ref: RexInputRef =>
    +        nestedFields += s"${names(ref.getIndex)}.${fieldAccess.getField.getName}"
    --- End diff --
    
    I think it would be nice if we could go deeper than just one level.



[GitHub] flink pull request #3269: [FLINK-5698] Add NestedFieldsProjectableTableSourc...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3269#discussion_r103913118
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/NestedFieldsProjectableTableSource.scala ---
    @@ -0,0 +1,55 @@
    +package org.apache.flink.table.sources
    +
    +/**
    +  * Adds support for projection push-down to a [[TableSource]] with nested fields.
    +  * A [[TableSource]] extending this interface is able
    +  * to project the nested fields of the return table.
    --- End diff --
    
    return -> returned



[GitHub] flink pull request #3269: [FLINK-5698] Add NestedFieldsProjectableTableSourc...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3269#discussion_r101887737
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/NestedFieldsProjectableTableSource.scala ---
    @@ -0,0 +1,39 @@
    +package org.apache.flink.table.sources
    +
    +/**
    +  * Adds support for projection push-down to a [[TableSource]] with nested fields.
    +  * A [[TableSource]] extending this interface is able
    +  * to project the nested fields of the return table.
    +  *
    +  * @tparam T The return type of the [[NestedFieldsProjectableTableSource]].
    +  */
    +trait NestedFieldsProjectableTableSource[T] extends ProjectableTableSource[T] {
    +
    +  /**
    +    * Creates a copy of the [[NestedFieldsProjectableTableSource]]
    +    * that projects its output on the specified nested fields.
    +    *
    +    * @param fields The indexes of the fields to return.
    +    * @return A copy of the [[NestedFieldsProjectableTableSource]] that projects its output.
    +    */
    +  def projectNestedFields(fields: Array[String]): NestedFieldsProjectableTableSource[T]
    --- End diff --
    
    @fhueske, I'm fine with this, but I have some questions to make sure I understand correctly.
    
    Say we have a complex table schema as shown below:
    
    ```
    id,
    student<school<city, tuition>, age, name>,
    teacher<age, name>
    ```
    
    `id`, `student` and `teacher` are the first-level columns; `student` has the nested columns `school, age, name`, and `school` in turn has the nested columns `city, tuition`.
    
    If a user selects `id, student.school.city, student.age, teacher`, what should the actual arguments be?
    `fields = [0, 1, 2]` and `nestedFields = [[], ["school.city", "age"], ["age", "name"]]`?




[GitHub] flink pull request #3269: [FLINK-5698] Add NestedFieldsProjectableTableSourc...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3269#discussion_r103960833
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/util/RexProgramProjectExtractorTest.scala ---
    @@ -55,6 +56,34 @@ class RexProgramProjectExtractorTest {
       }
     
       @Test
    +  def testExtractRefNestedInputFields(): Unit = {
    +    val rexProgram = buildRexProgramWithNesting()
    +    val usedFields = extractRefInputFields(rexProgram)
    +    val usedNestedFields = extractRefNestedInputFields(rexProgram, usedFields)
    +    val expected = Array[Array[String]](Array("amount"), Array("*"))
    +    assertThat(usedNestedFields, is(expected))
    +  }
    +
    +  @Test
    +  def testExtractRefNestedInputFieldsWithNoNesting(): Unit = {
    +    val rexProgram = buildRexProgram()
    +    val usedFields = extractRefInputFields(rexProgram)
    +    val usedNestedFields = extractRefNestedInputFields(rexProgram, usedFields)
    +    val expected = Array[Array[String]](Array("*"), Array("*"), Array("*"))
    +    assertThat(usedNestedFields, is(expected))
    +  }
    +
    +  @Test
    +  def testExtractDeepRefNestedInputFields(): Unit = {
    +    val rexProgram = buildRexProgramWithDeepNesting()
    +    val usedFields = extractRefInputFields(rexProgram)
    +    val usedNestedFields = extractRefNestedInputFields(rexProgram, usedFields)
    +    val expected = Array[Array[String]](Array("amount"), Array("passport.status"))
    --- End diff --
    
    Another test would be to reference the nested attribute in a call, for example something like `payments.amount * 10`.
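Such a test matters because the nested access is an operand of the multiplication, so the extractor has to descend into call operands; Calcite's `RexVisitorImpl` does this only when constructed with `deep = true`, as the diff does. The sketch below assumes a hypothetical, simplified model (`Ref`, `Access`, `Lit`, `Call`) rather than Calcite's `RexNode` hierarchy.

```scala
// Hypothetical, simplified expression model (not Calcite's RexNode classes).
sealed trait Node
final case class Ref(index: Int) extends Node
final case class Access(ref: Node, field: String) extends Node
final case class Lit(value: Int) extends Node
final case class Call(op: String, operands: List[Node]) extends Node

object AccessCollector {

  /** Collects dotted paths of all maximal field accesses, descending into call operands. */
  def collect(node: Node, names: IndexedSeq[String]): Set[String] = node match {
    case a: Access         => Set(path(a, names)) // inner accesses are part of this path
    case Ref(i)            => Set(names(i))       // the whole field is used
    case Lit(_)            => Set.empty
    case Call(_, operands) => operands.flatMap(collect(_, names)).toSet
  }

  private def path(node: Node, names: IndexedSeq[String]): String = node match {
    case Ref(i)       => names(i)
    case Access(r, f) => s"${path(r, names)}.$f"
    case other        => throw new IllegalArgumentException(s"unsupported access base: $other")
  }
}
```

With `names = Vector("id", "payments")`, the expression `payments.amount * 10`, modelled as `Call("*", List(Access(Ref(1), "amount"), Lit(10)))`, yields `Set("payments.amount")`. A collector that skipped call operands would find nothing.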



[GitHub] flink issue #3269: [FLINK-5698] Add NestedFieldsProjectableTableSource trait

Posted by tonycox <gi...@git.apache.org>.
Github user tonycox commented on the issue:

    https://github.com/apache/flink/pull/3269
  
    Hi @fhueske, I've addressed all comments.


---

[GitHub] flink pull request #3269: [FLINK-5698] Add NestedFieldsProjectableTableSourc...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3269#discussion_r103958236
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/NestedFieldsProjectableTableSource.scala ---
    @@ -0,0 +1,55 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sources
    +
    +/**
    +  * Adds support for projection push-down to a [[TableSource]] with nested fields.
    +  * A [[TableSource]] extending this interface is able
    +  * to project the nested fields of the return table.
    +  *
    +  * @tparam T The return type of the [[NestedFieldsProjectableTableSource]].
    +  */
    +trait NestedFieldsProjectableTableSource[T] extends ProjectableTableSource[T] {
    +
    +  /**
    +    * Creates a copy of the [[NestedFieldsProjectableTableSource]]
    +    * that projects its output on the specified nested fields.
    +    *
    +    * @param fields The indexes of the fields to return.
    +    * @param nestedFields hold the nested fields and has identical size with fields array
    +    *
    +    * e.g.
    +    * tableSchema = {
    +    *       id,
    +    *       student<\school<\city, tuition>, age, name>,
    +    *       teacher<\age, name>
    +    *       }
    +    *
    +    * select (id, student.school.city, student.age, teacher)
    +    *
    +    * fields = field = [0, 1, 2]
    +    * nestedFields  \[\[], ["school.city", "age"], ["*"\]\]
    --- End diff --
    
    I think with the current implementation we would get `\[\["*"], ["school.city", "age"], ["*"\]\]`, i.e., `["*"]` instead of an empty array for the non-nested `id` field.
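The expected value can be checked with a small helper. This is a hypothetical helper, not part of the PR. It derives `fields` and `nestedFields` from a schema's top-level field names and a list of selected dotted column paths. It uses the same `"*"` convention for fields that are selected as a whole. It works on plain strings rather than on a `RexProgram`, and it ignores the Scaladoc escaping.

```scala
// Hypothetical helper, not part of the PR: derives the projection arguments
// from top-level field names and selected dotted paths.
object NestedProjection {

  def derive(
      schema: Seq[String],
      selected: Seq[String]): (Array[Int], Array[Array[String]]) = {
    // Split "student.school.city" into ("student", Some("school.city")); "id" into ("id", None).
    val refs: Seq[(String, Option[String])] = selected.map { path =>
      val parts = path.split("\\.", 2)
      (parts(0), if (parts.length == 2) Some(parts(1)) else None)
    }
    // Indexes of the top-level fields that are referenced at all, in schema order.
    val fields = schema.indices.filter(i => refs.exists(_._1 == schema(i))).toArray
    // Per field: "*" if it is selected as a whole, otherwise its distinct nested paths.
    val nestedFields = fields.map { i =>
      val subPaths = refs.filter(_._1 == schema(i)).map(_._2)
      if (subPaths.exists(_.isEmpty)) Array("*") else subPaths.flatten.distinct.toArray
    }
    (fields, nestedFields)
  }
}
```

Take the example schema `id, student, teacher` and the query `select (id, student.school.city, student.age, teacher)`. The helper yields `fields = [0, 1, 2]` and `nestedFields = [["*"], ["school.city", "age"], ["*"]]`. This matches the review comment rather than the empty first entry in the draft Scaladoc.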


---

[GitHub] flink issue #3269: [FLINK-5698] Add NestedFieldsProjectableTableSource trait

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3269
  
    Hi @tonycox, thanks for the update!
    
    I'll do some minor improvements and will merge the PR.
    
    Thank you,
    Fabian


---