Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/11/04 09:46:26 UTC

[GitHub] [flink-table-store] zjureel commented on a diff in pull request #347: [FLINK-27847] Support rename/drop column in SchemaManager

zjureel commented on code in PR #347:
URL: https://github.com/apache/flink-table-store/pull/347#discussion_r1013810662


##########
flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java:
##########
@@ -281,6 +281,176 @@ public void testAddColumn() throws Exception {
         assertThat(results.toString()).isEqualTo("[[8]]");
     }
 
+    @Test
+    public void testRenameColumn() throws Exception {
+        Path tablePath = new Path(warehousePath, "default.db/testRenameColumn");
+        createTestHelper(tablePath);
+
+        List<Row> beforeRename =
+                spark.sql("SHOW CREATE TABLE tablestore.default.testRenameColumn").collectAsList();
+        assertThat(beforeRename.toString())
+                .isEqualTo(
+                        "[[CREATE TABLE testRenameColumn (\n"
+                                + "  `a` INT NOT NULL,\n"
+                                + "  `b` BIGINT,\n"
+                                + "  `c` STRING)\n"
+                                + "]]");
+
+        spark.sql("ALTER TABLE tablestore.default.testRenameColumn RENAME COLUMN a to aa");
+
+        List<Row> afterRename =
+                spark.sql("SHOW CREATE TABLE tablestore.default.testRenameColumn").collectAsList();
+        assertThat(afterRename.toString())
+                .isEqualTo(
+                        "[[CREATE TABLE testRenameColumn (\n"
+                                + "  `aa` INT NOT NULL,\n"
+                                + "  `b` BIGINT,\n"
+                                + "  `c` STRING)\n"
+                                + "]]");
+    }
+
+    @Test
+    public void testRenamePartitionKey() {
+        spark.sql("USE tablestore");
+        spark.sql(
+                "CREATE TABLE default.testRenamePartitionKey (\n"
+                        + "a BIGINT,\n"
+                        + "b STRING) USING tablestore\n"
+                        + "COMMENT 'table comment'\n"
+                        + "PARTITIONED BY (a)\n"
+                        + "TBLPROPERTIES ('foo' = 'bar')");
+
+        List<Row> beforeRename =
+                spark.sql("SHOW CREATE TABLE tablestore.default.testRenamePartitionKey")
+                        .collectAsList();
+        assertThat(beforeRename.toString())
+                .isEqualTo(
+                        "[[CREATE TABLE testRenamePartitionKey (\n"
+                                + "  `a` BIGINT,\n"
+                                + "  `b` STRING)\n"
+                                + "PARTITIONED BY (a)\n"
+                                + "]]");
+
+        assertThatThrownBy(
+                        () ->
+                                spark.sql(
+                                        "ALTER TABLE tablestore.default.testRenamePartitionKey RENAME COLUMN a to aa"))
+                .isInstanceOf(RuntimeException.class)
+                .hasMessage("java.lang.UnsupportedOperationException: Cannot rename partition key");
+    }
+
+    @Test
+    public void testDropSingleColumn() throws Exception {
+        Path tablePath = new Path(warehousePath, "default.db/testDropSingleColumn");
+        createTestHelper(tablePath);
+
+        List<Row> beforeRename =
+                spark.sql("SHOW CREATE TABLE tablestore.default.testDropSingleColumn")
+                        .collectAsList();
+        assertThat(beforeRename.toString())
+                .isEqualTo(
+                        "[[CREATE TABLE testDropSingleColumn (\n"
+                                + "  `a` INT NOT NULL,\n"
+                                + "  `b` BIGINT,\n"
+                                + "  `c` STRING)\n"
+                                + "]]");
+
+        spark.sql("ALTER TABLE tablestore.default.testDropSingleColumn DROP COLUMN a");
+
+        List<Row> afterRename =
+                spark.sql("SHOW CREATE TABLE tablestore.default.testDropSingleColumn")
+                        .collectAsList();
+        assertThat(afterRename.toString())
+                .isEqualTo(
+                        "[[CREATE TABLE testDropSingleColumn (\n"
+                                + "  `b` BIGINT,\n"
+                                + "  `c` STRING)\n"
+                                + "]]");
+    }
+
+    @Test
+    public void testDropColumns() throws Exception {

Review Comment:
   Reading data will fail with this PR. I found this issue, [FLINK-27846](https://issues.apache.org/jira/browse/FLINK-27846), before I fixed FLINK-27847. Should we fix the read path in FLINK-27847 or in FLINK-27846? What do you think? Thanks.
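
The comment does not say why reads fail after a rename or drop. One plausible cause, stated here purely as an assumption, is that data files written under the old schema get resolved by column name, so a renamed column can no longer be found. The sketch below contrasts name-based lookup with lookup by a stable field id. `SchemaEvolutionSketch`, `schema`, `readByName`, and `readById` are hypothetical names for illustration and are not part of flink-table-store.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; none of these names exist in flink-table-store.
class SchemaEvolutionSketch {

    /** Builds a schema as an ordered map from stable field id to current column name. */
    static Map<Integer, String> schema(Object... idsAndNames) {
        Map<Integer, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < idsAndNames.length; i += 2) {
            fields.put((Integer) idsAndNames[i], (String) idsAndNames[i + 1]);
        }
        return fields;
    }

    /** Name-based read: looks up each current column name directly in the stored row. */
    static Map<String, Object> readByName(
            Map<String, Object> storedRow, Map<Integer, String> tableSchema) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (String name : tableSchema.values()) {
            if (!storedRow.containsKey(name)) {
                throw new IllegalStateException("Column not found in data file: " + name);
            }
            result.put(name, storedRow.get(name));
        }
        return result;
    }

    /** Id-based read: maps each current field to the name it had when the file was written. */
    static Map<String, Object> readById(
            Map<String, Object> storedRow,
            Map<Integer, String> fileSchema,
            Map<Integer, String> tableSchema) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<Integer, String> field : tableSchema.entrySet()) {
            String nameInFile = fileSchema.get(field.getKey());
            // A field added after the file was written has no stored value, so it reads as null.
            result.put(field.getValue(), nameInFile == null ? null : storedRow.get(nameInFile));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, String> fileSchema = schema(0, "a", 1, "b", 2, "c");
        Map<String, Object> storedRow = new LinkedHashMap<>();
        storedRow.put("a", 1);
        storedRow.put("b", 2L);
        storedRow.put("c", "x");

        // Schema after RENAME COLUMN a TO aa: the field id stays 0, only the name changes.
        Map<Integer, String> renamed = schema(0, "aa", 1, "b", 2, "c");
        System.out.println(readById(storedRow, fileSchema, renamed));

        // Schema after DROP COLUMN a: field id 0 is gone, so its stored value is skipped.
        Map<Integer, String> dropped = schema(1, "b", 2, "c");
        System.out.println(readById(storedRow, fileSchema, dropped));
    }
}
```

If this assumption holds, a dropped column is harmless to either strategy, while a rename breaks only the name-based one. That would place the fix in the read path rather than in the schema change itself.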



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org