Posted to issues@flink.apache.org by "lincoln-lil (via GitHub)" <gi...@apache.org> on 2023/03/17 01:43:02 UTC

[GitHub] [flink] lincoln-lil opened a new pull request, #22197: [FLINK-31487][table-planner] Add targetColumns to DynamicTableSink#Context

lincoln-lil opened a new pull request, #22197:
URL: https://github.com/apache/flink/pull/22197

   ## What is the purpose of the change
   The internal SinkRuntimeProviderContext gains a new constructor with a targetColumns parameter, which connectors can use to recognize the user-specified column list.
   
   Note: nested columns in the column list of an insert/update statement are currently unsupported (as described in [FLINK-31301](https://issues.apache.org/jira/browse/FLINK-31301) & [FLINK-31344](https://issues.apache.org/jira/browse/FLINK-31344)), so this PR supports simple columns first; nested columns can be supported once those two issues are fixed.
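
A minimal sketch of how a connector might interpret the value described above, assuming it arrives as an `Optional<int[][]>` of 0-based index paths (as the Javadoc quoted later in this thread states). The class `TargetColumnsSketch`, the method `writtenColumns`, and the plain `String[]` of field names are hypothetical stand-ins and not part of Flink; only simple (non-nested) columns are handled, in line with the note above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class TargetColumnsSketch {

    /**
     * Returns the names of the top-level columns a statement writes to.
     * An empty Optional means the statement had no column list, so all columns are targets.
     */
    static List<String> writtenColumns(Optional<int[][]> targetColumns, String[] fieldNames) {
        if (!targetColumns.isPresent()) {
            // No column list in the statement: every column is written.
            return new ArrayList<>(Arrays.asList(fieldNames));
        }
        List<String> result = new ArrayList<>();
        for (int[] path : targetColumns.get()) {
            // Assumption: only simple columns, so each index path has exactly one element.
            if (path.length != 1) {
                throw new IllegalArgumentException("Nested columns are not handled in this sketch");
            }
            result.add(fieldNames[path[0]]);
        }
        return result;
    }

    public static void main(String[] args) {
        String[] fields = {"a", "b", "c"};
        // Corresponds to something like: insert into t(a, c) ...
        System.out.println(writtenColumns(Optional.of(new int[][] {{0}, {2}}), fields));
        // Corresponds to something like: insert into t select ...
        System.out.println(writtenColumns(Optional.<int[][]>empty(), fields));
    }
}
```

A sink that supports partial updates could use such a list to decide which fields to overwrite and which to leave untouched; how a real connector does this depends on the connector.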
   
   ## Brief change log
   * add new getTargetColumns to DynamicTableSink#Context
   * add targetColumns info to related relnodes and execnodes
   * add related tests
   
   ## Verifying this change
   * add partial-insert related cases, including UT & IT
   
   ## Does this pull request potentially affect one of the following parts:
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (java-docs)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] lincoln-lil commented on a diff in pull request #22197: [FLINK-31487][table-planner] Add targetColumns to DynamicTableSink#Context

Posted by "lincoln-lil (via GitHub)" <gi...@apache.org>.
lincoln-lil commented on code in PR #22197:
URL: https://github.com/apache/flink/pull/22197#discussion_r1141026773


##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/PartialInsertTest.scala:
##########
@@ -163,6 +192,21 @@ class PartialInsertTest(isBatch: Boolean) extends TableTestBase {
         "SELECT a,b,c,d,e,123 FROM MyTable"
     )
   }
+
+  @Test
+  def testPartialInsert(): Unit = {

Review Comment:
   Ok



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SinkModifyOperation.java:
##########
@@ -121,6 +135,11 @@ public QueryOperation getChild() {
         return child;
     }
 
+    /** return an empty array when no column list specified. */

Review Comment:
   Good catch! The comment should be updated here, since the unnecessary 'int[0][]' was replaced by null.



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala:
##########
@@ -401,6 +404,15 @@ object PreValidateReWriter {
     SqlUtil.newContextException(pos, e)
   }
 
+  private def validateUnsupportedCompositeColumn(id: SqlIdentifier): Unit = {
+    assert(id != null)
+    if (!id.isSimple) {
+      val pos = id.getParserPosition
+      // TODO add accurate msg s"column name must be a simple identifier, composite column name '${id.toString}' is not supported yet"

Review Comment:
   The line length seems OK when running the style check. I didn't find a suitable resource for the error message when doing the PoC, and will see if we can get rid of the TODO.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java:
##########
@@ -1558,8 +1572,16 @@ private Operation convertUpdate(SqlUpdate sqlUpdate) {
                         catalogManager.qualifyIdentifier(unresolvedTableIdentifier));
         // get query
         PlannerQueryOperation queryOperation = new PlannerQueryOperation(tableModify);
+        // TODO
+        List<String> updateColumnList =
+                sqlUpdate.getTargetColumnList().stream()
+                        .map(c -> ((SqlIdentifier) c).getSimple())
+                        .collect(Collectors.toList());
         return new SinkModifyOperation(
-                contextResolvedTable, queryOperation, SinkModifyOperation.ModifyType.UPDATE);
+                contextResolvedTable,
+                queryOperation,
+                null, // targetColumns

Review Comment:
   Yes, this should be updateColumnList; it seems some local changes got lost. I'll update it.



##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.xml:
##########


Review Comment:
   I think manually reordering the test XML file would add maintenance cost, since the auto-generation mechanism doesn't care about method order; once we add new tests in the future, simply regenerating the file is enough. WDYT?





[GitHub] [flink] luoyuxia commented on pull request #22197: [FLINK-31487][table-planner] Add targetColumns to DynamicTableSink#Context

Posted by "luoyuxia (via GitHub)" <gi...@apache.org>.
luoyuxia commented on PR #22197:
URL: https://github.com/apache/flink/pull/22197#issuecomment-1475509547

   @lincoln-lil Thanks for updating. LGTM once the conflicts are resolved.




[GitHub] [flink] lincoln-lil merged pull request #22197: [FLINK-31487][table-planner] Add targetColumns to DynamicTableSink#Context

Posted by "lincoln-lil (via GitHub)" <gi...@apache.org>.
lincoln-lil merged PR #22197:
URL: https://github.com/apache/flink/pull/22197




[GitHub] [flink] luoyuxia commented on a diff in pull request #22197: [FLINK-31487][table-planner] Add targetColumns to DynamicTableSink#Context

Posted by "luoyuxia (via GitHub)" <gi...@apache.org>.
luoyuxia commented on code in PR #22197:
URL: https://github.com/apache/flink/pull/22197#discussion_r1139990233


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java:
##########
@@ -167,6 +168,26 @@ interface Context {
          * @see LogicalType#supportsOutputConversion(Class)
          */
         DataStructureConverter createDataStructureConverter(DataType consumedDataType);
+
+        /**
+         * Returns an {@link Optional} array of column index paths related to user specified target
+         * column list or {@link Optional#empty()} when not specified. The array indices are 0-based
+         * and support composite columns within (possibly nested) structures.
+         *
+         * <p>This information comes from the column list of the DML clause, e.g., for a sink table
+         * t1 which schema is: {@code a STRING, b ROW < b1 INT, b2 STRING>, c BIGINT}
+         *
+         * <ul>
+         *   <li>insert: 'insert into t1(a, b.b2) ...', the column list will be 'a, b.b2', and will
+         *       return {@code [[0], [1, 1]]}. The statement 'insert into target select ...' without
+         *       specifying a column list will return {@link Optional#empty()}.
+         *   <li>update: 'update target set a=1, b.b1=2 where ...', the column list will be 'a,

Review Comment:
   nit
   ```suggestion
            *   <li>update: 'update t1 set a=1, b.b1=2 where ...', the column list will be 'a,
   ```



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java:
##########
@@ -167,6 +168,26 @@ interface Context {
          * @see LogicalType#supportsOutputConversion(Class)
          */
         DataStructureConverter createDataStructureConverter(DataType consumedDataType);
+
+        /**
+         * Returns an {@link Optional} array of column index paths related to user specified target
+         * column list or {@link Optional#empty()} when not specified. The array indices are 0-based
+         * and support composite columns within (possibly nested) structures.
+         *
+         * <p>This information comes from the column list of the DML clause, e.g., for a sink table
+         * t1 which schema is: {@code a STRING, b ROW < b1 INT, b2 STRING>, c BIGINT}
+         *
+         * <ul>
+         *   <li>insert: 'insert into t1(a, b.b2) ...', the column list will be 'a, b.b2', and will
+         *       return {@code [[0], [1, 1]]}. The statement 'insert into target select ...' without
+         *       specifying a column list will return {@link Optional#empty()}.
+         *   <li>update: 'update target set a=1, b.b1=2 where ...', the column list will be 'a,
+         *       b.b1', will return {@code [[0], [1, 0]]}.

Review Comment:
   ```suggestion
            *       b.b1', and will return {@code [[0], [1, 0]]}.
   ```
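
To make the index paths in the Javadoc above concrete, here is a small sketch that resolves dotted column names against the example schema `a STRING, b ROW<b1 INT, b2 STRING>, c BIGINT`. The `Field` class and the `resolve` method are hypothetical helpers written for this illustration; they are not Flink's schema types and not how the planner computes the paths.

```java
import java.util.Arrays;
import java.util.List;

public class IndexPathSketch {

    /** Hypothetical schema node: a field name plus child fields (empty for non-ROW types). */
    static final class Field {
        final String name;
        final List<Field> children;

        Field(String name, Field... children) {
            this.name = name;
            this.children = Arrays.asList(children);
        }
    }

    /** Resolves a dotted column name such as "b.b2" to its 0-based index path. */
    static int[] resolve(List<Field> schema, String column) {
        String[] parts = column.split("\\.");
        int[] path = new int[parts.length];
        List<Field> level = schema;
        for (int i = 0; i < parts.length; i++) {
            int index = -1;
            // Linear search for the name at the current nesting level.
            for (int j = 0; j < level.size(); j++) {
                if (level.get(j).name.equals(parts[i])) {
                    index = j;
                    break;
                }
            }
            if (index < 0) {
                throw new IllegalArgumentException("Unknown column: " + column);
            }
            path[i] = index;
            // Descend into the children of the matched field for the next name part.
            level = level.get(index).children;
        }
        return path;
    }

    public static void main(String[] args) {
        // a STRING, b ROW<b1 INT, b2 STRING>, c BIGINT
        List<Field> t1 = Arrays.asList(
                new Field("a"),
                new Field("b", new Field("b1"), new Field("b2")),
                new Field("c"));
        System.out.println(Arrays.toString(resolve(t1, "a")));    // [0]
        System.out.println(Arrays.toString(resolve(t1, "b.b2"))); // [1, 1]
        System.out.println(Arrays.toString(resolve(t1, "b.b1"))); // [1, 0]
    }
}
```

The column list `a, b.b2` therefore maps to `[[0], [1, 1]]` and `a, b.b1` to `[[0], [1, 0]]`, matching the values given in the Javadoc.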



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java:
##########
@@ -167,6 +168,26 @@ interface Context {
          * @see LogicalType#supportsOutputConversion(Class)
          */
         DataStructureConverter createDataStructureConverter(DataType consumedDataType);
+
+        /**
+         * Returns an {@link Optional} array of column index paths related to user specified target
+         * column list or {@link Optional#empty()} when not specified. The array indices are 0-based
+         * and support composite columns within (possibly nested) structures.
+         *
+         * <p>This information comes from the column list of the DML clause, e.g., for a sink table
+         * t1 which schema is: {@code a STRING, b ROW < b1 INT, b2 STRING>, c BIGINT}
+         *
+         * <ul>
+         *   <li>insert: 'insert into t1(a, b.b2) ...', the column list will be 'a, b.b2', and will
+         *       return {@code [[0], [1, 1]]}. The statement 'insert into target select ...' without

Review Comment:
   ```suggestion
            *       return {@code [[0], [1, 1]]}. The statement 'insert into t1 select ...' without
   ```
   ?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SinkModifyOperation.java:
##########
@@ -121,6 +135,11 @@ public QueryOperation getChild() {
         return child;
     }
 
+    /** return an empty array when no column list specified. */

Review Comment:
   From the comment, it seems like it should be
   ```
   if (targetColumns == null) {
       return new int[0][0];
   } else {
       return targetColumns;
   }
   ```
   ?
   



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java:
##########
@@ -1558,8 +1572,16 @@ private Operation convertUpdate(SqlUpdate sqlUpdate) {
                         catalogManager.qualifyIdentifier(unresolvedTableIdentifier));
         // get query
         PlannerQueryOperation queryOperation = new PlannerQueryOperation(tableModify);
+        // TODO

Review Comment:
   It would be better to also reference the Jira issue [FLINK-31344](https://issues.apache.org/jira/browse/FLINK-31344) here for tracking.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java:
##########
@@ -1942,6 +1959,15 @@ public Optional<Integer> getParallelism() {
             }
         }
 
+        private static int[] getIndexArray(List<String> targetColumns, String[] allColumns) {

Review Comment:
   Seems this method is not used?
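
The body of `getIndexArray` is not shown in this thread. Purely as an illustration of what a helper with that signature could plausibly do, the sketch below maps each simple target column name to its position among all columns. The class name `IndexArraySketch` and the behaviour on unknown names are assumptions, not the code from the PR.

```java
import java.util.Arrays;
import java.util.List;

public class IndexArraySketch {

    /** Maps each target column name to its 0-based position in allColumns. */
    static int[] getIndexArray(List<String> targetColumns, String[] allColumns) {
        List<String> all = Arrays.asList(allColumns);
        int[] indices = new int[targetColumns.size()];
        for (int i = 0; i < indices.length; i++) {
            int index = all.indexOf(targetColumns.get(i));
            if (index < 0) {
                // Assumption: an unknown column name is treated as an error.
                throw new IllegalArgumentException(
                        "Unknown column: " + targetColumns.get(i));
            }
            indices[i] = index;
        }
        return indices;
    }

    public static void main(String[] args) {
        String[] allColumns = {"a", "b", "c", "d"};
        // The order of the result follows the order of the target column list.
        System.out.println(
                Arrays.toString(getIndexArray(Arrays.asList("c", "a"), allColumns)));
    }
}
```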



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/PartialInsertTest.scala:
##########
@@ -163,6 +192,21 @@ class PartialInsertTest(isBatch: Boolean) extends TableTestBase {
         "SELECT a,b,c,d,e,123 FROM MyTable"
     )
   }
+
+  @Test
+  def testPartialInsert(): Unit = {

Review Comment:
   `testPartialInsertWithGroupby`?



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala:
##########
@@ -401,6 +404,15 @@ object PreValidateReWriter {
     SqlUtil.newContextException(pos, e)
   }
 
+  private def validateUnsupportedCompositeColumn(id: SqlIdentifier): Unit = {
+    assert(id != null)
+    if (!id.isSimple) {
+      val pos = id.getParserPosition
+      // TODO add accurate msg s"column name must be a simple identifier, composite column name '${id.toString}' is not supported yet"

Review Comment:
   I got an error from my IDEA: `File line length exceeds 100 characters.`



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala:
##########
@@ -401,6 +404,15 @@ object PreValidateReWriter {
     SqlUtil.newContextException(pos, e)
   }
 
+  private def validateUnsupportedCompositeColumn(id: SqlIdentifier): Unit = {
+    assert(id != null)
+    if (!id.isSimple) {
+      val pos = id.getParserPosition
+      // TODO add accurate msg s"column name must be a simple identifier, composite column name '${id.toString}' is not supported yet"

Review Comment:
   It seems we missed this TODO?



##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.xml:
##########


Review Comment:
   In the previous implementation, I intended to make the order of the methods in this file match the order in `RowLevelUpdateTest`. Now I find it's out of order. Do you think it matters?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java:
##########
@@ -1558,8 +1572,16 @@ private Operation convertUpdate(SqlUpdate sqlUpdate) {
                         catalogManager.qualifyIdentifier(unresolvedTableIdentifier));
         // get query
         PlannerQueryOperation queryOperation = new PlannerQueryOperation(tableModify);
+        // TODO
+        List<String> updateColumnList =
+                sqlUpdate.getTargetColumnList().stream()
+                        .map(c -> ((SqlIdentifier) c).getSimple())
+                        .collect(Collectors.toList());
         return new SinkModifyOperation(
-                contextResolvedTable, queryOperation, SinkModifyOperation.ModifyType.UPDATE);
+                contextResolvedTable,
+                queryOperation,
+                null, // targetColumns

Review Comment:
   pass `updateColumnList`?



##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSinkTest.xml:
##########
@@ -16,6 +16,15 @@ See the License for the specific language governing permissions and
 limitations under the License.
 -->
 <Root>
+  <TestCase name="testManagedTableSinkWithEnableCheckpointing">

Review Comment:
   Why add this? It seems the previous code missed this entry for the test `org.apache.flink.table.planner.plan.batch.sql#testManagedTableSinkWithEnableCheckpointing`.
    But I'm really curious why the previous code could pass the tests.





[GitHub] [flink] luoyuxia commented on a diff in pull request #22197: [FLINK-31487][table-planner] Add targetColumns to DynamicTableSink#Context

Posted by "luoyuxia (via GitHub)" <gi...@apache.org>.
luoyuxia commented on code in PR #22197:
URL: https://github.com/apache/flink/pull/22197#discussion_r1141551040


##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.xml:
##########


Review Comment:
   Makes sense to me.





[GitHub] [flink] lincoln-lil commented on pull request #22197: [FLINK-31487][table-planner] Add targetColumns to DynamicTableSink#Context

Posted by "lincoln-lil (via GitHub)" <gi...@apache.org>.
lincoln-lil commented on PR #22197:
URL: https://github.com/apache/flink/pull/22197#issuecomment-1475811433

   @luoyuxia I've rebased onto the latest master and updated the PR, PTAL when you have time.




[GitHub] [flink] flinkbot commented on pull request #22197: [FLINK-31487][table-planner] Add targetColumns to DynamicTableSink#Context

Posted by "flinkbot (via GitHub)" <gi...@apache.org>.
flinkbot commented on PR #22197:
URL: https://github.com/apache/flink/pull/22197#issuecomment-1472992082

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "cf28dc6603d3041ec88c7e8f137324201916032f",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "cf28dc6603d3041ec88c7e8f137324201916032f",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * cf28dc6603d3041ec88c7e8f137324201916032f UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>

