You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/26 02:00:10 UTC
[flink-table-store] branch release-0.2 updated: [FLINK-28668] Fail to create Spark table without primary key
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/release-0.2 by this push:
new a8559f9e [FLINK-28668] Fail to create Spark table without primary key
a8559f9e is described below
commit a8559f9e159ee821982e93bd7ca7fbd80ff76936
Author: Jane Chan <55...@users.noreply.github.com>
AuthorDate: Tue Jul 26 09:59:33 2022 +0800
[FLINK-28668] Fail to create Spark table without primary key
This closes #236
---
.../flink/table/store/spark/SparkCatalog.java | 11 +-
.../flink/table/store/spark/SparkReadITCase.java | 182 ++++++++++++++-------
2 files changed, 132 insertions(+), 61 deletions(-)
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
index d4624b1c..e64150a5 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
@@ -284,14 +284,19 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
}));
Map<String, String> normalizedProperties = new HashMap<>(properties);
normalizedProperties.remove(PRIMARY_KEY_IDENTIFIER);
+ String pkAsString = properties.get(PRIMARY_KEY_IDENTIFIER);
+ List<String> primaryKeys =
+ pkAsString == null
+ ? Collections.emptyList()
+ : Arrays.stream(pkAsString.split(","))
+ .map(String::trim)
+ .collect(Collectors.toList());
return new UpdateSchema(
(RowType) toFlinkType(schema),
Arrays.stream(partitions)
.map(partition -> partition.references()[0].describe())
.collect(Collectors.toList()),
- Arrays.stream(properties.getOrDefault(PRIMARY_KEY_IDENTIFIER, "").split(","))
- .map(String::trim)
- .collect(Collectors.toList()),
+ primaryKeys,
normalizedProperties,
properties.getOrDefault(TableCatalog.PROP_COMMENT, ""));
}
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
index 67bfa8eb..95fcfafa 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.table.store.file.schema.ArrayDataType;
import org.apache.flink.table.store.file.schema.AtomicDataType;
import org.apache.flink.table.store.file.schema.DataField;
+import org.apache.flink.table.store.file.schema.DataType;
import org.apache.flink.table.store.file.schema.RowDataType;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.table.FileStoreTableFactory;
@@ -52,8 +53,12 @@ import org.junit.jupiter.api.Test;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -353,22 +358,75 @@ public class SparkReadITCase {
@Test
public void testCreateAndDropTable() throws Exception {
+ innerTest("MyTable1", true, true, false);
+ innerTest("MyTable2", true, false, false);
+ innerTest("MyTable3", false, false, false);
+ innerTest("MyTable4", false, false, true);
+ innerTest("MyTable5", false, true, false);
+ innerTest("MyTable6", false, true, true);
+ }
+
+ private void innerTest(String tableName, boolean hasPk, boolean partitioned, boolean appendOnly)
+ throws Exception {
spark.sql("USE table_store");
+ String ddlTemplate =
+ "CREATE TABLE default.%s (\n"
+ + "order_id BIGINT NOT NULL comment 'order_id',\n"
+ + "buyer_id BIGINT NOT NULL COMMENT 'buyer_id',\n"
+ + "coupon_info ARRAY<STRING> NOT NULL COMMENT 'coupon_info',\n"
+ + "order_amount DOUBLE NOT NULL COMMENT 'order_amount',\n"
+ + "dt STRING NOT NULL COMMENT 'dt',\n"
+ + "hh STRING NOT NULL COMMENT 'hh') USING table_store\n"
+ + "COMMENT 'table comment'\n"
+ + "%s\n"
+ + "TBLPROPERTIES (%s)";
+ Map<String, String> tableProperties = new HashMap<>();
+ tableProperties.put("foo", "bar");
+ List<String> columns =
+ Arrays.asList("order_id", "buyer_id", "coupon_info", "order_amount", "dt", "hh");
+ List<DataType> types =
+ Arrays.asList(
+ new AtomicDataType(new BigIntType(false)),
+ new AtomicDataType(new BigIntType(false)),
+ new ArrayDataType(
+ false,
+ new AtomicDataType(new VarCharType(true, VarCharType.MAX_LENGTH))),
+ new AtomicDataType(new DoubleType(false)),
+ new AtomicDataType(new VarCharType(false, VarCharType.MAX_LENGTH)),
+ new AtomicDataType(new VarCharType(false, VarCharType.MAX_LENGTH)));
+ List<DataField> fields =
+ IntStream.range(0, columns.size())
+ .boxed()
+ .map(i -> new DataField(i, columns.get(i), types.get(i), columns.get(i)))
+ .collect(Collectors.toList());
+ String partitionStr = "";
+ if (hasPk) {
+ tableProperties.put("primary-key", partitioned ? "order_id,dt,hh" : "order_id");
+ }
+ if (appendOnly) {
+ tableProperties.put("write-mode", "append-only");
+ }
+ if (partitioned) {
+ partitionStr = "PARTITIONED BY (dt, hh)";
+ }
+
String ddl =
- "CREATE TABLE default.MyTable (\n"
- + "order_id BIGINT NOT NULL comment 'biz order id',\n"
- + "buyer_id BIGINT NOT NULL COMMENT 'buyer id',\n"
- + "coupon_info ARRAY<STRING> NOT NULL COMMENT 'coupon info',\n"
- + "order_amount DOUBLE NOT NULL COMMENT 'order amount',\n"
- + "dt STRING NOT NULL COMMENT 'yyyy-MM-dd',\n"
- + "hh STRING NOT NULL COMMENT 'HH') USING table_store\n"
- + "COMMENT 'my table'\n"
- + "PARTITIONED BY (dt, hh)\n"
- + "TBLPROPERTIES ('foo' = 'bar', 'primary-key' = 'order_id,dt,hh')";
+ String.format(
+ ddlTemplate,
+ tableName,
+ partitionStr,
+ tableProperties.entrySet().stream()
+ .map(
+ entry ->
+ String.format(
+ "'%s' = '%s'",
+ entry.getKey(), entry.getValue()))
+ .collect(Collectors.joining(", ")));
+
spark.sql(ddl);
assertThatThrownBy(() -> spark.sql(ddl))
.isInstanceOf(TableAlreadyExistsException.class)
- .hasMessageContaining("Table default.MyTable already exists");
+ .hasMessageContaining(String.format("Table default.%s already exists", tableName));
assertThatThrownBy(() -> spark.sql(ddl.replace("default", "foo")))
.isInstanceOf(NoSuchNamespaceException.class)
.hasMessageContaining("Namespace 'foo' not found");
@@ -376,60 +434,62 @@ public class SparkReadITCase {
assertThatThrownBy(
() ->
spark.sql(
- "ALTER TABLE default.MyTable UNSET TBLPROPERTIES('primary-key')"))
+ String.format(
+ "ALTER TABLE default.%s UNSET TBLPROPERTIES('primary-key')",
+ tableName)))
.isInstanceOf(UnsupportedOperationException.class)
.hasMessageContaining("Alter primary key is not supported");
+ assertThatThrownBy(
+ () ->
+ spark.sql(
+ String.format(
+ "ALTER TABLE default.%s SET TBLPROPERTIES('write-mode' = 'append-only')",
+ tableName)))
+ .getRootCause()
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessageContaining("Change write-mode is not supported yet");
- Path tablePath = new Path(warehousePath, "default.db/MyTable");
+ Path tablePath = new Path(warehousePath, String.format("default.db/%s", tableName));
TableSchema schema = FileStoreTableFactory.create(tablePath).schema();
- assertThat(schema.fields())
- .containsExactly(
- new DataField(
- 0,
- "order_id",
- new AtomicDataType(new BigIntType(false)),
- "biz order id"),
- new DataField(
- 1,
- "buyer_id",
- new AtomicDataType(new BigIntType(false)),
- "buyer id"),
- new DataField(
- 2,
- "coupon_info",
- new ArrayDataType(
- false,
- new AtomicDataType(
- new VarCharType(true, VarCharType.MAX_LENGTH))),
- "coupon info"),
- new DataField(
- 3,
- "order_amount",
- new AtomicDataType(new DoubleType(false)),
- "order amount"),
- new DataField(
- 4,
- "dt",
- new AtomicDataType(new VarCharType(false, VarCharType.MAX_LENGTH)),
- "yyyy-MM-dd"),
- new DataField(
- 5,
- "hh",
- new AtomicDataType(new VarCharType(false, VarCharType.MAX_LENGTH)),
- "HH"));
+ assertThat(schema.fields()).containsExactlyElementsOf(fields);
assertThat(schema.options()).containsEntry("foo", "bar");
assertThat(schema.options()).doesNotContainKey("primary-key");
- assertThat(schema.primaryKeys()).containsExactly("order_id", "dt", "hh");
- assertThat(schema.trimmedPrimaryKeys()).containsOnly("order_id");
- assertThat(schema.partitionKeys()).containsExactly("dt", "hh");
- assertThat(schema.comment()).isEqualTo("my table");
+
+ if (hasPk) {
+ if (partitioned) {
+ assertThat(schema.primaryKeys()).containsExactly("order_id", "dt", "hh");
+ } else {
+ assertThat(schema.primaryKeys()).containsExactly("order_id");
+ }
+ assertThat(schema.trimmedPrimaryKeys()).containsOnly("order_id");
+ } else {
+ assertThat(schema.primaryKeys()).isEmpty();
+ }
+
+ if (partitioned) {
+ assertThat(schema.partitionKeys()).containsExactly("dt", "hh");
+ } else {
+ assertThat(schema.partitionKeys()).isEmpty();
+ }
+
+ if (appendOnly) {
+ assertThat(schema.options()).containsEntry("write-mode", "append-only");
+ } else {
+ assertThat(schema.options()).doesNotContainEntry("write-mode", "append-only");
+ }
+
+ assertThat(schema.comment()).isEqualTo("table comment");
SimpleTableTestHelper testHelper =
new SimpleTableTestHelper(
tablePath,
schema.logicalRowType(),
- Arrays.asList("dt", "hh"),
- Arrays.asList("order_id", "dt", "hh"));
+ partitioned ? Arrays.asList("dt", "hh") : Collections.emptyList(),
+ hasPk
+ ? partitioned
+ ? Arrays.asList("order_id", "dt", "hh")
+ : Collections.singletonList("order_id")
+ : Collections.emptyList());
testHelper.write(
GenericRowData.of(
1L,
@@ -453,16 +513,22 @@ public class SparkReadITCase {
// test drop table
assertThat(
- spark.sql("SHOW TABLES IN table_store.default LIKE 'MyTable'")
+ spark.sql(
+ String.format(
+ "SHOW TABLES IN table_store.default LIKE '%s'",
+ tableName))
.select("namespace", "tableName")
.collectAsList()
.toString())
- .isEqualTo("[[default,MyTable]]");
+ .isEqualTo(String.format("[[default,%s]]", tableName));
- spark.sql("DROP TABLE table_store.default.MyTable");
+ spark.sql(String.format("DROP TABLE table_store.default.%s", tableName));
assertThat(
- spark.sql("SHOW TABLES IN table_store.default LIKE 'MyTable'")
+ spark.sql(
+ String.format(
+ "SHOW TABLES IN table_store.default LIKE '%s'",
+ tableName))
.select("namespace", "tableName")
.collectAsList()
.toString())