You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/12/27 13:29:08 UTC

[GitHub] [flink] fsk119 commented on a diff in pull request #21504: [FLINK-22316][table] Support MODIFY column/constraint/watermark for ALTER TABLE statement

fsk119 commented on code in PR #21504:
URL: https://github.com/apache/flink/pull/21504#discussion_r1057674344


##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala:
##########
@@ -607,6 +607,195 @@ class TableEnvironmentTest {
     checkData(expectedResult.iterator(), tableResult.collect())
   }
 
+  @Test
+  def testAlterTableModifyColumn(): Unit = {
+    val statement =

Review Comment:
   nit: These negative cases is much same in the `table.q`. I think we can keep one...



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java:
##########
@@ -417,6 +422,92 @@ void checkColumnExists(String columnName) {
         }
     }
 
+    private static class ModifySchemaConverter extends SchemaConverter {
+
+        ModifySchemaConverter(
+                Schema originalSchema,
+                FlinkTypeFactory typeFactory,
+                SqlValidator sqlValidator,
+                Consumer<SqlTableConstraint> constraintValidator,
+                Function<SqlNode, String> escapeExpressions,
+                SchemaResolver schemaResolver) {
+            super(
+                    originalSchema,
+                    typeFactory,
+                    sqlValidator,
+                    constraintValidator,
+                    escapeExpressions,
+                    schemaResolver);
+        }
+
+        @Override
+        void checkColumnExists(String columnName) {
+            if (!sortedColumnNames.contains(columnName)) {
+                throw new ValidationException(
+                        String.format(
+                                "%sTry to modify a column `%s` which does not exist in the table.",
+                                EX_MSG_PREFIX, columnName));
+            }
+        }
+
+        @Override
+        void checkPrimaryKeyExists() {
+            if (primaryKey == null) {
+                throw new ValidationException(
+                        String.format(
+                                "%sThe base table does not define any primary key constraint. You might "
+                                        + "want to add a new one.",
+                                EX_MSG_PREFIX));
+            }
+        }
+
+        @Override
+        void checkWatermarkExists() {
+            if (watermarkSpec == null) {
+                throw new ValidationException(
+                        String.format(
+                                "%sThe base table does not define any watermark. You might "
+                                        + "want to add a new one.",
+                                EX_MSG_PREFIX));
+            }
+        }
+
+        @Override
+        Optional<Integer> getColumnPosition(SqlTableColumnPosition columnPosition) {
+            if (columnPosition.isFirstColumn() || columnPosition.isAfterReferencedColumn()) {
+                sortedColumnNames.remove(columnPosition.getColumn().getName().getSimple());
+                return super.getColumnPosition(columnPosition);
+            }
+            return Optional.empty();
+        }
+
+        @Override
+        Schema.UnresolvedPhysicalColumn convertPhysicalColumn(
+                SqlTableColumn.SqlRegularColumn physicalColumn) {
+            Schema.UnresolvedPhysicalColumn newColumn = super.convertPhysicalColumn(physicalColumn);
+            String columnName = newColumn.getName();
+            // preserves the primary key's nullability
+            if (primaryKey != null && primaryKey.getColumnNames().contains(columnName)) {
+                newColumn =
+                        new Schema.UnresolvedPhysicalColumn(
+                                columnName,
+                                newColumn.getDataType().notNull(),
+                                newColumn.getComment().orElse(null));
+            }

Review Comment:
   Why not mark the column is not null when building the final schema with the final pk?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org