You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/25 09:20:10 UTC

[flink-table-store] branch release-0.2 updated: [FLINK-28666] Alter spark table's primary key should throw exception

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/release-0.2 by this push:
     new eb1bb9a2 [FLINK-28666] Alter spark table's primary key should throw exception
eb1bb9a2 is described below

commit eb1bb9a2f89d0b01a8fdb6809bb95bc521204ad6
Author: Jane Chan <55...@users.noreply.github.com>
AuthorDate: Mon Jul 25 17:19:32 2022 +0800

    [FLINK-28666] Alter spark table's primary key should throw exception
    
    This closes #234
---
 .../org/apache/flink/table/store/spark/SparkCatalog.java   | 11 ++++++++++-
 .../apache/flink/table/store/spark/SparkReadITCase.java    | 14 ++++++++++++++
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
index 76e49a88..d4624b1c 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
@@ -241,9 +241,12 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
     private SchemaChange toSchemaChange(TableChange change) {
         if (change instanceof SetProperty) {
             SetProperty set = (SetProperty) change;
+            validateAlterProperty(set.property());
             return SchemaChange.setOption(set.property(), set.value());
         } else if (change instanceof RemoveProperty) {
-            return SchemaChange.removeOption(((RemoveProperty) change).property());
+            RemoveProperty remove = (RemoveProperty) change;
+            validateAlterProperty(remove.property());
+            return SchemaChange.removeOption(remove.property());
         } else if (change instanceof AddColumn) {
             AddColumn add = (AddColumn) change;
             validateAlterNestedField(add.fieldNames());
@@ -300,6 +303,12 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
         }
     }
 
+    private void validateAlterProperty(String alterKey) {
+        if (PRIMARY_KEY_IDENTIFIER.equals(alterKey)) {
+            throw new UnsupportedOperationException("Alter primary key is not supported");
+        }
+    }
+
     private boolean isValidateNamespace(String[] namespace) {
         return namespace.length == 1;
     }
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
index 7c24b427..67bfa8eb 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
@@ -239,6 +239,13 @@ public class SparkReadITCase {
 
         options = schema1().options();
         assertThat(options).doesNotContainKey("xyc");
+
+        assertThatThrownBy(
+                        () ->
+                                spark.sql(
+                                        "ALTER TABLE table_store.default.t1 SET TBLPROPERTIES('primary-key' = 'a')"))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining("Alter primary key is not supported");
     }
 
     @Test
@@ -366,6 +373,13 @@ public class SparkReadITCase {
                 .isInstanceOf(NoSuchNamespaceException.class)
                 .hasMessageContaining("Namespace 'foo' not found");
 
+        assertThatThrownBy(
+                        () ->
+                                spark.sql(
+                                        "ALTER TABLE default.MyTable UNSET TBLPROPERTIES('primary-key')"))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining("Alter primary key is not supported");
+
         Path tablePath = new Path(warehousePath, "default.db/MyTable");
         TableSchema schema = FileStoreTableFactory.create(tablePath).schema();
         assertThat(schema.fields())