You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2023/01/04 01:43:04 UTC
[flink-table-store] branch master updated: [FLINK-30522] Fix 'SHOW TBLPROPERTIES' can't read properties of table in Spark3
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 545617a6 [FLINK-30522] Fix 'SHOW TBLPROPERTIES' can't read properties of table in Spark3
545617a6 is described below
commit 545617a6fcf7eefd0a3f19d7adac431074891367
Author: yuzelin <33...@users.noreply.github.com>
AuthorDate: Wed Jan 4 09:42:59 2023 +0800
[FLINK-30522] Fix 'SHOW TBLPROPERTIES' can't read properties of table in Spark3
This closes #451
---
.../org/apache/flink/table/store/CoreOptions.java | 4 +++
.../table/store/file/schema/SchemaManagerTest.java | 20 +++++++-----
.../apache/flink/table/store/spark/SparkTable.java | 12 +++++++
.../flink/table/store/spark/SparkReadITCase.java | 18 ++++++++++
.../store/spark/SparkSchemaEvolutionITCase.java | 38 +++++++++++++++++++++-
5 files changed, 83 insertions(+), 9 deletions(-)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index 22974af1..ba61e105 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -405,6 +405,10 @@ public class CoreOptions implements Serializable {
this.options = options;
}
+ public Map<String, String> toMap() {
+ return options.toMap();
+ }
+
public int bucket() {
return options.get(BUCKET);
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
index 5d6a4f94..fb4086a1 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
@@ -110,14 +110,18 @@ public class SchemaManagerTest {
assertThatThrownBy(
() ->
- manager.commitNewVersion(
- new UpdateSchema(
- rowType,
- partitionKeys,
- primaryKeys,
- Collections.singletonMap(
- CoreOptions.SEQUENCE_FIELD.key(), "f4"),
- "")))
+ retryArtificialException(
+ () ->
+ manager.commitNewVersion(
+ new UpdateSchema(
+ rowType,
+ partitionKeys,
+ primaryKeys,
+ Collections.singletonMap(
+ CoreOptions.SEQUENCE_FIELD
+ .key(),
+ "f4"),
+ ""))))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Nonexistent sequence field: 'f4'");
}
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
index 0984dba1..5e2aeb44 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.spark;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.store.file.operation.Lock;
import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.table.DataTable;
import org.apache.flink.table.store.table.SupportsPartition;
import org.apache.flink.table.store.table.Table;
@@ -39,8 +40,10 @@ import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -118,6 +121,15 @@ public class SparkTable
castToWritable(table).deleteWhere(commitUser, predicates, lockFactory);
}
+ @Override
+ public Map<String, String> properties() {
+ if (table instanceof DataTable) {
+ return ((DataTable) table).options().toMap();
+ } else {
+ return Collections.emptyMap();
+ }
+ }
+
private static org.apache.flink.table.store.table.SupportsWrite castToWritable(Table table) {
if (!(table instanceof org.apache.flink.table.store.table.SupportsWrite)) {
throw new UnsupportedOperationException(
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
index 1c6f8caf..2d2cba1f 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
@@ -147,6 +147,24 @@ public class SparkReadITCase extends SparkReadTestBase {
.isEqualTo("[[a,bigint,], [b,string,], [,,], [# Partitioning,,], [Part 0,a,]]");
}
+ @Test
+ public void testShowTableProperties() {
+ spark.sql("USE tablestore");
+ spark.sql(
+ "CREATE TABLE default.tbl (\n"
+ + "a INT\n"
+ + ") TBLPROPERTIES (\n"
+ + "'k1' = 'v1',\n"
+ + "'k2' = 'v2'"
+ + ")");
+
+ assertThat(
+ spark.sql("SHOW TBLPROPERTIES default.tbl").collectAsList().stream()
+ .map(Row::toString)
+ .collect(Collectors.toList()))
+ .contains("[k1,v1]", "[k2,v2]");
+ }
+
@Test
public void testCreateTableWithInvalidPk() {
spark.sql("USE tablestore");
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
index fb88cc88..22a096dd 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
@@ -93,6 +93,7 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
+ " `a` INT NOT NULL,\n"
+ " `b` BIGINT,\n"
+ " `c` STRING)\n"
+ + buildTableProperties("default.db/testAddNotNullColumn")
+ "]]");
assertThatThrownBy(
@@ -120,6 +121,7 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
+ " `a` INT NOT NULL,\n"
+ " `b` BIGINT,\n"
+ " `c` STRING)\n"
+ + buildTableProperties("default.db/testRenameColumn")
+ "]]");
Dataset<Row> table1 = spark.table("tablestore.default.testRenameColumn");
List<Row> results = table1.select("a", "c").collectAsList();
@@ -135,6 +137,7 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
+ " `aa` INT NOT NULL,\n"
+ " `b` BIGINT,\n"
+ " `c` STRING)\n"
+ + buildTableProperties("default.db/testRenameColumn")
+ "]]");
Dataset<Row> table2 = spark.table("tablestore.default.testRenameColumn");
results = table2.select("aa", "c").collectAsList();
@@ -163,7 +166,14 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
"[[CREATE TABLE testRenamePartitionKey (\n"
+ " `a` BIGINT,\n"
+ " `b` STRING)\n"
+ + "USING tablestore\n"
+ "PARTITIONED BY (a)\n"
+ + "COMMENT 'table comment'\n"
+ + "TBLPROPERTIES(\n"
+ + " 'foo' = 'bar',\n"
+ + String.format(
+ " 'path' = '%s/%s')\n",
+ warehousePath, "default.db/testRenamePartitionKey")
+ "]]");
assertThatThrownBy(
@@ -194,6 +204,7 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
+ " `a` INT NOT NULL,\n"
+ " `b` BIGINT,\n"
+ " `c` STRING)\n"
+ + buildTableProperties("default.db/testDropSingleColumn")
+ "]]");
spark.sql("ALTER TABLE tablestore.default.testDropSingleColumn DROP COLUMN a");
@@ -206,6 +217,7 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
"[[CREATE TABLE testDropSingleColumn (\n"
+ " `b` BIGINT,\n"
+ " `c` STRING)\n"
+ + buildTableProperties("default.db/testDropSingleColumn")
+ "]]");
Dataset<Row> table = spark.table("tablestore.default.testDropSingleColumn");
@@ -226,6 +238,7 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
+ " `a` INT NOT NULL,\n"
+ " `b` BIGINT,\n"
+ " `c` STRING)\n"
+ + buildTableProperties("default.db/testDropColumns")
+ "]]");
spark.sql("ALTER TABLE tablestore.default.testDropColumns DROP COLUMNS a, b");
@@ -233,7 +246,11 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
List<Row> afterRename =
spark.sql("SHOW CREATE TABLE tablestore.default.testDropColumns").collectAsList();
assertThat(afterRename.toString())
- .isEqualTo("[[CREATE TABLE testDropColumns (\n" + " `c` STRING)\n" + "]]");
+ .isEqualTo(
+ "[[CREATE TABLE testDropColumns (\n"
+ + " `c` STRING)\n"
+ + buildTableProperties("default.db/testDropColumns")
+ + "]]");
}
@Test
@@ -255,7 +272,14 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
"[[CREATE TABLE testDropPartitionKey (\n"
+ " `a` BIGINT,\n"
+ " `b` STRING)\n"
+ + "USING tablestore\n"
+ "PARTITIONED BY (a)\n"
+ + "COMMENT 'table comment'\n"
+ + "TBLPROPERTIES(\n"
+ + " 'foo' = 'bar',\n"
+ + String.format(
+ " 'path' = '%s/%s')\n",
+ warehousePath, "default.db/testDropPartitionKey")
+ "]]");
assertThatThrownBy(
@@ -288,7 +312,13 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
"[[CREATE TABLE testDropPrimaryKey (\n"
+ " `a` BIGINT NOT NULL,\n"
+ " `b` STRING NOT NULL)\n"
+ + "USING tablestore\n"
+ "PARTITIONED BY (a)\n"
+ + "COMMENT 'table comment'\n"
+ + "TBLPROPERTIES(\n"
+ + String.format(
+ " 'path' = '%s/%s')\n",
+ warehousePath, "default.db/testDropPrimaryKey")
+ "]]");
assertThatThrownBy(
@@ -533,4 +563,10 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
.toString())
.isEqualTo("[[15,18,17,16], [19,22,21,20]]");
}
+
+ private String buildTableProperties(String tablePath) {
+ return String.format(
+ "TBLPROPERTIES(\n" + " 'file.format' = 'avro',\n" + " 'path' = '%s/%s')\n",
+ warehousePath, tablePath);
+ }
}