Posted to issues@flink.apache.org by suez1224 <gi...@git.apache.org> on 2018/01/26 07:43:41 UTC
[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...
GitHub user suez1224 opened a pull request:
https://github.com/apache/flink/pull/5367
[FLINK-7923][Table API & SQL] Support field access of composite array element in SQL
Note: This is based on FLINK-7934; I will rebase once FLINK-7934 is resolved.
## What is the purpose of the change
Support field access of composite array elements in SQL.
## Brief change log
- Add handling for the Calcite DOT operator to support field access of composite array elements in SQL
- Add unit tests to verify that it works for tuple, row, POJO, and case class arrays
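To make the intended semantics concrete, here is a minimal, Flink-free sketch. It assumes a query such as `SELECT a, b[1]._1 FROM MyTable`, which the plan test later in this thread expresses as `DOT(ITEM(b, 1), '_1')`. The helpers `item` and `dot` are hypothetical illustrations, not Flink or Calcite APIs, and returning `None` for an out-of-range index or a null element is an assumption of the sketch.

```scala
// Hypothetical, Flink-free model of SQL such as `SELECT a, b[1]._1 FROM MyTable`,
// which the planner expresses as DOT(ITEM(b, 1), '_1').
object ArrayElementFieldAccess {

  // ITEM: SQL array indices are 1-based. Returning None for an out-of-range
  // index or a null element is an assumption of this sketch.
  def item[T](arr: Array[T], index: Int): Option[T] =
    if (arr != null && index >= 1 && index <= arr.length) Option(arr(index - 1))
    else None

  // DOT on a Scala tuple: "_N" names the N-th field (1-based), read via productElement.
  def dot(element: Option[Product], fieldName: String): Option[Any] =
    element.map(tuple => tuple.productElement(fieldName.substring(1).toInt - 1))

  def main(args: Array[String]): Unit = {
    // Same shape as the SqlITCase test data: (Int, Array[(Int, Int)]).
    val rows = List((1, Array((12, 45), (2, 5))), (2, Array.empty[(Int, Int)]))
    rows.foreach { case (a, b) => println(s"$a, ${dot(item(b, 1), "_1")}") }
  }
}
```

The two steps are kept separate on purpose: the array lookup and the field lookup compose, which mirrors the nested `DOT(ITEM(...))` call in the plan.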
## Verifying this change
This change added tests and can be verified as follows:
- *Added unit tests to verify the query plan*
- *Added integration tests for batch/streaming for POJO/case class/tuple/row types*
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (not applicable)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/suez1224/flink flink-7923
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/5367.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #5367
----
commit 9e915e4144703582843b0f31bffc1481648d0119
Author: Shuyi Chen <sh...@...>
Date: 2018-01-10T00:52:56Z
Upgrade to Calcite 1.15
commit 7a8328e4750ae95196f0b8ba20c6dff22c59ec08
Author: Shuyi Chen <sh...@...>
Date: 2018-01-25T23:36:36Z
Support access of subfields of Array element if the element is a composite type (e.g. case class, pojo, tuple or row).
----
---
[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...
Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5367#discussion_r165515039
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala ---
@@ -469,6 +473,148 @@ class SqlITCase extends StreamingWithStateTestBase {
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
+ @Test
+ def testArrayElementAtFromTableForTuple(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val data = List(
+ (1, Array((12, 45), (2, 5))),
--- End diff --
Added a null check.
Nested tuple input won't work for now due to https://issues.apache.org/jira/browse/CALCITE-2162. I've submitted a fix for it, which should be available in Calcite 1.16.
---
[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...
Posted by hequn8128 <gi...@git.apache.org>.
Github user hequn8128 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5367#discussion_r164266897
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala ---
@@ -984,6 +987,63 @@ object ScalarOperators {
}
}
+ def generateDot(codeGenerator: CodeGenerator,
+ dot: RexCall,
+ record: GeneratedExpression,
+ subField: GeneratedExpression)
+ : GeneratedExpression = {
+ val nullTerm = newName("isNull")
+ val resultTerm = newName("result")
+ val resultType = FlinkTypeFactory.toTypeInfo(dot.getType)
+ val resultTypeTerm = boxedTypeTermForTypeInfo(resultType)
+ dot.operands.get(0).getType match {
+ case crdt: CompositeRelDataType => {
+ val fieldName = dot.operands.get(1).asInstanceOf[RexLiteral]
+ .getValue.asInstanceOf[NlsString].getValue
+ if (crdt.compositeType.isInstanceOf[TupleTypeInfo[_]]) {
+ return GeneratedExpression(resultTerm, nullTerm,
+ s"""
+ |${record.code}
+ |${subField.code}
+ |${resultTypeTerm} $resultTerm =
+ | (${resultTypeTerm}) ${record.resultTerm}.productElement(
--- End diff --
An NPE will be thrown if `${record.resultTerm}` is null.
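One way to avoid the NPE is to guard the generated access with the record's null term, so that a null array element yields a null result instead of a dereference. The sketch below is hypothetical: `GenExpr` is a simplified stand-in for Flink's `GeneratedExpression` (only the fields used here), and `productElementAccess` is an illustrative helper, not the PR's final code.

```scala
// Hypothetical stand-in for Flink's GeneratedExpression; only the fields used here.
final case class GenExpr(resultTerm: String, nullTerm: String, code: String)

object NullSafeDot {

  // Emits Java code that calls productElement(fieldIndex) only if the record is
  // non-null; a null record yields a null result instead of an NPE.
  def productElementAccess(
      record: GenExpr,
      fieldIndex: Int,
      resultTypeTerm: String,
      resultTerm: String,
      nullTerm: String): GenExpr = {
    val code =
      s"""
         |${record.code}
         |$resultTypeTerm $resultTerm = null;
         |if (!${record.nullTerm}) {
         |  $resultTerm = ($resultTypeTerm) ${record.resultTerm}.productElement($fieldIndex);
         |}
         |boolean $nullTerm = $resultTerm == null;
         |""".stripMargin
    GenExpr(resultTerm, nullTerm, code)
  }

  def main(args: Array[String]): Unit = {
    val record = GenExpr("in1", "isNull$1", "// record code")
    println(productElementAccess(record, 0, "java.lang.Integer", "result$2", "isNull$2").code)
  }
}
```

The same guard applies unchanged to the case class, POJO, and Row branches; only the access expression inside the `if` differs.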
---
[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...
Posted by hequn8128 <gi...@git.apache.org>.
Github user hequn8128 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5367#discussion_r164266900
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala ---
@@ -984,6 +987,63 @@ object ScalarOperators {
}
}
+ def generateDot(codeGenerator: CodeGenerator,
+ dot: RexCall,
+ record: GeneratedExpression,
+ subField: GeneratedExpression)
+ : GeneratedExpression = {
+ val nullTerm = newName("isNull")
+ val resultTerm = newName("result")
+ val resultType = FlinkTypeFactory.toTypeInfo(dot.getType)
+ val resultTypeTerm = boxedTypeTermForTypeInfo(resultType)
+ dot.operands.get(0).getType match {
+ case crdt: CompositeRelDataType => {
+ val fieldName = dot.operands.get(1).asInstanceOf[RexLiteral]
+ .getValue.asInstanceOf[NlsString].getValue
+ if (crdt.compositeType.isInstanceOf[TupleTypeInfo[_]]) {
+ return GeneratedExpression(resultTerm, nullTerm,
+ s"""
+ |${record.code}
+ |${subField.code}
+ |${resultTypeTerm} $resultTerm =
+ | (${resultTypeTerm}) ${record.resultTerm}.productElement(
+ | ${fieldName.substring(1).toInt} - 1);
+ |boolean $nullTerm =${resultTerm} == null;
+ |""".stripMargin, resultType)
+ } else if (crdt.compositeType.isInstanceOf[CaseClassTypeInfo[_]]) {
+ return GeneratedExpression(resultTerm, nullTerm,
+ s"""
+ |${record.code}
+ |${resultTypeTerm} $resultTerm =
+ | (${resultTypeTerm}) ${record.resultTerm}.${fieldName}();
+ |boolean $nullTerm =${resultTerm} == null;
+ |""".stripMargin, resultType)
+ } else if (crdt.compositeType.isInstanceOf[PojoTypeInfo[_]]) {
+ return GeneratedExpression(resultTerm, nullTerm,
+ s"""
+ |${record.code}
+ |${resultTypeTerm} $resultTerm =
+ | (${resultTypeTerm}) ${record.resultTerm}.${fieldName};
--- End diff --
An NPE will be thrown here if `${record.resultTerm}` is null.
---
[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/5367#discussion_r165038256
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CalcTest.scala ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+class CalcTest extends TableTestBase {
+
+ @Test
+ def testArrayElement(): Unit = {
+ val util = streamTestUtil()
+ util.addTable[(Long, Array[(String, Int)])]("MyTable", 'a, 'b)
+
+ val expected = unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select",
+ "a",
+ "DOT(ITEM(b, 1), '_1') AS b11"
--- End diff --
Actually, we don't need a full table test for this, as it only tests a scalar operation. Can you move the test to `CompositeAccessTest` and test it for all APIs?
---
[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/5367
---
[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...
Posted by hequn8128 <gi...@git.apache.org>.
Github user hequn8128 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5367#discussion_r164266907
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala ---
@@ -469,6 +473,148 @@ class SqlITCase extends StreamingWithStateTestBase {
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
+ @Test
+ def testArrayElementAtFromTableForTuple(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val data = List(
+ (1, Array((12, 45), (2, 5))),
--- End diff --
1. Add a test for null tuple input.
2. Add a test for nested tuple input.
---
[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...
Posted by hequn8128 <gi...@git.apache.org>.
Github user hequn8128 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5367#discussion_r164266896
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala ---
@@ -984,6 +987,63 @@ object ScalarOperators {
}
}
+ def generateDot(codeGenerator: CodeGenerator,
--- End diff --
Arguments of the method should be indented.
---
[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...
Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5367#discussion_r165560235
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala ---
@@ -984,6 +987,63 @@ object ScalarOperators {
}
}
+ def generateDot(codeGenerator: CodeGenerator,
--- End diff --
But I agree that I should refactor the code in `visitFieldAccess()` so it can be reused as much as possible.
---
[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/5367#discussion_r165039357
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala ---
@@ -984,6 +987,63 @@ object ScalarOperators {
}
}
+ def generateDot(codeGenerator: CodeGenerator,
--- End diff --
I think this method is not really necessary. We already have logic for accessing composite types (see `visitFieldAccess()`). Maybe we just have to make the methods there a bit more generic.
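A minimal sketch of what "more generic" could mean: a single function that returns the Java access expression for each composite kind handled in `generateDot()`, so both code paths can share it. `CompositeKind` and its cases are hypothetical stand-ins, not Flink's `TypeInformation` classes, and the POJO case assumes public fields.

```scala
// Hypothetical model of the four composite kinds handled in generateDot();
// these are not Flink's TypeInformation classes.
sealed trait CompositeKind
case object TupleKind extends CompositeKind      // Scala tuple: fields _1, _2, ...
case object CaseClassKind extends CompositeKind  // accessor method per field
case object PojoKind extends CompositeKind       // assumes public fields
final case class RowKind(fieldNames: Seq[String]) extends CompositeKind // positional access

object CompositeAccess {

  // Single entry point returning the Java expression that reads `fieldName`
  // from `recordTerm`, so field-access and DOT handling could share it.
  def accessExpr(kind: CompositeKind, recordTerm: String, fieldName: String): String =
    kind match {
      case TupleKind      => s"$recordTerm.productElement(${fieldName.substring(1).toInt - 1})"
      case CaseClassKind  => s"$recordTerm.$fieldName()"
      case PojoKind       => s"$recordTerm.$fieldName"
      case RowKind(names) =>
        val index = names.indexOf(fieldName)
        require(index >= 0, s"unknown Row field: $fieldName")
        s"$recordTerm.getField($index)"
    }
}
```

Keeping the per-kind logic in one place means a fix such as the null guard only has to be written once.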
---
[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...
Posted by hequn8128 <gi...@git.apache.org>.
Github user hequn8128 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5367#discussion_r164266903
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala ---
@@ -984,6 +987,63 @@ object ScalarOperators {
}
}
+ def generateDot(codeGenerator: CodeGenerator,
+ dot: RexCall,
+ record: GeneratedExpression,
+ subField: GeneratedExpression)
+ : GeneratedExpression = {
+ val nullTerm = newName("isNull")
+ val resultTerm = newName("result")
+ val resultType = FlinkTypeFactory.toTypeInfo(dot.getType)
+ val resultTypeTerm = boxedTypeTermForTypeInfo(resultType)
+ dot.operands.get(0).getType match {
+ case crdt: CompositeRelDataType => {
+ val fieldName = dot.operands.get(1).asInstanceOf[RexLiteral]
+ .getValue.asInstanceOf[NlsString].getValue
+ if (crdt.compositeType.isInstanceOf[TupleTypeInfo[_]]) {
+ return GeneratedExpression(resultTerm, nullTerm,
+ s"""
+ |${record.code}
+ |${subField.code}
+ |${resultTypeTerm} $resultTerm =
+ | (${resultTypeTerm}) ${record.resultTerm}.productElement(
+ | ${fieldName.substring(1).toInt} - 1);
+ |boolean $nullTerm =${resultTerm} == null;
+ |""".stripMargin, resultType)
+ } else if (crdt.compositeType.isInstanceOf[CaseClassTypeInfo[_]]) {
+ return GeneratedExpression(resultTerm, nullTerm,
+ s"""
+ |${record.code}
+ |${resultTypeTerm} $resultTerm =
+ | (${resultTypeTerm}) ${record.resultTerm}.${fieldName}();
+ |boolean $nullTerm =${resultTerm} == null;
+ |""".stripMargin, resultType)
+ } else if (crdt.compositeType.isInstanceOf[PojoTypeInfo[_]]) {
+ return GeneratedExpression(resultTerm, nullTerm,
+ s"""
+ |${record.code}
+ |${resultTypeTerm} $resultTerm =
+ | (${resultTypeTerm}) ${record.resultTerm}.${fieldName};
+ |boolean $nullTerm =${resultTerm} == null;
+ |""".stripMargin, resultType)
+ } else if (crdt.compositeType.isInstanceOf[RowTypeInfo]) {
+ val fieldIndex = dot.operands.get(0).getType.asInstanceOf[CompositeRelDataType]
+ .compositeType.getFieldIndex(fieldName)
+ return GeneratedExpression(resultTerm, nullTerm,
+ s"""
+ |${record.code}
+ |${resultTypeTerm} $resultTerm =
+ | (${resultTypeTerm}) ${record.resultTerm}.getField(${fieldIndex});
--- End diff --
An NPE will be thrown here if `${record.resultTerm}` is null.
---
[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...
Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5367#discussion_r165527528
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala ---
@@ -984,6 +987,63 @@ object ScalarOperators {
}
}
+ def generateDot(codeGenerator: CodeGenerator,
--- End diff --
In Calcite, once an array element access is encountered, the subsequent field access is translated into a DOT `RexCall` rather than a `RexFieldAccess`. Therefore, we need custom handling for the DOT `RexCall`.
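The routing described here can be sketched with a toy expression tree. The types below are hypothetical simplifications: `FieldAccess` plays the role of Calcite's `RexFieldAccess` and `Call` plays the role of a `RexCall` such as ITEM or DOT. The point is only that both shapes can be dispatched to one shared access helper; the rendered strings are illustrative, not real generated code.

```scala
// Hypothetical, simplified expression tree: FieldAccess plays the role of Calcite's
// RexFieldAccess and Call plays the role of a RexCall (e.g. ITEM or DOT).
sealed trait Expr
final case class InputRef(name: String) extends Expr
final case class IntLit(value: Int) extends Expr
final case class StringLit(value: String) extends Expr
final case class FieldAccess(target: Expr, fieldName: String) extends Expr
final case class Call(op: String, operands: List[Expr]) extends Expr

object DotRouting {

  // Renders an illustrative Java-like string. Both field-access shapes go through
  // `access`, so the composite-field logic lives in one place.
  def render(e: Expr): String = e match {
    case InputRef(name)                             => name
    case FieldAccess(target, name)                  => access(render(target), name)
    case Call("DOT", List(target, StringLit(name))) => access(render(target), name)
    case Call("ITEM", List(array, IntLit(i)))       => s"${render(array)}[${i - 1}]" // SQL index is 1-based
    case other => throw new IllegalArgumentException(s"unsupported expression: $other")
  }

  private def access(target: String, fieldName: String): String = s"$target.$fieldName"
}
```

For `b[1]._1`, the DOT call arrives with an ITEM call as its target, while a plain nested field access arrives as `FieldAccess`; both end up in `access`.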
---
[GitHub] flink issue #5367: [FLINK-7923][Table API & SQL] Support field access of com...
Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on the issue:
https://github.com/apache/flink/pull/5367
@hequn8128 @twalthr, could you please take another look? I've refactored the code to reuse `generateFieldAccess()` and also added a null test. Nested tuples won't work for now due to https://issues.apache.org/jira/browse/CALCITE-2162; I've submitted a PR to fix it in Calcite.
---
[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...
Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5367#discussion_r165560261
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CalcTest.scala ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+class CalcTest extends TableTestBase {
+
+ @Test
+ def testArrayElement(): Unit = {
+ val util = streamTestUtil()
+ util.addTable[(Long, Array[(String, Int)])]("MyTable", 'a, 'b)
+
+ val expected = unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select",
+ "a",
+ "DOT(ITEM(b, 1), '_1') AS b11"
--- End diff --
Will fix it.
---
[GitHub] flink issue #5367: [FLINK-7923][Table API & SQL] Support field access of com...
Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on the issue:
https://github.com/apache/flink/pull/5367
@hequn8128, thanks for the review. Where do you think we should add it in sql.md?
---
[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...
Posted by hequn8128 <gi...@git.apache.org>.
Github user hequn8128 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5367#discussion_r164266898
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala ---
@@ -984,6 +987,63 @@ object ScalarOperators {
}
}
+ def generateDot(codeGenerator: CodeGenerator,
+ dot: RexCall,
+ record: GeneratedExpression,
+ subField: GeneratedExpression)
+ : GeneratedExpression = {
+ val nullTerm = newName("isNull")
+ val resultTerm = newName("result")
+ val resultType = FlinkTypeFactory.toTypeInfo(dot.getType)
+ val resultTypeTerm = boxedTypeTermForTypeInfo(resultType)
+ dot.operands.get(0).getType match {
+ case crdt: CompositeRelDataType => {
+ val fieldName = dot.operands.get(1).asInstanceOf[RexLiteral]
+ .getValue.asInstanceOf[NlsString].getValue
+ if (crdt.compositeType.isInstanceOf[TupleTypeInfo[_]]) {
+ return GeneratedExpression(resultTerm, nullTerm,
+ s"""
+ |${record.code}
+ |${subField.code}
+ |${resultTypeTerm} $resultTerm =
+ | (${resultTypeTerm}) ${record.resultTerm}.productElement(
+ | ${fieldName.substring(1).toInt} - 1);
+ |boolean $nullTerm =${resultTerm} == null;
+ |""".stripMargin, resultType)
+ } else if (crdt.compositeType.isInstanceOf[CaseClassTypeInfo[_]]) {
+ return GeneratedExpression(resultTerm, nullTerm,
+ s"""
+ |${record.code}
+ |${resultTypeTerm} $resultTerm =
+ | (${resultTypeTerm}) ${record.resultTerm}.${fieldName}();
--- End diff --
An NPE will be thrown here if `${record.resultTerm}` is null.
---
[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...
Posted by hequn8128 <gi...@git.apache.org>.
Github user hequn8128 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5367#discussion_r164266892
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CalcTest.scala ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+class CalcTest extends TableTestBase {
--- End diff --
Remove the redundant space between `CalcTest` and `extends`.
---
[GitHub] flink issue #5367: [FLINK-7923][Table API & SQL] Support field access of com...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:
https://github.com/apache/flink/pull/5367
Btw, I will remove the null tests because fields of tuples and case classes are not allowed to be null; the serializers would throw an exception.
---
[GitHub] flink issue #5367: [FLINK-7923][Table API & SQL] Support field access of com...
Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on the issue:
https://github.com/apache/flink/pull/5367
@twalthr, I've rebased. Could you please take another look? Thanks a lot.
---
[GitHub] flink issue #5367: [FLINK-7923][Table API & SQL] Support field access of com...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:
https://github.com/apache/flink/pull/5367
Thanks for the update @suez1224. The changes look good. I will merge this...
---