Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/26 02:00:10 UTC

[flink-table-store] branch release-0.2 updated: [FLINK-28668] Fail to create Spark table without primary key

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/release-0.2 by this push:
     new a8559f9e [FLINK-28668] Fail to create Spark table without primary key
a8559f9e is described below

commit a8559f9e159ee821982e93bd7ca7fbd80ff76936
Author: Jane Chan <55...@users.noreply.github.com>
AuthorDate: Tue Jul 26 09:59:33 2022 +0800

    [FLINK-28668] Fail to create Spark table without primary key
    
    This closes #236
---
 .../flink/table/store/spark/SparkCatalog.java      |  11 +-
 .../flink/table/store/spark/SparkReadITCase.java   | 182 ++++++++++++++-------
 2 files changed, 132 insertions(+), 61 deletions(-)

diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
index d4624b1c..e64150a5 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
@@ -284,14 +284,19 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
                                 }));
         Map<String, String> normalizedProperties = new HashMap<>(properties);
         normalizedProperties.remove(PRIMARY_KEY_IDENTIFIER);
+        String pkAsString = properties.get(PRIMARY_KEY_IDENTIFIER);
+        List<String> primaryKeys =
+                pkAsString == null
+                        ? Collections.emptyList()
+                        : Arrays.stream(pkAsString.split(","))
+                                .map(String::trim)
+                                .collect(Collectors.toList());
         return new UpdateSchema(
                 (RowType) toFlinkType(schema),
                 Arrays.stream(partitions)
                         .map(partition -> partition.references()[0].describe())
                         .collect(Collectors.toList()),
-                Arrays.stream(properties.getOrDefault(PRIMARY_KEY_IDENTIFIER, "").split(","))
-                        .map(String::trim)
-                        .collect(Collectors.toList()),
+                primaryKeys,
                 normalizedProperties,
                 properties.getOrDefault(TableCatalog.PROP_COMMENT, ""));
     }
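
[Editor's note, not part of the patch] The hunk above is the whole fix. Its root cause is a Java split() pitfall: "".split(",") returns a one-element array containing the empty string, not an empty array, so the removed getOrDefault(PRIMARY_KEY_IDENTIFIER, "") path built a primary-key list of [""] for any table declared without a primary key. A minimal, self-contained sketch of that behavior (illustration only; the class name is made up):

    public class SplitPitfall {
        public static void main(String[] args) {
            // Simulate a table with no 'primary-key' property set.
            String pkAsString = null;
            // The pre-patch fallback substituted "" for the missing value.
            String fallback = pkAsString == null ? "" : pkAsString;
            System.out.println(fallback.split(",").length);       // 1, not 0
            System.out.println(fallback.split(",")[0].isEmpty()); // true
            // Hence the new null check that returns Collections.emptyList()
            // instead of splitting an empty string.
        }
    }
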
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
index 67bfa8eb..95fcfafa 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.data.binary.BinaryStringData;
 import org.apache.flink.table.store.file.schema.ArrayDataType;
 import org.apache.flink.table.store.file.schema.AtomicDataType;
 import org.apache.flink.table.store.file.schema.DataField;
+import org.apache.flink.table.store.file.schema.DataType;
 import org.apache.flink.table.store.file.schema.RowDataType;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.table.FileStoreTableFactory;
@@ -52,8 +53,12 @@ import org.junit.jupiter.api.Test;
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -353,22 +358,75 @@ public class SparkReadITCase {
 
     @Test
     public void testCreateAndDropTable() throws Exception {
+        innerTest("MyTable1", true, true, false);
+        innerTest("MyTable2", true, false, false);
+        innerTest("MyTable3", false, false, false);
+        innerTest("MyTable4", false, false, true);
+        innerTest("MyTable5", false, true, false);
+        innerTest("MyTable6", false, true, true);
+    }
+
+    private void innerTest(String tableName, boolean hasPk, boolean partitioned, boolean appendOnly)
+            throws Exception {
         spark.sql("USE table_store");
+        String ddlTemplate =
+                "CREATE TABLE default.%s (\n"
+                        + "order_id BIGINT NOT NULL comment 'order_id',\n"
+                        + "buyer_id BIGINT NOT NULL COMMENT 'buyer_id',\n"
+                        + "coupon_info ARRAY<STRING> NOT NULL COMMENT 'coupon_info',\n"
+                        + "order_amount DOUBLE NOT NULL COMMENT 'order_amount',\n"
+                        + "dt STRING NOT NULL COMMENT 'dt',\n"
+                        + "hh STRING NOT NULL COMMENT 'hh') USING table_store\n"
+                        + "COMMENT 'table comment'\n"
+                        + "%s\n"
+                        + "TBLPROPERTIES (%s)";
+        Map<String, String> tableProperties = new HashMap<>();
+        tableProperties.put("foo", "bar");
+        List<String> columns =
+                Arrays.asList("order_id", "buyer_id", "coupon_info", "order_amount", "dt", "hh");
+        List<DataType> types =
+                Arrays.asList(
+                        new AtomicDataType(new BigIntType(false)),
+                        new AtomicDataType(new BigIntType(false)),
+                        new ArrayDataType(
+                                false,
+                                new AtomicDataType(new VarCharType(true, VarCharType.MAX_LENGTH))),
+                        new AtomicDataType(new DoubleType(false)),
+                        new AtomicDataType(new VarCharType(false, VarCharType.MAX_LENGTH)),
+                        new AtomicDataType(new VarCharType(false, VarCharType.MAX_LENGTH)));
+        List<DataField> fields =
+                IntStream.range(0, columns.size())
+                        .boxed()
+                        .map(i -> new DataField(i, columns.get(i), types.get(i), columns.get(i)))
+                        .collect(Collectors.toList());
+        String partitionStr = "";
+        if (hasPk) {
+            tableProperties.put("primary-key", partitioned ? "order_id,dt,hh" : "order_id");
+        }
+        if (appendOnly) {
+            tableProperties.put("write-mode", "append-only");
+        }
+        if (partitioned) {
+            partitionStr = "PARTITIONED BY (dt, hh)";
+        }
+
         String ddl =
-                "CREATE TABLE default.MyTable (\n"
-                        + "order_id BIGINT NOT NULL comment 'biz order id',\n"
-                        + "buyer_id BIGINT NOT NULL COMMENT 'buyer id',\n"
-                        + "coupon_info ARRAY<STRING> NOT NULL COMMENT 'coupon info',\n"
-                        + "order_amount DOUBLE NOT NULL COMMENT 'order amount',\n"
-                        + "dt STRING NOT NULL COMMENT 'yyyy-MM-dd',\n"
-                        + "hh STRING NOT NULL COMMENT 'HH') USING table_store\n"
-                        + "COMMENT 'my table'\n"
-                        + "PARTITIONED BY (dt, hh)\n"
-                        + "TBLPROPERTIES ('foo' = 'bar', 'primary-key' = 'order_id,dt,hh')";
+                String.format(
+                        ddlTemplate,
+                        tableName,
+                        partitionStr,
+                        tableProperties.entrySet().stream()
+                                .map(
+                                        entry ->
+                                                String.format(
+                                                        "'%s' = '%s'",
+                                                        entry.getKey(), entry.getValue()))
+                                .collect(Collectors.joining(", ")));
+
         spark.sql(ddl);
         assertThatThrownBy(() -> spark.sql(ddl))
                 .isInstanceOf(TableAlreadyExistsException.class)
-                .hasMessageContaining("Table default.MyTable already exists");
+                .hasMessageContaining(String.format("Table default.%s already exists", tableName));
         assertThatThrownBy(() -> spark.sql(ddl.replace("default", "foo")))
                 .isInstanceOf(NoSuchNamespaceException.class)
                 .hasMessageContaining("Namespace 'foo' not found");
@@ -376,60 +434,62 @@ public class SparkReadITCase {
         assertThatThrownBy(
                         () ->
                                 spark.sql(
-                                        "ALTER TABLE default.MyTable UNSET TBLPROPERTIES('primary-key')"))
+                                        String.format(
+                                                "ALTER TABLE default.%s UNSET TBLPROPERTIES('primary-key')",
+                                                tableName)))
                 .isInstanceOf(UnsupportedOperationException.class)
                 .hasMessageContaining("Alter primary key is not supported");
+        assertThatThrownBy(
+                        () ->
+                                spark.sql(
+                                        String.format(
+                                                "ALTER TABLE default.%s SET TBLPROPERTIES('write-mode' = 'append-only')",
+                                                tableName)))
+                .getRootCause()
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining("Change write-mode is not supported yet");
 
-        Path tablePath = new Path(warehousePath, "default.db/MyTable");
+        Path tablePath = new Path(warehousePath, String.format("default.db/%s", tableName));
         TableSchema schema = FileStoreTableFactory.create(tablePath).schema();
-        assertThat(schema.fields())
-                .containsExactly(
-                        new DataField(
-                                0,
-                                "order_id",
-                                new AtomicDataType(new BigIntType(false)),
-                                "biz order id"),
-                        new DataField(
-                                1,
-                                "buyer_id",
-                                new AtomicDataType(new BigIntType(false)),
-                                "buyer id"),
-                        new DataField(
-                                2,
-                                "coupon_info",
-                                new ArrayDataType(
-                                        false,
-                                        new AtomicDataType(
-                                                new VarCharType(true, VarCharType.MAX_LENGTH))),
-                                "coupon info"),
-                        new DataField(
-                                3,
-                                "order_amount",
-                                new AtomicDataType(new DoubleType(false)),
-                                "order amount"),
-                        new DataField(
-                                4,
-                                "dt",
-                                new AtomicDataType(new VarCharType(false, VarCharType.MAX_LENGTH)),
-                                "yyyy-MM-dd"),
-                        new DataField(
-                                5,
-                                "hh",
-                                new AtomicDataType(new VarCharType(false, VarCharType.MAX_LENGTH)),
-                                "HH"));
+        assertThat(schema.fields()).containsExactlyElementsOf(fields);
         assertThat(schema.options()).containsEntry("foo", "bar");
         assertThat(schema.options()).doesNotContainKey("primary-key");
-        assertThat(schema.primaryKeys()).containsExactly("order_id", "dt", "hh");
-        assertThat(schema.trimmedPrimaryKeys()).containsOnly("order_id");
-        assertThat(schema.partitionKeys()).containsExactly("dt", "hh");
-        assertThat(schema.comment()).isEqualTo("my table");
+
+        if (hasPk) {
+            if (partitioned) {
+                assertThat(schema.primaryKeys()).containsExactly("order_id", "dt", "hh");
+            } else {
+                assertThat(schema.primaryKeys()).containsExactly("order_id");
+            }
+            assertThat(schema.trimmedPrimaryKeys()).containsOnly("order_id");
+        } else {
+            assertThat(schema.primaryKeys()).isEmpty();
+        }
+
+        if (partitioned) {
+            assertThat(schema.partitionKeys()).containsExactly("dt", "hh");
+        } else {
+            assertThat(schema.partitionKeys()).isEmpty();
+        }
+
+        if (appendOnly) {
+            assertThat(schema.options()).containsEntry("write-mode", "append-only");
+        } else {
+            assertThat(schema.options()).doesNotContainEntry("write-mode", "append-only");
+        }
+
+        assertThat(schema.comment()).isEqualTo("table comment");
 
         SimpleTableTestHelper testHelper =
                 new SimpleTableTestHelper(
                         tablePath,
                         schema.logicalRowType(),
-                        Arrays.asList("dt", "hh"),
-                        Arrays.asList("order_id", "dt", "hh"));
+                        partitioned ? Arrays.asList("dt", "hh") : Collections.emptyList(),
+                        hasPk
+                                ? partitioned
+                                        ? Arrays.asList("order_id", "dt", "hh")
+                                        : Collections.singletonList("order_id")
+                                : Collections.emptyList());
         testHelper.write(
                 GenericRowData.of(
                         1L,
@@ -453,16 +513,22 @@ public class SparkReadITCase {
 
         // test drop table
         assertThat(
-                        spark.sql("SHOW TABLES IN table_store.default LIKE 'MyTable'")
+                        spark.sql(
+                                        String.format(
+                                                "SHOW TABLES IN table_store.default LIKE '%s'",
+                                                tableName))
                                 .select("namespace", "tableName")
                                 .collectAsList()
                                 .toString())
-                .isEqualTo("[[default,MyTable]]");
+                .isEqualTo(String.format("[[default,%s]]", tableName));
 
-        spark.sql("DROP TABLE table_store.default.MyTable");
+        spark.sql(String.format("DROP TABLE table_store.default.%s", tableName));
 
         assertThat(
-                        spark.sql("SHOW TABLES IN table_store.default LIKE 'MyTable'")
+                        spark.sql(
+                                        String.format(
+                                                "SHOW TABLES IN table_store.default LIKE '%s'",
+                                                tableName))
                                 .select("namespace", "tableName")
                                 .collectAsList()
                                 .toString())
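
[Editor's note, not part of the patch] For context, a hedged sketch of the user-facing scenario this commit repairs. The catalog name, warehouse path, and table name below are illustrative assumptions modeled on the SparkReadITCase setup, not taken from the patch itself:

    import org.apache.spark.sql.SparkSession;

    public class NoPkTableDemo {
        public static void main(String[] args) {
            // Wire up the table store catalog (warehouse path is a placeholder).
            SparkSession spark = SparkSession.builder()
                    .master("local[2]")
                    .config("spark.sql.catalog.table_store",
                            "org.apache.flink.table.store.spark.SparkCatalog")
                    .config("spark.sql.catalog.table_store.warehouse",
                            "file:///tmp/table_store_warehouse")
                    .getOrCreate();
            spark.sql("USE table_store");
            // Before this commit, this DDL failed: the absent 'primary-key'
            // property was parsed into the bogus one-element key list [""].
            spark.sql("CREATE TABLE default.NoPkTable (id BIGINT NOT NULL, "
                    + "name STRING) USING table_store "
                    + "TBLPROPERTIES ('foo' = 'bar')");
            spark.stop();
        }
    }
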