You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/25 09:20:10 UTC
[flink-table-store] branch release-0.2 updated: [FLINK-28666] Alter spark table's primary key should throw exception
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/release-0.2 by this push:
new eb1bb9a2 [FLINK-28666] Alter spark table's primary key should throw exception
eb1bb9a2 is described below
commit eb1bb9a2f89d0b01a8fdb6809bb95bc521204ad6
Author: Jane Chan <55...@users.noreply.github.com>
AuthorDate: Mon Jul 25 17:19:32 2022 +0800
[FLINK-28666] Alter spark table's primary key should throw exception
This closes #234
---
.../org/apache/flink/table/store/spark/SparkCatalog.java | 11 ++++++++++-
.../apache/flink/table/store/spark/SparkReadITCase.java | 14 ++++++++++++++
2 files changed, 24 insertions(+), 1 deletion(-)
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
index 76e49a88..d4624b1c 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
@@ -241,9 +241,12 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
private SchemaChange toSchemaChange(TableChange change) {
if (change instanceof SetProperty) {
SetProperty set = (SetProperty) change;
+ validateAlterProperty(set.property());
return SchemaChange.setOption(set.property(), set.value());
} else if (change instanceof RemoveProperty) {
- return SchemaChange.removeOption(((RemoveProperty) change).property());
+ RemoveProperty remove = (RemoveProperty) change;
+ validateAlterProperty(remove.property());
+ return SchemaChange.removeOption(remove.property());
} else if (change instanceof AddColumn) {
AddColumn add = (AddColumn) change;
validateAlterNestedField(add.fieldNames());
@@ -300,6 +303,12 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
}
}
+ private void validateAlterProperty(String alterKey) {
+ if (PRIMARY_KEY_IDENTIFIER.equals(alterKey)) {
+ throw new UnsupportedOperationException("Alter primary key is not supported");
+ }
+ }
+
private boolean isValidateNamespace(String[] namespace) {
return namespace.length == 1;
}
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
index 7c24b427..67bfa8eb 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
@@ -239,6 +239,13 @@ public class SparkReadITCase {
options = schema1().options();
assertThat(options).doesNotContainKey("xyc");
+
+ assertThatThrownBy(
+ () ->
+ spark.sql(
+ "ALTER TABLE table_store.default.t1 SET TBLPROPERTIES('primary-key' = 'a')"))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessageContaining("Alter primary key is not supported");
}
@Test
@@ -366,6 +373,13 @@ public class SparkReadITCase {
.isInstanceOf(NoSuchNamespaceException.class)
.hasMessageContaining("Namespace 'foo' not found");
+ assertThatThrownBy(
+ () ->
+ spark.sql(
+ "ALTER TABLE default.MyTable UNSET TBLPROPERTIES('primary-key')"))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessageContaining("Alter primary key is not supported");
+
Path tablePath = new Path(warehousePath, "default.db/MyTable");
TableSchema schema = FileStoreTableFactory.create(tablePath).schema();
assertThat(schema.fields())