You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/05/11 08:33:48 UTC
[incubator-paimon] branch master updated: [flink] Support nullability change when altering table on flink 1.17 (#1112)
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ba14cef8d [flink] Support nullability change when altering table on flink 1.17 (#1112)
ba14cef8d is described below
commit ba14cef8df1282a51cc039adb3d52eeacf239dc3
Author: GuojunLi <li...@bytedance.com>
AuthorDate: Thu May 11 16:33:43 2023 +0800
[flink] Support nullability change when altering table on flink 1.17 (#1112)
---
docs/content/how-to/altering-tables.md | 15 +++++-
.../java/org/apache/paimon/flink/FlinkCatalog.java | 62 +++++++++++++++-------
.../apache/paimon/flink/SchemaChangeITCase.java | 49 +++++++++++++++--
3 files changed, 103 insertions(+), 23 deletions(-)
diff --git a/docs/content/how-to/altering-tables.md b/docs/content/how-to/altering-tables.md
index 2e4f969d2..07b6bae41 100644
--- a/docs/content/how-to/altering-tables.md
+++ b/docs/content/how-to/altering-tables.md
@@ -288,7 +288,7 @@ ALTER TABLE my_table DROP COLUMN c1;
## Changing Column Nullability
-The following SQL sets column `coupon_info` to be nullable.
+The following SQL changes the nullability of column `coupon_info`.
{{< tabs "change-nullability-example" >}}
@@ -296,7 +296,14 @@ The following SQL sets column `coupon_info` to be nullable.
```sql
CREATE TABLE my_table (id INT PRIMARY KEY NOT ENFORCED, coupon_info FLOAT NOT NULL);
+
+-- Change column `coupon_info` from NOT NULL to nullable
ALTER TABLE my_table MODIFY coupon_info FLOAT;
+
+-- Change column `coupon_info` from nullable to NOT NULL
+-- If the column already contains NULL values, set the execution configuration option below to silently drop those records before altering the table.
+SET 'table.exec.sink.not-null-enforcer' = 'DROP';
+ALTER TABLE my_table MODIFY coupon_info FLOAT NOT NULL;
```
{{< /tab >}}
@@ -312,6 +319,12 @@ ALTER TABLE my_table ALTER COLUMN coupon_info DROP NOT NULL;
{{< /tabs >}}
+{{< hint info >}}
+
+Changing a nullable column to NOT NULL is currently supported only by Flink.
+
+{{< /hint >}}
+
## Changing Column Comment
The following SQL changes comment of column `buy_count` to `buy count`.
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 9a3f7cd72..1b5e2b276 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -68,6 +68,7 @@ import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import java.util.ArrayList;
@@ -250,37 +251,58 @@ public class FlinkCatalog extends AbstractCatalog {
}
}
- private SchemaChange toSchemaChange(TableChange change) {
+ private List<SchemaChange> toSchemaChange(TableChange change) {
+ List<SchemaChange> schemaChanges = new ArrayList<>();
if (change instanceof AddColumn) {
AddColumn add = (AddColumn) change;
String comment = add.getColumn().getComment().orElse(null);
SchemaChange.Move move = getMove(add.getPosition(), add.getColumn().getName());
- return SchemaChange.addColumn(
- add.getColumn().getName(),
- LogicalTypeConversion.toDataType(
- add.getColumn().getDataType().getLogicalType()),
- comment,
- move);
+ schemaChanges.add(
+ SchemaChange.addColumn(
+ add.getColumn().getName(),
+ LogicalTypeConversion.toDataType(
+ add.getColumn().getDataType().getLogicalType()),
+ comment,
+ move));
+ return schemaChanges;
} else if (change instanceof DropColumn) {
DropColumn drop = (DropColumn) change;
- return SchemaChange.dropColumn(drop.getColumnName());
+ schemaChanges.add(SchemaChange.dropColumn(drop.getColumnName()));
+ return schemaChanges;
} else if (change instanceof ModifyColumnName) {
ModifyColumnName modify = (ModifyColumnName) change;
- return SchemaChange.renameColumn(modify.getOldColumnName(), modify.getNewColumnName());
+ schemaChanges.add(
+ SchemaChange.renameColumn(
+ modify.getOldColumnName(), modify.getNewColumnName()));
+ return schemaChanges;
} else if (change instanceof ModifyPhysicalColumnType) {
ModifyPhysicalColumnType modify = (ModifyPhysicalColumnType) change;
- return SchemaChange.updateColumnType(
- modify.getOldColumn().getName(),
- LogicalTypeConversion.toDataType(modify.getNewType().getLogicalType()));
+ LogicalType newColumnType = modify.getNewType().getLogicalType();
+ LogicalType oldColumnType = modify.getOldColumn().getDataType().getLogicalType();
+ if (newColumnType.isNullable() != oldColumnType.isNullable()) {
+ schemaChanges.add(
+ SchemaChange.updateColumnNullability(
+ new String[] {modify.getNewColumn().getName()},
+ newColumnType.isNullable()));
+ }
+ schemaChanges.add(
+ SchemaChange.updateColumnType(
+ modify.getOldColumn().getName(),
+ LogicalTypeConversion.toDataType(newColumnType)));
+ return schemaChanges;
} else if (change instanceof ModifyColumnPosition) {
ModifyColumnPosition modify = (ModifyColumnPosition) change;
SchemaChange.Move move =
getMove(modify.getNewPosition(), modify.getNewColumn().getName());
- return SchemaChange.updateColumnPosition(move);
+ schemaChanges.add(SchemaChange.updateColumnPosition(move));
+ return schemaChanges;
} else if (change instanceof TableChange.ModifyColumnComment) {
ModifyColumnComment modify = (ModifyColumnComment) change;
- return SchemaChange.updateColumnComment(
- new String[] {modify.getNewColumn().getName()}, modify.getNewComment());
+ schemaChanges.add(
+ SchemaChange.updateColumnComment(
+ new String[] {modify.getNewColumn().getName()},
+ modify.getNewComment()));
+ return schemaChanges;
} else if (change instanceof SetOption) {
SetOption setOption = (SetOption) change;
String key = setOption.getKey();
@@ -288,10 +310,12 @@ public class FlinkCatalog extends AbstractCatalog {
SchemaManager.checkAlterTablePath(key);
- return SchemaChange.setOption(key, value);
+ schemaChanges.add(SchemaChange.setOption(key, value));
+ return schemaChanges;
} else if (change instanceof ResetOption) {
ResetOption resetOption = (ResetOption) change;
- return SchemaChange.removeOption(resetOption.getKey());
+ schemaChanges.add(SchemaChange.removeOption(resetOption.getKey()));
+ return schemaChanges;
} else {
throw new UnsupportedOperationException(
"Change is not supported: " + change.getClass());
@@ -362,7 +386,9 @@ public class FlinkCatalog extends AbstractCatalog {
List<SchemaChange> changes = new ArrayList<>();
if (null != tableChanges) {
List<SchemaChange> schemaChanges =
- tableChanges.stream().map(this::toSchemaChange).collect(Collectors.toList());
+ tableChanges.stream()
+ .flatMap(tableChange -> toSchemaChange(tableChange).stream())
+ .collect(Collectors.toList());
changes.addAll(schemaChanges);
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
index cf772c7a8..207b07cfc 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
@@ -20,6 +20,8 @@ package org.apache.paimon.flink;
import org.apache.paimon.testutils.assertj.AssertionUtils;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.types.Row;
import org.junit.jupiter.api.Test;
@@ -283,7 +285,16 @@ public class SchemaChangeITCase extends CatalogITCaseBase {
+ " `c` VARCHAR(2147483647),\n"
+ " `d` INT,\n"
+ " `e` FLOAT NOT NULL,");
+ assertThatThrownBy(
+ () ->
+ sql(
+ "INSERT INTO T VALUES('aaa', 'bbb', 'ccc', 1, CAST(NULL AS FLOAT))"))
+ .satisfies(
+ AssertionUtils.anyCauseMatches(
+ TableException.class,
+ "Column 'e' is NOT NULL, however, a null value is being written into it."));
+ // Not null -> nullable
sql("ALTER TABLE T MODIFY e FLOAT");
result = sql("SHOW CREATE TABLE T");
assertThat(result.toString())
@@ -295,10 +306,40 @@ public class SchemaChangeITCase extends CatalogITCaseBase {
+ " `d` INT,\n"
+ " `e` FLOAT");
- assertThatThrownBy(() -> sql("ALTER TABLE T MODIFY c STRING NOT NULL"))
- .hasMessageContaining(
- "Could not execute ALTER TABLE PAIMON.default.T\n"
- + " MODIFY `c` STRING NOT NULL");
+ // Nullable -> not null
+ sql("ALTER TABLE T MODIFY c STRING NOT NULL");
+ result = sql("SHOW CREATE TABLE T");
+ assertThat(result.toString())
+ .contains(
+ "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+ + " `a` VARCHAR(2147483647) NOT NULL,\n"
+ + " `b` VARCHAR(2147483647),\n"
+ + " `c` VARCHAR(2147483647) NOT NULL,\n"
+ + " `d` INT,\n"
+ + " `e` FLOAT");
+ assertThatThrownBy(
+ () ->
+ sql(
+ "INSERT INTO T VALUES('aaa', 'bbb', CAST(NULL AS STRING), 1, CAST(NULL AS FLOAT))"))
+ .satisfies(
+ AssertionUtils.anyCauseMatches(
+ TableException.class,
+ "Column 'c' is NOT NULL, however, a null value is being written into it."));
+
+ // Insert a null value
+ sql("INSERT INTO T VALUES('aaa', 'bbb', 'ccc', 1, CAST(NULL AS FLOAT))");
+ result = sql("select * from T");
+ assertThat(result.toString()).isEqualTo("[+I[aaa, bbb, ccc, 1, null]]");
+
+ // Then nullable -> not null
+ tEnv.getConfig()
+ .set(
+ ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER,
+ ExecutionConfigOptions.NotNullEnforcer.DROP);
+ sql("ALTER TABLE T MODIFY e FLOAT NOT NULL;\n");
+ sql("INSERT INTO T VALUES('aa2', 'bb2', 'cc2', 2, 2.5)");
+ result = sql("select * from T");
+ assertThat(result.toString()).isEqualTo("[+I[aa2, bb2, cc2, 2, 2.5]]");
}
@Test