Posted to issues@flink.apache.org by "luoyuxia (via GitHub)" <gi...@apache.org> on 2023/03/17 12:02:47 UTC

[GitHub] [flink] luoyuxia commented on a diff in pull request #22197: [FLINK-31487][table-planner] Add targetColumns to DynamicTableSink#Context

luoyuxia commented on code in PR #22197:
URL: https://github.com/apache/flink/pull/22197#discussion_r1139990233


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java:
##########
@@ -167,6 +168,26 @@ interface Context {
          * @see LogicalType#supportsOutputConversion(Class)
          */
         DataStructureConverter createDataStructureConverter(DataType consumedDataType);
+
+        /**
+         * Returns an {@link Optional} array of column index paths related to user specified target
+         * column list or {@link Optional#empty()} when not specified. The array indices are 0-based
+         * and support composite columns within (possibly nested) structures.
+         *
+         * <p>This information comes from the column list of the DML clause, e.g., for a sink table
+         * t1 which schema is: {@code a STRING, b ROW < b1 INT, b2 STRING>, c BIGINT}
+         *
+         * <ul>
+         *   <li>insert: 'insert into t1(a, b.b2) ...', the column list will be 'a, b.b2', and will
+         *       return {@code [[0], [1, 1]]}. The statement 'insert into target select ...' without
+         *       specifying a column list will return {@link Optional#empty()}.
+         *   <li>update: 'update target set a=1, b.b1=2 where ...', the column list will be 'a,

Review Comment:
   nit
   ```suggestion
            *   <li>update: 'update t1 set a=1, b.b1=2 where ...', the column list will be 'a,
   ```
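To make the index-path convention in the Javadoc above concrete, here is a minimal sketch that resolves dotted target-column names against a nested schema. The schema model (`SchemaField`) and the resolver (`TargetColumnPaths.resolve`) are hypothetical stand-ins for Flink's `RowType`/`DataType`, not planner code. The sketch assumes case-sensitive names that are unique at each nesting level.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified schema model: a named field with optional nested fields. */
final class SchemaField {
    final String name;
    final List<SchemaField> children; // empty for atomic types such as STRING or INT

    SchemaField(String name, SchemaField... children) {
        this.name = name;
        this.children = Arrays.asList(children);
    }
}

/** Resolves dotted target-column names (e.g. "b.b2") to 0-based index paths. */
final class TargetColumnPaths {

    static int[][] resolve(List<SchemaField> schema, List<String> targetColumns) {
        int[][] paths = new int[targetColumns.size()][];
        for (int i = 0; i < targetColumns.size(); i++) {
            paths[i] = resolveOne(schema, targetColumns.get(i));
        }
        return paths;
    }

    private static int[] resolveOne(List<SchemaField> schema, String dottedName) {
        String[] parts = dottedName.split("\\.");
        int[] path = new int[parts.length];
        List<SchemaField> level = schema;
        for (int depth = 0; depth < parts.length; depth++) {
            int index = indexOf(level, parts[depth]);
            if (index < 0) {
                throw new IllegalArgumentException("Unknown column: " + dottedName);
            }
            path[depth] = index;
            level = level.get(index).children; // descend into the nested ROW
        }
        return path;
    }

    private static int indexOf(List<SchemaField> fields, String name) {
        for (int i = 0; i < fields.size(); i++) {
            if (fields.get(i).name.equals(name)) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // t1: a STRING, b ROW<b1 INT, b2 STRING>, c BIGINT
        List<SchemaField> t1 = Arrays.asList(
                new SchemaField("a"),
                new SchemaField("b", new SchemaField("b1"), new SchemaField("b2")),
                new SchemaField("c"));
        // insert into t1(a, b.b2) ...
        System.out.println(Arrays.deepToString(resolve(t1, Arrays.asList("a", "b.b2"))));
        // update t1 set a=1, b.b1=2 where ...
        System.out.println(Arrays.deepToString(resolve(t1, Arrays.asList("a", "b.b1"))));
    }
}
```

Running `main` prints `[[0], [1, 1]]` and `[[0], [1, 0]]`, matching the two examples in the Javadoc.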



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java:
##########
@@ -167,6 +168,26 @@ interface Context {
          * @see LogicalType#supportsOutputConversion(Class)
          */
         DataStructureConverter createDataStructureConverter(DataType consumedDataType);
+
+        /**
+         * Returns an {@link Optional} array of column index paths related to user specified target
+         * column list or {@link Optional#empty()} when not specified. The array indices are 0-based
+         * and support composite columns within (possibly nested) structures.
+         *
+         * <p>This information comes from the column list of the DML clause, e.g., for a sink table
+         * t1 which schema is: {@code a STRING, b ROW < b1 INT, b2 STRING>, c BIGINT}
+         *
+         * <ul>
+         *   <li>insert: 'insert into t1(a, b.b2) ...', the column list will be 'a, b.b2', and will
+         *       return {@code [[0], [1, 1]]}. The statement 'insert into target select ...' without
+         *       specifying a column list will return {@link Optional#empty()}.
+         *   <li>update: 'update target set a=1, b.b1=2 where ...', the column list will be 'a,
+         *       b.b1', will return {@code [[0], [1, 0]]}.

Review Comment:
   ```suggestion
            *       b.b1', and will return {@code [[0], [1, 0]]}.
   ```



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java:
##########
@@ -167,6 +168,26 @@ interface Context {
          * @see LogicalType#supportsOutputConversion(Class)
          */
         DataStructureConverter createDataStructureConverter(DataType consumedDataType);
+
+        /**
+         * Returns an {@link Optional} array of column index paths related to user specified target
+         * column list or {@link Optional#empty()} when not specified. The array indices are 0-based
+         * and support composite columns within (possibly nested) structures.
+         *
+         * <p>This information comes from the column list of the DML clause, e.g., for a sink table
+         * t1 which schema is: {@code a STRING, b ROW < b1 INT, b2 STRING>, c BIGINT}
+         *
+         * <ul>
+         *   <li>insert: 'insert into t1(a, b.b2) ...', the column list will be 'a, b.b2', and will
+         *       return {@code [[0], [1, 1]]}. The statement 'insert into target select ...' without

Review Comment:
   ```suggestion
            *       return {@code [[0], [1, 1]]}. The statement 'insert into t1 select ...' without
   ```
   ?
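A sink that consumes this information also has to handle `Optional.empty()`, which means the statement had no column list and every column is written. The following is a hedged sketch of that handling on the sink side. `WrittenColumns` and `topLevelColumns` are hypothetical names, and the method only collects the distinct top-level columns that the index paths start from. It assumes every path is non-empty.

```java
import java.util.Arrays;
import java.util.Optional;
import java.util.TreeSet;

final class WrittenColumns {

    /**
     * Returns the sorted, distinct top-level column indices touched by the statement.
     * An empty Optional means no column list was given, so all columns are written.
     */
    static int[] topLevelColumns(Optional<int[][]> targetColumns, int fieldCount) {
        if (!targetColumns.isPresent()) {
            int[] all = new int[fieldCount];
            for (int i = 0; i < fieldCount; i++) {
                all[i] = i;
            }
            return all;
        }
        TreeSet<Integer> topLevel = new TreeSet<>();
        for (int[] path : targetColumns.get()) {
            topLevel.add(path[0]); // the first element of a path is the top-level column
        }
        return topLevel.stream().mapToInt(Integer::intValue).toArray();
    }

    public static void main(String[] args) {
        // insert into t1(a, b.b2) ...  ->  [[0], [1, 1]]
        System.out.println(Arrays.toString(
                topLevelColumns(Optional.of(new int[][] {{0}, {1, 1}}), 3))); // [0, 1]
        // insert into t1 select ...  ->  Optional.empty()
        System.out.println(Arrays.toString(
                topLevelColumns(Optional.empty(), 3))); // [0, 1, 2]
    }
}
```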



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SinkModifyOperation.java:
##########
@@ -121,6 +135,11 @@ public QueryOperation getChild() {
         return child;
     }
 
+    /** return an empty array when no column list specified. */

Review Comment:
   From the comment, it seems it should be:
   ```
   if (targetColumns == null) {
       return new int[0][0];
   } else {
       return targetColumns;
   }
   ```
   ?
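One way to keep the Javadoc and the behavior consistent is to normalize `null` once, in the getter, and to convert to the `Optional` form used by `DynamicTableSink.Context` in a single place. The class below is a hypothetical stand-in for `SinkModifyOperation`. It is sketched under the assumption that an empty array means "no column list specified".

```java
import java.util.Optional;

/** Hypothetical stand-in for the operation class that stores the target columns. */
final class ModifyOperationSketch {
    // A zero-length array has no elements to mutate, so sharing one instance is safe.
    private static final int[][] NO_TARGET_COLUMNS = new int[0][];

    private final int[][] targetColumns; // may be null when no column list was given

    ModifyOperationSketch(int[][] targetColumns) {
        this.targetColumns = targetColumns;
    }

    /** Returns an empty array when no column list is specified, never null. */
    int[][] getTargetColumns() {
        return targetColumns == null ? NO_TARGET_COLUMNS : targetColumns;
    }

    /** Converts the array form into the Optional form exposed to the sink context. */
    Optional<int[][]> toContextTargetColumns() {
        int[][] columns = getTargetColumns();
        return columns.length == 0 ? Optional.empty() : Optional.of(columns);
    }

    public static void main(String[] args) {
        ModifyOperationSketch noColumnList = new ModifyOperationSketch(null);
        ModifyOperationSketch withColumns = new ModifyOperationSketch(new int[][] {{0}, {1, 1}});
        System.out.println(noColumnList.getTargetColumns().length); // 0
        System.out.println(noColumnList.toContextTargetColumns().isPresent()); // false
        System.out.println(withColumns.toContextTargetColumns().isPresent()); // true
    }
}
```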
   



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java:
##########
@@ -1558,8 +1572,16 @@ private Operation convertUpdate(SqlUpdate sqlUpdate) {
                         catalogManager.qualifyIdentifier(unresolvedTableIdentifier));
         // get query
         PlannerQueryOperation queryOperation = new PlannerQueryOperation(tableModify);
+        // TODO

Review Comment:
   It would be better to also reference the JIRA issue [FLINK-31344](https://issues.apache.org/jira/browse/FLINK-31344) in this TODO so that it can be tracked.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java:
##########
@@ -1942,6 +1959,15 @@ public Optional<Integer> getParallelism() {
             }
         }
 
+        private static int[] getIndexArray(List<String> targetColumns, String[] allColumns) {

Review Comment:
   It seems this method is not used?
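For reference, a helper with this signature would presumably map each target column name to its position in the full column list. The body below is only a guess at the intent, since the diff does not show it. It assumes flat, case-sensitive column names and rejects unknown names. `IndexArrays` is a hypothetical wrapper class.

```java
import java.util.Arrays;
import java.util.List;

final class IndexArrays {

    /** Maps each target column name to its 0-based position in {@code allColumns}. */
    static int[] getIndexArray(List<String> targetColumns, String[] allColumns) {
        List<String> all = Arrays.asList(allColumns);
        int[] indices = new int[targetColumns.size()];
        for (int i = 0; i < indices.length; i++) {
            int index = all.indexOf(targetColumns.get(i));
            if (index < 0) {
                throw new IllegalArgumentException(
                        "Unknown target column: " + targetColumns.get(i));
            }
            indices[i] = index;
        }
        return indices;
    }

    public static void main(String[] args) {
        String[] allColumns = {"a", "b", "c"};
        int[] indices = getIndexArray(Arrays.asList("c", "a"), allColumns);
        System.out.println(Arrays.toString(indices)); // [2, 0]
    }
}
```

If the method really is unused, deleting it is simpler than keeping such a helper around.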



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/PartialInsertTest.scala:
##########
@@ -163,6 +192,21 @@ class PartialInsertTest(isBatch: Boolean) extends TableTestBase {
         "SELECT a,b,c,d,e,123 FROM MyTable"
     )
   }
+
+  @Test
+  def testPartialInsert(): Unit = {

Review Comment:
   `testPartialInsertWithGroupby`?



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala:
##########
@@ -401,6 +404,15 @@ object PreValidateReWriter {
     SqlUtil.newContextException(pos, e)
   }
 
+  private def validateUnsupportedCompositeColumn(id: SqlIdentifier): Unit = {
+    assert(id != null)
+    if (!id.isSimple) {
+      val pos = id.getParserPosition
+      // TODO add accurate msg s"column name must be a simple identifier, composite column name '${id.toString}' is not supported yet"

Review Comment:
   My IDEA reports an error: `File line length exceeds 100 characters.`



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala:
##########
@@ -401,6 +404,15 @@ object PreValidateReWriter {
     SqlUtil.newContextException(pos, e)
   }
 
+  private def validateUnsupportedCompositeColumn(id: SqlIdentifier): Unit = {
+    assert(id != null)
+    if (!id.isSimple) {
+      val pos = id.getParserPosition
+      // TODO add accurate msg s"column name must be a simple identifier, composite column name '${id.toString}' is not supported yet"

Review Comment:
   It seems we missed this TODO?
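If the TODO is addressed, splitting the message across concatenated string literals would also keep each source line under the 100-character limit. The sketch is in Java rather than the file's Scala. It models the identifier as its list of name parts, where a simple identifier has exactly one part, as a stand-in for Calcite's `SqlIdentifier#isSimple`. It also uses `UnsupportedOperationException` in place of the `SqlUtil.newContextException(pos, e)` call in the diff. `CompositeColumnCheck` and `validateSimpleColumn` are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

final class CompositeColumnCheck {

    /** Rejects composite names such as "b.b1"; identifierParts models the name's segments. */
    static void validateSimpleColumn(List<String> identifierParts) {
        if (identifierParts.size() != 1) {
            String name = String.join(".", identifierParts);
            // Concatenated literals keep each source line under 100 characters.
            throw new UnsupportedOperationException(
                    "column name must be a simple identifier, composite column name '"
                            + name
                            + "' is not supported yet");
        }
    }

    public static void main(String[] args) {
        validateSimpleColumn(Arrays.asList("a")); // simple identifier: passes
        try {
            validateSimpleColumn(Arrays.asList("b", "b1"));
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```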



##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.xml:
##########


Review Comment:
   In the previous implementation, I intended to keep the order of the test cases in this file consistent with the order of the methods in `RowLevelUpdateTest`. Now I found that it's out of order. Do you think it matters?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java:
##########
@@ -1558,8 +1572,16 @@ private Operation convertUpdate(SqlUpdate sqlUpdate) {
                         catalogManager.qualifyIdentifier(unresolvedTableIdentifier));
         // get query
         PlannerQueryOperation queryOperation = new PlannerQueryOperation(tableModify);
+        // TODO
+        List<String> updateColumnList =
+                sqlUpdate.getTargetColumnList().stream()
+                        .map(c -> ((SqlIdentifier) c).getSimple())
+                        .collect(Collectors.toList());
         return new SinkModifyOperation(
-                contextResolvedTable, queryOperation, SinkModifyOperation.ModifyType.UPDATE);
+                contextResolvedTable,
+                queryOperation,
+                null, // targetColumns

Review Comment:
   pass `updateColumnList`?
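Passing the column list would presumably require converting the names into the `int[][]` index-path form first, since `targetColumns` is an `int[][]`. This sketch assumes one single-element path per column name, because the conversion above only handles simple identifiers. It uses a hypothetical `toTargetColumns` helper and gives the schema as a plain list of top-level column names.

```java
import java.util.Arrays;
import java.util.List;

final class UpdateTargetColumns {

    /** Converts simple UPDATE column names into single-element index paths. */
    static int[][] toTargetColumns(List<String> updateColumnList, List<String> schemaColumns) {
        int[][] targetColumns = new int[updateColumnList.size()][];
        for (int i = 0; i < targetColumns.length; i++) {
            String column = updateColumnList.get(i);
            int index = schemaColumns.indexOf(column);
            if (index < 0) {
                throw new IllegalArgumentException("Unknown column in SET clause: " + column);
            }
            targetColumns[i] = new int[] {index};
        }
        return targetColumns;
    }

    public static void main(String[] args) {
        List<String> schema = Arrays.asList("a", "b", "c");
        // update t1 set c = 1, a = 2 where ...
        System.out.println(Arrays.deepToString(
                toTargetColumns(Arrays.asList("c", "a"), schema))); // [[2], [0]]
    }
}
```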



##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSinkTest.xml:
##########
@@ -16,6 +16,15 @@ See the License for the specific language governing permissions and
 limitations under the License.
 -->
 <Root>
+  <TestCase name="testManagedTableSinkWithEnableCheckpointing">

Review Comment:
   Why add this? It seems the previous code was missing this for the test `org.apache.flink.table.planner.plan.batch.sql#testManagedTableSinkWithEnableCheckpointing`.
    But I'm really curious why the previous code could pass the test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
