You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/22 04:53:46 UTC

[flink-table-store] branch master updated: [FLINK-28638] Restrict ALTER TABLE from setting write-mode

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new af6cee84 [FLINK-28638] Restrict ALTER TABLE from setting write-mode
af6cee84 is described below

commit af6cee84076476b73d677d5a15f74852565e94c4
Author: Jane Chan <55...@users.noreply.github.com>
AuthorDate: Fri Jul 22 12:53:41 2022 +0800

    [FLINK-28638] Restrict ALTER TABLE from setting write-mode
    
    This closes #231
---
 .../table/store/connector/SchemaChangeITCase.java    | 20 ++++++++++++++++++--
 .../flink/table/store/file/schema/SchemaManager.java |  6 ++++--
 2 files changed, 22 insertions(+), 4 deletions(-)

diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/SchemaChangeITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/SchemaChangeITCase.java
index 61e28bd9..dcb38b34 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/SchemaChangeITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/SchemaChangeITCase.java
@@ -53,12 +53,28 @@ public class SchemaChangeITCase extends CatalogITCaseBase {
         assertThatThrownBy(() -> sql("ALTER TABLE T1 SET ('bucket-key' = 'c')"))
                 .getRootCause()
                 .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessage("Change bucket key is not supported yet.");
+                .hasMessage("Change bucket-key is not supported yet.");
 
         sql("CREATE TABLE T2 (a STRING, b STRING, c STRING) WITH ('bucket-key' = 'c')");
         assertThatThrownBy(() -> sql("ALTER TABLE T2 RESET ('bucket-key')"))
                 .getRootCause()
                 .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessage("Change bucket key is not supported yet.");
+                .hasMessage("Change bucket-key is not supported yet.");
+    }
+
+    @Test
+    public void testSetAndResetWriteMode() throws Exception {
+        sql("CREATE TABLE T1 (a STRING, b STRING, c STRING)");
+
+        assertThatThrownBy(() -> sql("ALTER TABLE T1 SET ('write-mode' = 'append-only')"))
+                .getRootCause()
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage("Change write-mode is not supported yet.");
+
+        sql("CREATE TABLE T2 (a STRING, b STRING, c STRING) WITH ('write-mode' = 'append-only')");
+        assertThatThrownBy(() -> sql("ALTER TABLE T2 RESET ('write-mode')"))
+                .getRootCause()
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage("Change write-mode is not supported yet.");
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
index 88d173c1..7dcec7f9 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
@@ -50,6 +50,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.table.store.CoreOptions.BUCKET_KEY;
+import static org.apache.flink.table.store.CoreOptions.WRITE_MODE;
 import static org.apache.flink.table.store.file.utils.FileUtils.listVersionedFiles;
 
 /** Schema Manager to manage schema versions. */
@@ -313,8 +314,9 @@ public class SchemaManager implements Serializable {
     }
 
     private void checkAlterTableOption(String key) {
-        if (BUCKET_KEY.key().equals(key)) {
-            throw new UnsupportedOperationException("Change bucket key is not supported yet.");
+        if (BUCKET_KEY.key().equals(key) || WRITE_MODE.key().equals(key)) {
+            throw new UnsupportedOperationException(
+                    String.format("Change %s is not supported yet.", key));
         }
     }
 }