Posted to commits@flink.apache.org by lz...@apache.org on 2023/01/04 01:44:59 UTC

[flink-table-store] 03/03: [FLINK-30522] Fix 'SHOW TBLPROPERTIES' can't read properties of table in Spark3

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.3
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit eeb09ca128553d77e9f8886b335e6048cd498179
Author: yuzelin <33...@users.noreply.github.com>
AuthorDate: Wed Jan 4 09:42:59 2023 +0800

    [FLINK-30522] Fix 'SHOW TBLPROPERTIES' can't read properties of table in Spark3
    
    This closes #451
---
 .../org/apache/flink/table/store/CoreOptions.java  |  4 +++
 .../table/store/file/schema/SchemaManagerTest.java | 20 +++++++-----
 .../apache/flink/table/store/spark/SparkTable.java | 12 +++++++
 .../flink/table/store/spark/SparkReadITCase.java   | 18 ++++++++++
 .../store/spark/SparkSchemaEvolutionITCase.java    | 38 +++++++++++++++++++++-
 5 files changed, 83 insertions(+), 9 deletions(-)
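
In Spark 3, SHOW TBLPROPERTIES is resolved through the connector catalog's
Table#properties() method. SparkTable did not override it, so Spark fell back
to the interface default (an empty map) and the command returned no rows. A
minimal sketch of the scenario this commit fixes, assuming a Spark session
with the tablestore catalog registered (table and property names are
illustrative, not from the patch):

    // hedged sketch, not part of the patch
    spark.sql("USE tablestore");
    spark.sql("CREATE TABLE default.tbl (a INT) TBLPROPERTIES ('k1' = 'v1')");
    // before this fix: empty result; after: rows like [k1,v1],
    // plus internal options such as 'path'
    spark.sql("SHOW TBLPROPERTIES default.tbl").show();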

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index 22974af1..ba61e105 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -405,6 +405,10 @@ public class CoreOptions implements Serializable {
         this.options = options;
     }
 
+    public Map<String, String> toMap() {
+        return options.toMap();
+    }
+
     public int bucket() {
         return options.get(BUCKET);
     }
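
The new accessor only exposes the backing Flink Configuration as a plain map
so callers outside the core module can hand the options to Spark. A hedged
sketch of a call site (the CoreOptions constructor signature is assumed from
the surrounding diff context):

    // hedged sketch: snapshot all configured options as strings
    Configuration conf = new Configuration();
    conf.setString("bucket", "4");
    CoreOptions options = new CoreOptions(conf);
    Map<String, String> asMap = options.toMap(); // e.g. {"bucket" -> "4"}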
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
index 5d6a4f94..fb4086a1 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
@@ -110,14 +110,18 @@ public class SchemaManagerTest {
 
         assertThatThrownBy(
                         () ->
-                                manager.commitNewVersion(
-                                        new UpdateSchema(
-                                                rowType,
-                                                partitionKeys,
-                                                primaryKeys,
-                                                Collections.singletonMap(
-                                                        CoreOptions.SEQUENCE_FIELD.key(), "f4"),
-                                                "")))
+                                retryArtificialException(
+                                        () ->
+                                                manager.commitNewVersion(
+                                                        new UpdateSchema(
+                                                                rowType,
+                                                                partitionKeys,
+                                                                primaryKeys,
+                                                                Collections.singletonMap(
+                                                                        CoreOptions.SEQUENCE_FIELD
+                                                                                .key(),
+                                                                        "f4"),
+                                                                ""))))
                 .isInstanceOf(IllegalArgumentException.class)
                 .hasMessageContaining("Nonexistent sequence field: 'f4'");
     }
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
index 0984dba1..5e2aeb44 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.spark;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.store.file.operation.Lock;
 import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.table.DataTable;
 import org.apache.flink.table.store.table.SupportsPartition;
 import org.apache.flink.table.store.table.Table;
 
@@ -39,8 +40,10 @@ import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
@@ -118,6 +121,15 @@ public class SparkTable
         castToWritable(table).deleteWhere(commitUser, predicates, lockFactory);
     }
 
+    @Override
+    public Map<String, String> properties() {
+        if (table instanceof DataTable) {
+            return ((DataTable) table).options().toMap();
+        } else {
+            return Collections.emptyMap();
+        }
+    }
+
     private static org.apache.flink.table.store.table.SupportsWrite castToWritable(Table table) {
         if (!(table instanceof org.apache.flink.table.store.table.SupportsWrite)) {
             throw new UnsupportedOperationException(
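
For context: org.apache.spark.sql.connector.catalog.Table declares
properties() as a default method returning an empty map, which is why the
override above is enough to make SHOW TBLPROPERTIES (and the TBLPROPERTIES
section of SHOW CREATE TABLE) see the table options. A simplified, hedged
sketch of the lookup Spark performs:

    // hedged sketch of Spark's side of the contract (simplified)
    Table sparkTable = catalog.loadTable(identifier);
    Map<String, String> props = sparkTable.properties(); // now non-empty
    props.forEach((k, v) -> System.out.println(k + "\t" + v));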
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
index 1c6f8caf..2d2cba1f 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
@@ -147,6 +147,24 @@ public class SparkReadITCase extends SparkReadTestBase {
                 .isEqualTo("[[a,bigint,], [b,string,], [,,], [# Partitioning,,], [Part 0,a,]]");
     }
 
+    @Test
+    public void testShowTableProperties() {
+        spark.sql("USE tablestore");
+        spark.sql(
+                "CREATE TABLE default.tbl (\n"
+                        + "a INT\n"
+                        + ") TBLPROPERTIES (\n"
+                        + "'k1' = 'v1',\n"
+                        + "'k2' = 'v2'"
+                        + ")");
+
+        assertThat(
+                        spark.sql("SHOW TBLPROPERTIES default.tbl").collectAsList().stream()
+                                .map(Row::toString)
+                                .collect(Collectors.toList()))
+                .contains("[k1,v1]", "[k2,v2]");
+    }
+
     @Test
     public void testCreateTableWithInvalidPk() {
         spark.sql("USE tablestore");
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
index fb88cc88..22a096dd 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
@@ -93,6 +93,7 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
                                 + "  `a` INT NOT NULL,\n"
                                 + "  `b` BIGINT,\n"
                                 + "  `c` STRING)\n"
+                                + buildTableProperties("default.db/testAddNotNullColumn")
                                 + "]]");
 
         assertThatThrownBy(
@@ -120,6 +121,7 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
                                 + "  `a` INT NOT NULL,\n"
                                 + "  `b` BIGINT,\n"
                                 + "  `c` STRING)\n"
+                                + buildTableProperties("default.db/testRenameColumn")
                                 + "]]");
         Dataset<Row> table1 = spark.table("tablestore.default.testRenameColumn");
         List<Row> results = table1.select("a", "c").collectAsList();
@@ -135,6 +137,7 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
                                 + "  `aa` INT NOT NULL,\n"
                                 + "  `b` BIGINT,\n"
                                 + "  `c` STRING)\n"
+                                + buildTableProperties("default.db/testRenameColumn")
                                 + "]]");
         Dataset<Row> table2 = spark.table("tablestore.default.testRenameColumn");
         results = table2.select("aa", "c").collectAsList();
@@ -163,7 +166,14 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
                         "[[CREATE TABLE testRenamePartitionKey (\n"
                                 + "  `a` BIGINT,\n"
                                 + "  `b` STRING)\n"
+                                + "USING tablestore\n"
                                 + "PARTITIONED BY (a)\n"
+                                + "COMMENT 'table comment'\n"
+                                + "TBLPROPERTIES(\n"
+                                + "  'foo' = 'bar',\n"
+                                + String.format(
+                                        "  'path' = '%s/%s')\n",
+                                        warehousePath, "default.db/testRenamePartitionKey")
                                 + "]]");
 
         assertThatThrownBy(
@@ -194,6 +204,7 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
                                 + "  `a` INT NOT NULL,\n"
                                 + "  `b` BIGINT,\n"
                                 + "  `c` STRING)\n"
+                                + buildTableProperties("default.db/testDropSingleColumn")
                                 + "]]");
 
         spark.sql("ALTER TABLE tablestore.default.testDropSingleColumn DROP COLUMN a");
@@ -206,6 +217,7 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
                         "[[CREATE TABLE testDropSingleColumn (\n"
                                 + "  `b` BIGINT,\n"
                                 + "  `c` STRING)\n"
+                                + buildTableProperties("default.db/testDropSingleColumn")
                                 + "]]");
 
         Dataset<Row> table = spark.table("tablestore.default.testDropSingleColumn");
@@ -226,6 +238,7 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
                                 + "  `a` INT NOT NULL,\n"
                                 + "  `b` BIGINT,\n"
                                 + "  `c` STRING)\n"
+                                + buildTableProperties("default.db/testDropColumns")
                                 + "]]");
 
         spark.sql("ALTER TABLE tablestore.default.testDropColumns DROP COLUMNS a, b");
@@ -233,7 +246,11 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
         List<Row> afterRename =
                 spark.sql("SHOW CREATE TABLE tablestore.default.testDropColumns").collectAsList();
         assertThat(afterRename.toString())
-                .isEqualTo("[[CREATE TABLE testDropColumns (\n" + "  `c` STRING)\n" + "]]");
+                .isEqualTo(
+                        "[[CREATE TABLE testDropColumns (\n"
+                                + "  `c` STRING)\n"
+                                + buildTableProperties("default.db/testDropColumns")
+                                + "]]");
     }
 
     @Test
@@ -255,7 +272,14 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
                         "[[CREATE TABLE testDropPartitionKey (\n"
                                 + "  `a` BIGINT,\n"
                                 + "  `b` STRING)\n"
+                                + "USING tablestore\n"
                                 + "PARTITIONED BY (a)\n"
+                                + "COMMENT 'table comment'\n"
+                                + "TBLPROPERTIES(\n"
+                                + "  'foo' = 'bar',\n"
+                                + String.format(
+                                        "  'path' = '%s/%s')\n",
+                                        warehousePath, "default.db/testDropPartitionKey")
                                 + "]]");
 
         assertThatThrownBy(
@@ -288,7 +312,13 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
                         "[[CREATE TABLE testDropPrimaryKey (\n"
                                 + "  `a` BIGINT NOT NULL,\n"
                                 + "  `b` STRING NOT NULL)\n"
+                                + "USING tablestore\n"
                                 + "PARTITIONED BY (a)\n"
+                                + "COMMENT 'table comment'\n"
+                                + "TBLPROPERTIES(\n"
+                                + String.format(
+                                        "  'path' = '%s/%s')\n",
+                                        warehousePath, "default.db/testDropPrimaryKey")
                                 + "]]");
 
         assertThatThrownBy(
@@ -533,4 +563,10 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
                                 .toString())
                 .isEqualTo("[[15,18,17,16], [19,22,21,20]]");
     }
+
+    private String buildTableProperties(String tablePath) {
+        return String.format(
+                "TBLPROPERTIES(\n" + "  'file.format' = 'avro',\n" + "  'path' = '%s/%s')\n",
+                warehousePath, tablePath);
+    }
 }
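
Because properties() is now populated, Spark's SHOW CREATE TABLE output gains
a TBLPROPERTIES section, which is what the assertion changes above account
for. A hedged sketch of the expected tail of that output (the path value is
illustrative):

    // expected SHOW CREATE TABLE tail after this change
    String expectedTail =
            "TBLPROPERTIES(\n"
                    + "  'file.format' = 'avro',\n"
                    + "  'path' = '/warehouse/default.db/tbl')\n";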