Posted to issues@flink.apache.org by suez1224 <gi...@git.apache.org> on 2018/01/26 07:43:41 UTC

[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

GitHub user suez1224 opened a pull request:

    https://github.com/apache/flink/pull/5367

    [FLINK-7923][Table API & SQL] Support field access of composite array element in SQL

    Note: This is based on FLINK-7934; I will rebase once FLINK-7934 is resolved.
    
    ## What is the purpose of the change
    
    Support field access of composite array elements in SQL.
    
    
    ## Brief change log
    
      - Add handling for the Calcite DOT operator to support field access of composite array elements in SQL
      - Add unit tests to verify that it works for tuple arrays, row arrays, POJO arrays, and case class arrays
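As a rough illustration of what the feature enables, the sketch below models in plain Scala (no Flink dependency) what a query along the lines of `SELECT a, b[1]._1 FROM MyTable` computes for the test data used in this PR: `ITEM` picks an array element and `DOT` then reads a field of that element. The helpers `item` and `dot` are hypothetical stand-ins, not Flink or Calcite APIs; the sketch assumes SQL's 1-based array indexing and Scala tuples whose fields are named `_1`, `_2`, and so on.

```scala
// Hypothetical stand-in for SQL ITEM(arr, i); SQL array indices start at 1.
def item[T](arr: Array[T], i: Int): T = arr(i - 1)

// Hypothetical stand-in for DOT on a Scala tuple: field "_n" is productElement(n - 1).
def dot(p: Product, field: String): Any =
  p.productElement(field.substring(1).toInt - 1)

// Same shape as the test data in SqlITCase: (Int, Array[(Int, Int)]).
val data = List((1, Array((12, 45), (2, 5))))

// Roughly what SELECT a, b[1]._1 FROM MyTable evaluates per row.
val result = data.map { case (a, b) => (a, dot(item(b, 1), "_1")) }
println(result) // List((1,12))
```

The point is only the evaluation order: the array element is resolved first, and the field access is applied to that element.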
    
    
    
    ## Verifying this change
    
    This change added tests and can be verified as follows:
    
      - *Added unit tests to verify the query plan*
      - *Added integration tests for batch/streaming for POJO/case class/tuple/row types*
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes)
      - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/suez1224/flink flink-7923

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5367.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5367
    
----
commit 9e915e4144703582843b0f31bffc1481648d0119
Author: Shuyi Chen <sh...@...>
Date:   2018-01-10T00:52:56Z

    Upgrade to Calcite 1.15

commit 7a8328e4750ae95196f0b8ba20c6dff22c59ec08
Author: Shuyi Chen <sh...@...>
Date:   2018-01-25T23:36:36Z

    Support access of subfields of Array element if the element is a composite type (e.g. case class, pojo, tuple or row).

----


---

[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5367#discussion_r165515039
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala ---
    @@ -469,6 +473,148 @@ class SqlITCase extends StreamingWithStateTestBase {
         assertEquals(expected.sorted, StreamITCase.testResults.sorted)
       }
     
    +  @Test
    +  def testArrayElementAtFromTableForTuple(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.clear
    +
    +    val data = List(
    +      (1, Array((12, 45), (2, 5))),
    --- End diff --
    
    Added a null check.
    Nested tuple input won't work for now due to https://issues.apache.org/jira/browse/CALCITE-2162. I've submitted a fix for it, which should be available in Calcite 1.16.


---

[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

Posted by hequn8128 <gi...@git.apache.org>.
Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5367#discussion_r164266897
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala ---
    @@ -984,6 +987,63 @@ object ScalarOperators {
         }
       }
     
    +  def generateDot(codeGenerator: CodeGenerator,
    +                  dot: RexCall,
    +                  record: GeneratedExpression,
    +                  subField: GeneratedExpression)
    +  : GeneratedExpression = {
    +    val nullTerm = newName("isNull")
    +    val resultTerm = newName("result")
    +    val resultType = FlinkTypeFactory.toTypeInfo(dot.getType)
    +    val resultTypeTerm = boxedTypeTermForTypeInfo(resultType)
    +    dot.operands.get(0).getType match {
    +      case crdt: CompositeRelDataType => {
    +        val fieldName = dot.operands.get(1).asInstanceOf[RexLiteral]
    +          .getValue.asInstanceOf[NlsString].getValue
    +        if (crdt.compositeType.isInstanceOf[TupleTypeInfo[_]]) {
    +           return GeneratedExpression(resultTerm, nullTerm,
    +            s"""
    +                   |${record.code}
    +                   |${subField.code}
    +                   |${resultTypeTerm} $resultTerm =
    +                   |  (${resultTypeTerm}) ${record.resultTerm}.productElement(
    --- End diff --
    
    An NPE will be thrown if `${record.resultTerm}` is null.
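One way to address this is to evaluate the access expression only when the record itself is non-null. The sketch below is a hypothetical helper (not Flink's actual code) that emits such guarded Java code. It assumes the record's `GeneratedExpression` exposes a boolean null flag via its `nullTerm`, as the `GeneratedExpression(resultTerm, nullTerm, ...)` calls in the diff suggest, and that the result type term is boxed (as with `boxedTypeTermForTypeInfo`) so it can hold `null`; the term names in the usage are made up.

```scala
// Hypothetical helper: emits Java code that reads a field only if the record is non-null.
// recordNullTerm is assumed to be the record expression's boolean null flag, and
// resultTypeTerm is assumed to be a boxed type so that "= null" compiles in Java.
def nullSafeAccess(
    recordCode: String,
    recordNullTerm: String,
    resultTypeTerm: String,
    resultTerm: String,
    nullTerm: String,
    accessExpr: String): String =
  s"""
     |$recordCode
     |$resultTypeTerm $resultTerm = null;
     |if (!$recordNullTerm) {
     |  $resultTerm = ($resultTypeTerm) $accessExpr;
     |}
     |boolean $nullTerm = $resultTerm == null;
     |""".stripMargin

// Example with made-up term names.
val code = nullSafeAccess(
  recordCode = "",
  recordNullTerm = "recIsNull",
  resultTypeTerm = "java.lang.Integer",
  resultTerm = "result",
  nullTerm = "isNull",
  accessExpr = "rec.f0")
println(code)
```

With this shape, a null record yields a null result instead of dereferencing `rec`.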


---

[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

Posted by hequn8128 <gi...@git.apache.org>.
Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5367#discussion_r164266900
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala ---
    @@ -984,6 +987,63 @@ object ScalarOperators {
         }
       }
     
    +  def generateDot(codeGenerator: CodeGenerator,
    +                  dot: RexCall,
    +                  record: GeneratedExpression,
    +                  subField: GeneratedExpression)
    +  : GeneratedExpression = {
    +    val nullTerm = newName("isNull")
    +    val resultTerm = newName("result")
    +    val resultType = FlinkTypeFactory.toTypeInfo(dot.getType)
    +    val resultTypeTerm = boxedTypeTermForTypeInfo(resultType)
    +    dot.operands.get(0).getType match {
    +      case crdt: CompositeRelDataType => {
    +        val fieldName = dot.operands.get(1).asInstanceOf[RexLiteral]
    +          .getValue.asInstanceOf[NlsString].getValue
    +        if (crdt.compositeType.isInstanceOf[TupleTypeInfo[_]]) {
    +           return GeneratedExpression(resultTerm, nullTerm,
    +            s"""
    +                   |${record.code}
    +                   |${subField.code}
    +                   |${resultTypeTerm} $resultTerm =
    +                   |  (${resultTypeTerm}) ${record.resultTerm}.productElement(
    +                   |    ${fieldName.substring(1).toInt} - 1);
    +                   |boolean $nullTerm =${resultTerm} == null;
    +                   |""".stripMargin, resultType)
    +        } else if (crdt.compositeType.isInstanceOf[CaseClassTypeInfo[_]]) {
    +          return GeneratedExpression(resultTerm, nullTerm,
    +            s"""
    +               |${record.code}
    +               |${resultTypeTerm} $resultTerm =
    +               |  (${resultTypeTerm}) ${record.resultTerm}.${fieldName}();
    +               |boolean $nullTerm =${resultTerm} == null;
    +               |""".stripMargin, resultType)
    +        } else if (crdt.compositeType.isInstanceOf[PojoTypeInfo[_]]) {
    +          return GeneratedExpression(resultTerm, nullTerm,
    +            s"""
    +               |${record.code}
    +               |${resultTypeTerm} $resultTerm =
    +               |  (${resultTypeTerm}) ${record.resultTerm}.${fieldName};
    --- End diff --
    
    An NPE will be thrown if `${record.resultTerm}` is null.


---

[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5367#discussion_r165038256
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CalcTest.scala ---
    @@ -0,0 +1,47 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.api.stream.sql
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.utils.TableTestBase
    +import org.apache.flink.table.utils.TableTestUtil._
    +import org.junit.Test
    +
    +class CalcTest  extends TableTestBase {
    +
    +  @Test
    +  def testArrayElement(): Unit = {
    +    val util = streamTestUtil()
    +    util.addTable[(Long, Array[(String, Int)])]("MyTable", 'a, 'b)
    +
    +    val expected = unaryNode(
    +      "DataStreamCalc",
    +      streamTableNode(0),
    +      term("select",
    +        "a",
    +        "DOT(ITEM(b, 1), '_1') AS b11"
    --- End diff --
    
    Actually, we don't need a full table test for this, as it only tests a scalar operation. Can you move the test to `CompositeAccessTest` and test it for all APIs?


---

[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5367


---

[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

Posted by hequn8128 <gi...@git.apache.org>.
Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5367#discussion_r164266907
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala ---
    @@ -469,6 +473,148 @@ class SqlITCase extends StreamingWithStateTestBase {
         assertEquals(expected.sorted, StreamITCase.testResults.sorted)
       }
     
    +  @Test
    +  def testArrayElementAtFromTableForTuple(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.clear
    +
    +    val data = List(
    +      (1, Array((12, 45), (2, 5))),
    --- End diff --
    
    1. Add a test for null tuple input
    2. Add a test for nested tuple input


---

[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

Posted by hequn8128 <gi...@git.apache.org>.
Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5367#discussion_r164266896
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala ---
    @@ -984,6 +987,63 @@ object ScalarOperators {
         }
       }
     
    +  def generateDot(codeGenerator: CodeGenerator,
    --- End diff --
    
    Arguments of the method should be indented. 


---

[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5367#discussion_r165560235
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala ---
    @@ -984,6 +987,63 @@ object ScalarOperators {
         }
       }
     
    +  def generateDot(codeGenerator: CodeGenerator,
    --- End diff --
    
    But I agree that I should refactor the code in visitFieldAccess() so it can be reused as much as possible.


---

[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5367#discussion_r165039357
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala ---
    @@ -984,6 +987,63 @@ object ScalarOperators {
         }
       }
     
    +  def generateDot(codeGenerator: CodeGenerator,
    --- End diff --
    
    I think this method is not really necessary. We already have logic for accessing composite types (see `visitFieldAccess()`). Maybe we just have to make the methods there a bit more generic.
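To illustrate the idea of a single, more generic helper, here is a hypothetical sketch that renders the Java access expression for a field based on the kind of composite type, so the per-type branches live in one place. The `CompositeKind` cases are simplified stand-ins, not Flink's `TypeInformation` classes, and the mapping (public `f0`/`f1` fields for Java tuples, accessor methods for Scala case classes, public fields for POJOs, positional `getField` for rows) is an assumption of the sketch; POJOs with private fields and getters are not handled.

```scala
// Hypothetical, simplified classification of composite types (not Flink's TypeInformation).
sealed trait CompositeKind
case object JavaTuple extends CompositeKind      // public fields f0, f1, ...
case object ScalaCaseClass extends CompositeKind // fields read via accessor methods
case object Pojo extends CompositeKind           // assumes public fields; getters not handled
final case class RowKind(fieldNames: Seq[String]) extends CompositeKind // positional access

// One shared helper renders the Java access expression for every kind, so callers
// (plain field access and DOT on an array element alike) need no per-type branches.
def accessExpr(kind: CompositeKind, recordTerm: String, field: String): String =
  kind match {
    case JavaTuple | Pojo => s"$recordTerm.$field"
    case ScalaCaseClass   => s"$recordTerm.$field()"
    case RowKind(names) =>
      val idx = names.indexOf(field)
      require(idx >= 0, s"unknown field: $field")
      s"$recordTerm.getField($idx)"
  }

println(accessExpr(ScalaCaseClass, "rec", "_1"))        // rec._1()
println(accessExpr(RowKind(Seq("a", "b")), "rec", "b")) // rec.getField(1)
```

Keeping the mapping in one function means a null guard (or any other fix) only has to be applied once rather than in every branch.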


---

[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

Posted by hequn8128 <gi...@git.apache.org>.
Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5367#discussion_r164266903
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala ---
    @@ -984,6 +987,63 @@ object ScalarOperators {
         }
       }
     
    +  def generateDot(codeGenerator: CodeGenerator,
    +                  dot: RexCall,
    +                  record: GeneratedExpression,
    +                  subField: GeneratedExpression)
    +  : GeneratedExpression = {
    +    val nullTerm = newName("isNull")
    +    val resultTerm = newName("result")
    +    val resultType = FlinkTypeFactory.toTypeInfo(dot.getType)
    +    val resultTypeTerm = boxedTypeTermForTypeInfo(resultType)
    +    dot.operands.get(0).getType match {
    +      case crdt: CompositeRelDataType => {
    +        val fieldName = dot.operands.get(1).asInstanceOf[RexLiteral]
    +          .getValue.asInstanceOf[NlsString].getValue
    +        if (crdt.compositeType.isInstanceOf[TupleTypeInfo[_]]) {
    +           return GeneratedExpression(resultTerm, nullTerm,
    +            s"""
    +                   |${record.code}
    +                   |${subField.code}
    +                   |${resultTypeTerm} $resultTerm =
    +                   |  (${resultTypeTerm}) ${record.resultTerm}.productElement(
    +                   |    ${fieldName.substring(1).toInt} - 1);
    +                   |boolean $nullTerm =${resultTerm} == null;
    +                   |""".stripMargin, resultType)
    +        } else if (crdt.compositeType.isInstanceOf[CaseClassTypeInfo[_]]) {
    +          return GeneratedExpression(resultTerm, nullTerm,
    +            s"""
    +               |${record.code}
    +               |${resultTypeTerm} $resultTerm =
    +               |  (${resultTypeTerm}) ${record.resultTerm}.${fieldName}();
    +               |boolean $nullTerm =${resultTerm} == null;
    +               |""".stripMargin, resultType)
    +        } else if (crdt.compositeType.isInstanceOf[PojoTypeInfo[_]]) {
    +          return GeneratedExpression(resultTerm, nullTerm,
    +            s"""
    +               |${record.code}
    +               |${resultTypeTerm} $resultTerm =
    +               |  (${resultTypeTerm}) ${record.resultTerm}.${fieldName};
    +               |boolean $nullTerm =${resultTerm} == null;
    +               |""".stripMargin, resultType)
    +        } else if (crdt.compositeType.isInstanceOf[RowTypeInfo]) {
    +          val fieldIndex = dot.operands.get(0).getType.asInstanceOf[CompositeRelDataType]
    +            .compositeType.getFieldIndex(fieldName)
    +          return GeneratedExpression(resultTerm, nullTerm,
    +            s"""
    +               |${record.code}
    +               |${resultTypeTerm} $resultTerm =
    +               |  (${resultTypeTerm}) ${record.resultTerm}.getField(${fieldIndex});
    --- End diff --
    
    An NPE will be thrown if `${record.resultTerm}` is null.


---

[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5367#discussion_r165527528
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala ---
    @@ -984,6 +987,63 @@ object ScalarOperators {
         }
       }
     
    +  def generateDot(codeGenerator: CodeGenerator,
    --- End diff --
    
    In Calcite, once it sees an array element access, the subsequent field access is translated into a DOT RexCall, not a RexFieldAccess. Therefore, we need custom handling for the DOT RexCall.
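The following toy model illustrates the point: the access `b[1]._1` shows up as `DOT(ITEM(b, 1), '_1')` (matching the plan string in `CalcTest`), while a plain field access on a column is a different node type, and both can be routed to one shared handler. `Expr`, `FieldAccess`, `Call`, and `render` are hypothetical, simplified stand-ins for Calcite's `RexNode`, `RexFieldAccess`, and `RexCall`, not their real APIs.

```scala
// Hypothetical, simplified stand-ins for Calcite's expression nodes (not Calcite classes).
sealed trait Expr
final case class InputRef(name: String) extends Expr
final case class Literal(value: Any) extends Expr
final case class FieldAccess(target: Expr, field: String) extends Expr // cf. RexFieldAccess
final case class Call(op: String, operands: List[Expr]) extends Expr   // cf. RexCall

// Shared handler for "field of a composite value", whichever node shape it came from.
def fieldOf(target: String, field: String): String = s"$target.$field"

def render(e: Expr): String = e match {
  case InputRef(n)                              => n
  case Literal(v: String)                       => s"'$v'"
  case Literal(v)                               => v.toString
  case FieldAccess(t, f)                        => fieldOf(render(t), f)
  case Call("DOT", List(t, Literal(f: String))) => fieldOf(render(t), f)
  case Call("ITEM", List(arr, idx))             => s"${render(arr)}[${render(idx)}]"
  case Call(op, args)                           => s"$op(${args.map(render).mkString(", ")})"
}

// DOT(ITEM(b, 1), '_1'): a field access that follows an array element access.
val dotAfterItem =
  Call("DOT", List(Call("ITEM", List(InputRef("b"), Literal(1))), Literal("_1")))
println(render(dotAfterItem))                    // b[1]._1
println(render(FieldAccess(InputRef("c"), "x"))) // c.x
```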


---

[GitHub] flink issue #5367: [FLINK-7923][Table API & SQL] Support field access of com...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on the issue:

    https://github.com/apache/flink/pull/5367
  
    @hequn8128 @twalthr could you please take another look? I've refactored the code to reuse the generateFieldAccess() code and also added a null test. Nested tuples won't work for now due to https://issues.apache.org/jira/browse/CALCITE-2162. I've submitted a PR to fix it in Calcite.


---

[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5367#discussion_r165560261
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CalcTest.scala ---
    @@ -0,0 +1,47 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.api.stream.sql
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.utils.TableTestBase
    +import org.apache.flink.table.utils.TableTestUtil._
    +import org.junit.Test
    +
    +class CalcTest  extends TableTestBase {
    +
    +  @Test
    +  def testArrayElement(): Unit = {
    +    val util = streamTestUtil()
    +    util.addTable[(Long, Array[(String, Int)])]("MyTable", 'a, 'b)
    +
    +    val expected = unaryNode(
    +      "DataStreamCalc",
    +      streamTableNode(0),
    +      term("select",
    +        "a",
    +        "DOT(ITEM(b, 1), '_1') AS b11"
    --- End diff --
    
    Will fix it.


---

[GitHub] flink issue #5367: [FLINK-7923][Table API & SQL] Support field access of com...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on the issue:

    https://github.com/apache/flink/pull/5367
  
    @hequn8128, thanks for the review. Where do you think we should add it in sql.md?


---

[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

Posted by hequn8128 <gi...@git.apache.org>.
Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5367#discussion_r164266898
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala ---
    @@ -984,6 +987,63 @@ object ScalarOperators {
         }
       }
     
    +  def generateDot(codeGenerator: CodeGenerator,
    +                  dot: RexCall,
    +                  record: GeneratedExpression,
    +                  subField: GeneratedExpression)
    +  : GeneratedExpression = {
    +    val nullTerm = newName("isNull")
    +    val resultTerm = newName("result")
    +    val resultType = FlinkTypeFactory.toTypeInfo(dot.getType)
    +    val resultTypeTerm = boxedTypeTermForTypeInfo(resultType)
    +    dot.operands.get(0).getType match {
    +      case crdt: CompositeRelDataType => {
    +        val fieldName = dot.operands.get(1).asInstanceOf[RexLiteral]
    +          .getValue.asInstanceOf[NlsString].getValue
    +        if (crdt.compositeType.isInstanceOf[TupleTypeInfo[_]]) {
    +           return GeneratedExpression(resultTerm, nullTerm,
    +            s"""
    +                   |${record.code}
    +                   |${subField.code}
    +                   |${resultTypeTerm} $resultTerm =
    +                   |  (${resultTypeTerm}) ${record.resultTerm}.productElement(
    +                   |    ${fieldName.substring(1).toInt} - 1);
    +                   |boolean $nullTerm =${resultTerm} == null;
    +                   |""".stripMargin, resultType)
    +        } else if (crdt.compositeType.isInstanceOf[CaseClassTypeInfo[_]]) {
    +          return GeneratedExpression(resultTerm, nullTerm,
    +            s"""
    +               |${record.code}
    +               |${resultTypeTerm} $resultTerm =
    +               |  (${resultTypeTerm}) ${record.resultTerm}.${fieldName}();
    --- End diff --
    
    An NPE will be thrown if `${record.resultTerm}` is null.


---

[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

Posted by hequn8128 <gi...@git.apache.org>.
Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5367#discussion_r164266892
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CalcTest.scala ---
    @@ -0,0 +1,47 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.api.stream.sql
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.utils.TableTestBase
    +import org.apache.flink.table.utils.TableTestUtil._
    +import org.junit.Test
    +
    +class CalcTest  extends TableTestBase {
    --- End diff --
    
    Remove the redundant space between `CalcTest` and `extends`.


---

[GitHub] flink issue #5367: [FLINK-7923][Table API & SQL] Support field access of com...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/5367
  
    Btw, I will remove the null tests because fields of tuples and case classes are not allowed to be null; the serializers would throw an exception.


---

[GitHub] flink issue #5367: [FLINK-7923][Table API & SQL] Support field access of com...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on the issue:

    https://github.com/apache/flink/pull/5367
  
    @twalthr rebased, could you please take another look? Thanks a lot.


---

[GitHub] flink issue #5367: [FLINK-7923][Table API & SQL] Support field access of com...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/5367
  
    Thanks for the update @suez1224. The changes look good. I will merge this...


---