Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/26 10:06:40 UTC

[flink-table-store] branch master updated: [FLINK-28689] Optimize Spark documentation to Catalog and Dataset

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new c62d7218 [FLINK-28689] Optimize Spark documentation to Catalog and Dataset
c62d7218 is described below

commit c62d721865e08d4f119b118461829261c8f4819b
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Tue Jul 26 18:06:35 2022 +0800

    [FLINK-28689] Optimize Spark documentation to Catalog and Dataset
    
    This closes #241
---
 docs/content/docs/engines/spark.md                 | 55 ++++++-------
 .../flink/table/store/spark/SparkReadITCase.java   | 96 +++++++++++-----------
 2 files changed, 71 insertions(+), 80 deletions(-)

diff --git a/docs/content/docs/engines/spark.md b/docs/content/docs/engines/spark.md
index 582e3433..7582d9e9 100644
--- a/docs/content/docs/engines/spark.md
+++ b/docs/content/docs/engines/spark.md
@@ -50,12 +50,12 @@ Alternatively, you can copy `flink-table-store-spark-{{< version >}}.jar` under
 
 ## Catalog
 
-The following command registers the Table Store's Spark catalog with the name `table_store`:
+The following command registers the Table Store's Spark catalog with the name `tablestore`:
 
 ```bash
 spark-sql ... \
-    --conf spark.sql.catalog.table_store=org.apache.flink.table.store.spark.SparkCatalog \
-    --conf spark.sql.catalog.table_store.warehouse=file:/tmp/warehouse
+    --conf spark.sql.catalog.tablestore=org.apache.flink.table.store.spark.SparkCatalog \
+    --conf spark.sql.catalog.tablestore.warehouse=file:/tmp/warehouse
 ```
 
 Some extra configurations are needed if your Table Store Catalog uses the Hive
@@ -63,29 +63,24 @@ Metastore (No extra configuration is required for read-only).
 
 ```bash
 spark-sql ... \
-    --conf spark.sql.catalog.table_store=org.apache.flink.table.store.spark.SparkCatalog \
-    --conf spark.sql.catalog.table_store.warehouse=file:/tmp/warehouse \
-    --conf spark.sql.catalog.table_store.metastore=hive \
-    --conf spark.sql.catalog.table_store.uri=thrift://...
+    --conf spark.sql.catalog.tablestore=org.apache.flink.table.store.spark.SparkCatalog \
+    --conf spark.sql.catalog.tablestore.warehouse=file:/tmp/warehouse \
+    --conf spark.sql.catalog.tablestore.metastore=hive \
+    --conf spark.sql.catalog.tablestore.uri=thrift://...
 ```
 
-## Create Temporary View
-
-Use the `CREATE TEMPORARY VIEW` command to create a Spark mapping table on top of
-an existing Table Store table if you don't want to use Table Store Catalog.
+## Query Table
 
 ```sql
-CREATE TEMPORARY VIEW myTable
-USING tablestore
-OPTIONS (
-  path "file:/tmp/warehouse/default.db/myTable"
-)
+SELECT * FROM tablestore.default.myTable;
 ```
 
-## Query Table
+## DataSet
 
-```sql
-SELECT * FROM table_store.default.myTable;
+You can load a mapping table as a Dataset on top of an existing Table Store table if you don't want to use the Table Store Catalog.
+
+```scala
+val dataset = spark.read.format("tablestore").load("file:/tmp/warehouse/default.db/myTable")
 ```
 
 ## DDL Statements
@@ -94,14 +89,14 @@ SELECT * FROM table_store.default.myTable;
 ```sql
 CREATE TABLE [IF NOT EXISTS] table_identifier 
 [ ( col_name1[:] col_type1 [ COMMENT col_comment1 ], ... ) ]
-[ USING table_store ]    
+[ USING tablestore ]    
 [ COMMENT table_comment ]
 [ PARTITIONED BY ( col_name1, col_name2, ... ) ]
 [ TBLPROPERTIES ( key1=val1, key2=val2, ... ) ]       
 ```
 For example, create an order table with `order_id` as primary key and partitioned by `dt, hh`.
 ```sql
-CREATE TABLE table_store.default.OrderTable (
+CREATE TABLE tablestore.default.OrderTable (
     order_id BIGINT NOT NULL comment 'biz order id',
     buyer_id BIGINT NOT NULL COMMENT 'buyer id',
     coupon_info ARRAY<STRING> NOT NULL COMMENT 'coupon info',
@@ -116,7 +111,7 @@ TBLPROPERTIES ('foo' = 'bar', 'primary-key' = 'order_id,dt,hh')
 __Note:__
 - Primary key feature is supported via table properties, and composite primary key is delimited with comma.
 - Partition fields should be predefined, complex partition such like `PARTITIONED BY ( col_name2[:] col_type2 [ COMMENT col_comment2 ], ... )` is not supported.
-- For Spark 3.0, `CREATE TABLE USING table_store` is required.
+- For Spark 3.0, `CREATE TABLE USING tablestore` is required.
 {{< /hint >}}
 
 ### Alter Table
@@ -130,29 +125,29 @@ SET TBLPROPERTIES ( key1=val1 )
 
 - Change/add table properties
 ```sql
-ALTER TABLE table_store.default.OrderTable SET TBLPROPERTIES (
+ALTER TABLE tablestore.default.OrderTable SET TBLPROPERTIES (
     'write-buffer-size'='256 MB'
 )
 ```
 
 - Remove a table property
 ```sql
-ALTER TABLE table_store.default.OrderTable UNSET TBLPROPERTIES ('write-buffer-size')
+ALTER TABLE tablestore.default.OrderTable UNSET TBLPROPERTIES ('write-buffer-size')
 ```
 
 - Add a new column
 ```sql
-ALTER TABLE table_store.default.OrderTable ADD COLUMNS (buy_count INT)
+ALTER TABLE tablestore.default.OrderTable ADD COLUMNS (buy_count INT)
 ```
 
 - Change column nullability
 ```sql
-ALTER TABLE table_store.default.OrderTable ALTER COLUMN coupon_info DROP NOT NULL
+ALTER TABLE tablestore.default.OrderTable ALTER COLUMN coupon_info DROP NOT NULL
 ```
 
 - Change column comment
 ```sql
-ALTER TABLE table_store.default.OrderTable ALTER COLUMN buy_count COMMENT 'buy count'
+ALTER TABLE tablestore.default.OrderTable ALTER COLUMN buy_count COMMENT 'buy count'
 ```
 
 {{< hint info >}}
@@ -163,7 +158,7 @@ __Note:__
 ### Drop Table
 
 ```sql
-DROP TABLE table_store.default.OrderTable
+DROP TABLE tablestore.default.OrderTable
 ```
 {{< hint warning >}}
 __Attention__: Drop a table will delete both metadata and files on the disk.
@@ -172,13 +167,13 @@ __Attention__: Drop a table will delete both metadata and files on the disk.
 ### Create Namespace
 
 ```sql
-CREATE NAMESPACE [IF NOT EXISTS] table_store.bar
+CREATE NAMESPACE [IF NOT EXISTS] tablestore.bar
 ```
 
 ### Drop Namespace
 
 ```sql
-DROP NAMESPACE table_store.bar
+DROP NAMESPACE tablestore.bar
 ```
 
 {{< hint warning >}}
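
The two access paths documented above (the `tablestore` catalog and the path-based Dataset load) can also be wired together from code. A minimal Scala sketch, reusing the warehouse path and table name from the examples above (both are illustrative, not requirements):

```scala
import org.apache.spark.sql.SparkSession

// Register the Table Store catalog under the name "tablestore" and point it at a local warehouse.
val spark = SparkSession.builder()
  .master("local[2]")
  .config("spark.sql.catalog.tablestore", "org.apache.flink.table.store.spark.SparkCatalog")
  .config("spark.sql.catalog.tablestore.warehouse", "file:/tmp/warehouse")
  .getOrCreate()

// Catalog route: address the table through the registered catalog.
spark.sql("SELECT * FROM tablestore.default.myTable").show()

// Dataset route: load the same table directly from its path, bypassing the catalog.
val dataset = spark.read.format("tablestore").load("file:/tmp/warehouse/default.db/myTable")
dataset.show()
```

The DDL statements above go through the catalog; the path-based route is shown only for reading, as in the documentation above.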
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
index 1326ae27..93056d9f 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
@@ -83,8 +83,8 @@ public class SparkReadITCase {
         assertThat(warehouse.delete()).isTrue();
         warehousePath = new Path("file:" + warehouse);
         spark = SparkSession.builder().master("local[2]").getOrCreate();
-        spark.conf().set("spark.sql.catalog.table_store", SparkCatalog.class.getName());
-        spark.conf().set("spark.sql.catalog.table_store.warehouse", warehousePath.toString());
+        spark.conf().set("spark.sql.catalog.tablestore", SparkCatalog.class.getName());
+        spark.conf().set("spark.sql.catalog.tablestore.warehouse", warehousePath.toString());
 
         // flink sink
         tablePath1 = new Path(warehousePath, "default.db/t1");
@@ -204,44 +204,42 @@ public class SparkReadITCase {
 
     @Test
     public void testNormal() {
-        innerTestSimpleType(
-                spark.read().format("tablestore").option("path", tablePath1.toString()).load());
+        innerTestSimpleType(spark.read().format("tablestore").load(tablePath1.toString()));
 
-        innerTestNestedType(
-                spark.read().format("tablestore").option("path", tablePath2.toString()).load());
+        innerTestNestedType(spark.read().format("tablestore").load(tablePath2.toString()));
     }
 
     @Test
     public void testFilterPushDown() {
         innerTestSimpleTypeFilterPushDown(
-                spark.read().format("tablestore").option("path", tablePath1.toString()).load());
+                spark.read().format("tablestore").load(tablePath1.toString()));
 
         innerTestNestedTypeFilterPushDown(
-                spark.read().format("tablestore").option("path", tablePath2.toString()).load());
+                spark.read().format("tablestore").load(tablePath2.toString()));
     }
 
     @Test
     public void testCatalogNormal() {
-        innerTestSimpleType(spark.table("table_store.default.t1"));
+        innerTestSimpleType(spark.table("tablestore.default.t1"));
 
-        innerTestNestedType(spark.table("table_store.default.t2"));
+        innerTestNestedType(spark.table("tablestore.default.t2"));
     }
 
     @Test
     public void testCatalogFilterPushDown() {
-        innerTestSimpleTypeFilterPushDown(spark.table("table_store.default.t1"));
+        innerTestSimpleTypeFilterPushDown(spark.table("tablestore.default.t1"));
 
-        innerTestNestedTypeFilterPushDown(spark.table("table_store.default.t2"));
+        innerTestNestedTypeFilterPushDown(spark.table("tablestore.default.t2"));
     }
 
     @Test
     public void testSetAndRemoveOption() {
-        spark.sql("ALTER TABLE table_store.default.t1 SET TBLPROPERTIES('xyc' 'unknown1')");
+        spark.sql("ALTER TABLE tablestore.default.t1 SET TBLPROPERTIES('xyc' 'unknown1')");
 
         Map<String, String> options = schema1().options();
         assertThat(options).containsEntry("xyc", "unknown1");
 
-        spark.sql("ALTER TABLE table_store.default.t1 UNSET TBLPROPERTIES('xyc')");
+        spark.sql("ALTER TABLE tablestore.default.t1 UNSET TBLPROPERTIES('xyc')");
 
         options = schema1().options();
         assertThat(options).doesNotContainKey("xyc");
@@ -249,7 +247,7 @@ public class SparkReadITCase {
         assertThatThrownBy(
                         () ->
                                 spark.sql(
-                                        "ALTER TABLE table_store.default.t1 SET TBLPROPERTIES('primary-key' = 'a')"))
+                                        "ALTER TABLE tablestore.default.t1 SET TBLPROPERTIES('primary-key' = 'a')"))
                 .isInstanceOf(UnsupportedOperationException.class)
                 .hasMessageContaining("Alter primary key is not supported");
     }
@@ -262,9 +260,9 @@ public class SparkReadITCase {
         testHelper1.write(GenericRowData.of(5, 6L, StringData.fromString("3")));
         testHelper1.commit();
 
-        spark.sql("ALTER TABLE table_store.default.testAddColumn ADD COLUMN d STRING");
+        spark.sql("ALTER TABLE tablestore.default.testAddColumn ADD COLUMN d STRING");
 
-        Dataset<Row> table = spark.table("table_store.default.testAddColumn");
+        Dataset<Row> table = spark.table("tablestore.default.testAddColumn");
         List<Row> results = table.collectAsList();
         assertThat(results.toString()).isEqualTo("[[1,2,1,null], [5,6,3,null]]");
 
@@ -283,8 +281,8 @@ public class SparkReadITCase {
         testHelper1.write(GenericRowData.of(5, 6L, StringData.fromString("3")));
         testHelper1.commit();
 
-        spark.sql("ALTER TABLE table_store.default.testAlterColumnType ALTER COLUMN a TYPE BIGINT");
-        innerTestSimpleType(spark.table("table_store.default.testAlterColumnType"));
+        spark.sql("ALTER TABLE tablestore.default.testAlterColumnType ALTER COLUMN a TYPE BIGINT");
+        innerTestSimpleType(spark.table("tablestore.default.testAlterColumnType"));
     }
 
     @Test
@@ -300,30 +298,30 @@ public class SparkReadITCase {
                 .isFalse();
 
         // note: for Spark, it is illegal to change nullable column to non-nullable
-        spark.sql("ALTER TABLE table_store.default.t2 ALTER COLUMN a DROP NOT NULL");
+        spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN a DROP NOT NULL");
         assertThat(fieldIsNullable(getField(schema2(), 0))).isTrue();
 
-        spark.sql("ALTER TABLE table_store.default.t2 ALTER COLUMN b DROP NOT NULL");
+        spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN b DROP NOT NULL");
         assertThat(fieldIsNullable(getField(schema2(), 1))).isTrue();
 
-        spark.sql("ALTER TABLE table_store.default.t2 ALTER COLUMN c DROP NOT NULL");
+        spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN c DROP NOT NULL");
         assertThat(fieldIsNullable(getField(schema2(), 2))).isTrue();
 
-        spark.sql("ALTER TABLE table_store.default.t2 ALTER COLUMN c.c1 DROP NOT NULL");
+        spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN c.c1 DROP NOT NULL");
         assertThat(fieldIsNullable(getNestedField(getField(schema2(), 2), 0))).isTrue();
 
-        spark.sql("ALTER TABLE table_store.default.t2 ALTER COLUMN c.c1.c12 DROP NOT NULL");
+        spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN c.c1.c12 DROP NOT NULL");
         assertThat(fieldIsNullable(getNestedField(getNestedField(getField(schema2(), 2), 0), 1)))
                 .isTrue();
     }
 
     @Test
     public void testAlterPrimaryKeyNullability() {
-        spark.sql("USE table_store");
+        spark.sql("USE tablestore");
         spark.sql(
                 "CREATE TABLE default.testAlterPkNullability (\n"
                         + "a BIGINT,\n"
-                        + "b STRING) USING table_store\n"
+                        + "b STRING) USING tablestore\n"
                         + "COMMENT 'table comment'\n"
                         + "TBLPROPERTIES ('primary-key' = 'a')");
         assertThatThrownBy(
@@ -339,11 +337,10 @@ public class SparkReadITCase {
     public void testAlterTableColumnComment() {
         assertThat(getField(schema1(), 0).description()).isNull();
 
-        spark.sql("ALTER TABLE table_store.default.t1 ALTER COLUMN a COMMENT 'a new comment'");
+        spark.sql("ALTER TABLE tablestore.default.t1 ALTER COLUMN a COMMENT 'a new comment'");
         assertThat(getField(schema1(), 0).description()).isEqualTo("a new comment");
 
-        spark.sql(
-                "ALTER TABLE table_store.default.t1 ALTER COLUMN a COMMENT 'yet another comment'");
+        spark.sql("ALTER TABLE tablestore.default.t1 ALTER COLUMN a COMMENT 'yet another comment'");
         assertThat(getField(schema1(), 0).description()).isEqualTo("yet another comment");
 
         assertThat(getField(schema2(), 2).description()).isEqualTo("comment about c");
@@ -356,13 +353,13 @@ public class SparkReadITCase {
                 .isNull();
 
         spark.sql(
-                "ALTER TABLE table_store.default.t2 ALTER COLUMN c COMMENT 'yet another comment about c'");
-        spark.sql("ALTER TABLE table_store.default.t2 ALTER COLUMN c.c1 COMMENT 'a nested type'");
-        spark.sql("ALTER TABLE table_store.default.t2 ALTER COLUMN c.c2 COMMENT 'a bigint type'");
+                "ALTER TABLE tablestore.default.t2 ALTER COLUMN c COMMENT 'yet another comment about c'");
+        spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN c.c1 COMMENT 'a nested type'");
+        spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN c.c2 COMMENT 'a bigint type'");
         spark.sql(
-                "ALTER TABLE table_store.default.t2 ALTER COLUMN c.c1.c11 COMMENT 'a double type'");
+                "ALTER TABLE tablestore.default.t2 ALTER COLUMN c.c1.c11 COMMENT 'a double type'");
         spark.sql(
-                "ALTER TABLE table_store.default.t2 ALTER COLUMN c.c1.c12 COMMENT 'a boolean array'");
+                "ALTER TABLE tablestore.default.t2 ALTER COLUMN c.c1.c12 COMMENT 'a boolean array'");
 
         assertThat(getField(schema2(), 2).description()).isEqualTo("yet another comment about c");
         assertThat(getNestedField(getField(schema2(), 2), 0).description())
@@ -377,11 +374,11 @@ public class SparkReadITCase {
 
     @Test
     public void testCreateTableWithNullablePk() {
-        spark.sql("USE table_store");
+        spark.sql("USE tablestore");
         spark.sql(
                 "CREATE TABLE default.PkTable (\n"
                         + "a BIGINT,\n"
-                        + "b STRING) USING table_store\n"
+                        + "b STRING) USING tablestore\n"
                         + "COMMENT 'table comment'\n"
                         + "TBLPROPERTIES ('primary-key' = 'a')");
         Path tablePath = new Path(warehousePath, "default.db/PkTable");
@@ -391,14 +388,14 @@ public class SparkReadITCase {
 
     @Test
     public void testCreateTableWithInvalidPk() {
-        spark.sql("USE table_store");
+        spark.sql("USE tablestore");
         assertThatThrownBy(
                         () ->
                                 spark.sql(
                                         "CREATE TABLE default.PartitionedPkTable (\n"
                                                 + "a BIGINT,\n"
                                                 + "b STRING,\n"
-                                                + "c DOUBLE) USING table_store\n"
+                                                + "c DOUBLE) USING tablestore\n"
                                                 + "COMMENT 'table comment'\n"
                                                 + "PARTITIONED BY (b)"
                                                 + "TBLPROPERTIES ('primary-key' = 'a')"))
@@ -409,14 +406,14 @@ public class SparkReadITCase {
 
     @Test
     public void testCreateTableWithNonexistentPk() {
-        spark.sql("USE table_store");
+        spark.sql("USE tablestore");
         assertThatThrownBy(
                         () ->
                                 spark.sql(
                                         "CREATE TABLE default.PartitionedPkTable (\n"
                                                 + "a BIGINT,\n"
                                                 + "b STRING,\n"
-                                                + "c DOUBLE) USING table_store\n"
+                                                + "c DOUBLE) USING tablestore\n"
                                                 + "COMMENT 'table comment'\n"
                                                 + "PARTITIONED BY (b)"
                                                 + "TBLPROPERTIES ('primary-key' = 'd')"))
@@ -427,14 +424,14 @@ public class SparkReadITCase {
 
     @Test
     public void testCreateTableWithNonexistentPartition() {
-        spark.sql("USE table_store");
+        spark.sql("USE tablestore");
         assertThatThrownBy(
                         () ->
                                 spark.sql(
                                         "CREATE TABLE default.PartitionedPkTable (\n"
                                                 + "a BIGINT,\n"
                                                 + "b STRING,\n"
-                                                + "c DOUBLE) USING table_store\n"
+                                                + "c DOUBLE) USING tablestore\n"
                                                 + "COMMENT 'table comment'\n"
                                                 + "PARTITIONED BY (d)"
                                                 + "TBLPROPERTIES ('primary-key' = 'a')"))
@@ -454,7 +451,7 @@ public class SparkReadITCase {
 
     private void innerTest(String tableName, boolean hasPk, boolean partitioned, boolean appendOnly)
             throws Exception {
-        spark.sql("USE table_store");
+        spark.sql("USE tablestore");
         String ddlTemplate =
                 "CREATE TABLE default.%s (\n"
                         + "order_id BIGINT NOT NULL comment 'order_id',\n"
@@ -462,7 +459,7 @@ public class SparkReadITCase {
                         + "coupon_info ARRAY<STRING> NOT NULL COMMENT 'coupon_info',\n"
                         + "order_amount DOUBLE NOT NULL COMMENT 'order_amount',\n"
                         + "dt STRING NOT NULL COMMENT 'dt',\n"
-                        + "hh STRING NOT NULL COMMENT 'hh') USING table_store\n"
+                        + "hh STRING NOT NULL COMMENT 'hh') USING tablestore\n"
                         + "COMMENT 'table comment'\n"
                         + "%s\n"
                         + "TBLPROPERTIES (%s)";
@@ -590,8 +587,7 @@ public class SparkReadITCase {
                         BinaryStringData.fromString("12")));
         testHelper.commit();
 
-        Dataset<Row> dataset =
-                spark.read().format("tablestore").option("path", tablePath.toString()).load();
+        Dataset<Row> dataset = spark.read().format("tablestore").load(tablePath.toString());
         assertThat(dataset.select("order_id", "buyer_id", "dt").collectAsList().toString())
                 .isEqualTo("[[1,10,2022-07-20]]");
         assertThat(dataset.select("coupon_info").collectAsList().toString())
@@ -601,19 +597,19 @@ public class SparkReadITCase {
         assertThat(
                         spark.sql(
                                         String.format(
-                                                "SHOW TABLES IN table_store.default LIKE '%s'",
+                                                "SHOW TABLES IN tablestore.default LIKE '%s'",
                                                 tableName))
                                 .select("namespace", "tableName")
                                 .collectAsList()
                                 .toString())
                 .isEqualTo(String.format("[[default,%s]]", tableName));
 
-        spark.sql(String.format("DROP TABLE table_store.default.%s", tableName));
+        spark.sql(String.format("DROP TABLE tablestore.default.%s", tableName));
 
         assertThat(
                         spark.sql(
                                         String.format(
-                                                "SHOW TABLES IN table_store.default LIKE '%s'",
+                                                "SHOW TABLES IN tablestore.default LIKE '%s'",
                                                 tableName))
                                 .select("namespace", "tableName")
                                 .collectAsList()
@@ -626,7 +622,7 @@ public class SparkReadITCase {
     @Test
     public void testCreateAndDropNamespace() {
         // create namespace
-        spark.sql("USE table_store");
+        spark.sql("USE tablestore");
         spark.sql("CREATE NAMESPACE bar");
 
         assertThatThrownBy(() -> spark.sql("CREATE NAMESPACE bar"))