Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/26 10:07:36 UTC
[flink-table-store] branch release-0.2 updated: [FLINK-28689] Optimize Spark documentation to Catalog and Dataset
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/release-0.2 by this push:
new fbb58071 [FLINK-28689] Optimize Spark documentation to Catalog and Dataset
fbb58071 is described below
commit fbb5807127d7d190c5631f3e006ea6845395897a
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Tue Jul 26 18:06:35 2022 +0800
[FLINK-28689] Optimize Spark documentation to Catalog and Dataset
This closes #241
---
docs/content/docs/engines/spark.md | 55 ++++++-------
.../flink/table/store/spark/SparkReadITCase.java | 96 +++++++++++-----------
2 files changed, 71 insertions(+), 80 deletions(-)
diff --git a/docs/content/docs/engines/spark.md b/docs/content/docs/engines/spark.md
index 582e3433..7582d9e9 100644
--- a/docs/content/docs/engines/spark.md
+++ b/docs/content/docs/engines/spark.md
@@ -50,12 +50,12 @@ Alternatively, you can copy `flink-table-store-spark-{{< version >}}.jar` under
## Catalog
-The following command registers the Table Store's Spark catalog with the name `table_store`:
+The following command registers the Table Store's Spark catalog with the name `tablestore`:
```bash
spark-sql ... \
- --conf spark.sql.catalog.table_store=org.apache.flink.table.store.spark.SparkCatalog \
- --conf spark.sql.catalog.table_store.warehouse=file:/tmp/warehouse
+ --conf spark.sql.catalog.tablestore=org.apache.flink.table.store.spark.SparkCatalog \
+ --conf spark.sql.catalog.tablestore.warehouse=file:/tmp/warehouse
```
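If you prefer to configure the catalog in code rather than on the `spark-sql` command line, the same options go on the session builder. A minimal Scala sketch (the warehouse path is a placeholder), mirroring the setup used by `SparkReadITCase`:

```scala
import org.apache.spark.sql.SparkSession

// Register the Table Store catalog while building the session.
// The warehouse location below is a placeholder; adjust it for your setup.
val spark = SparkSession.builder()
  .master("local[2]")
  .config("spark.sql.catalog.tablestore", "org.apache.flink.table.store.spark.SparkCatalog")
  .config("spark.sql.catalog.tablestore.warehouse", "file:/tmp/warehouse")
  .getOrCreate()
```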
Some extra configuration is needed if your Table Store Catalog uses the Hive
@@ -63,29 +63,24 @@ Metastore (No extra configuration is required for read-only).
```bash
spark-sql ... \
- --conf spark.sql.catalog.table_store=org.apache.flink.table.store.spark.SparkCatalog \
- --conf spark.sql.catalog.table_store.warehouse=file:/tmp/warehouse \
- --conf spark.sql.catalog.table_store.metastore=hive \
- --conf spark.sql.catalog.table_store.uri=thrift://...
+ --conf spark.sql.catalog.tablestore=org.apache.flink.table.store.spark.SparkCatalog \
+ --conf spark.sql.catalog.tablestore.warehouse=file:/tmp/warehouse \
+ --conf spark.sql.catalog.tablestore.metastore=hive \
+ --conf spark.sql.catalog.tablestore.uri=thrift://...
```
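Programmatically, the Hive Metastore settings are just two more entries on the same builder. A sketch under the same assumptions; both the warehouse path and the thrift URI below are placeholders, not real endpoints:

```scala
import org.apache.spark.sql.SparkSession

// Same catalog registration plus the Hive Metastore options.
// The warehouse path and thrift URI are placeholders for illustration.
val spark = SparkSession.builder()
  .config("spark.sql.catalog.tablestore", "org.apache.flink.table.store.spark.SparkCatalog")
  .config("spark.sql.catalog.tablestore.warehouse", "file:/tmp/warehouse")
  .config("spark.sql.catalog.tablestore.metastore", "hive")
  .config("spark.sql.catalog.tablestore.uri", "thrift://hive-metastore:9083")
  .getOrCreate()
```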
-## Create Temporary View
-
-Use the `CREATE TEMPORARY VIEW` command to create a Spark mapping table on top of
-an existing Table Store table if you don't want to use Table Store Catalog.
+## Query Table
```sql
-CREATE TEMPORARY VIEW myTable
-USING tablestore
-OPTIONS (
- path "file:/tmp/warehouse/default.db/myTable"
-)
+SELECT * FROM tablestore.default.myTable;
```
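Once the catalog is registered, the same table is also reachable through the DataFrame API, as `SparkReadITCase.testCatalogNormal` does; a minimal sketch:

```scala
// Resolve the table through the registered catalog and inspect it.
val df = spark.table("tablestore.default.myTable")
df.printSchema()
df.show()
```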
-## Query Table
+## Dataset
-```sql
-SELECT * FROM table_store.default.myTable;
+You can load a mapping table as a Dataset on top of an existing Table Store table if you don't want to use the Table Store Catalog.
+
+```scala
+val dataset = spark.read.format("tablestore").load("file:/tmp/warehouse/default.db/myTable")
```
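Once loaded, the Dataset supports the usual Spark operations, including the filter pushdown exercised by `SparkReadITCase.testFilterPushDown`. A short sketch; the `dt` column is only an assumption about the table's schema, used for illustration:

```scala
import org.apache.spark.sql.functions.col

// Ordinary Dataset operations apply to the loaded table. The `dt`
// column is a hypothetical example, not guaranteed by the schema.
dataset.printSchema()
dataset.filter(col("dt") === "2022-07-20").show()
```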
## DDL Statements
@@ -94,14 +89,14 @@ SELECT * FROM table_store.default.myTable;
```sql
CREATE TABLE [IF NOT EXISTS] table_identifier
[ ( col_name1[:] col_type1 [ COMMENT col_comment1 ], ... ) ]
-[ USING table_store ]
+[ USING tablestore ]
[ COMMENT table_comment ]
[ PARTITIONED BY ( col_name1, col_name2, ... ) ]
[ TBLPROPERTIES ( key1=val1, key2=val2, ... ) ]
```
For example, create an order table with `order_id` as the primary key, partitioned by `dt` and `hh`.
```sql
-CREATE TABLE table_store.default.OrderTable (
+CREATE TABLE tablestore.default.OrderTable (
order_id BIGINT NOT NULL comment 'biz order id',
buyer_id BIGINT NOT NULL COMMENT 'buyer id',
coupon_info ARRAY<STRING> NOT NULL COMMENT 'coupon info',
@@ -116,7 +111,7 @@ TBLPROPERTIES ('foo' = 'bar', 'primary-key' = 'order_id,dt,hh')
__Note:__
- The primary key feature is supported via table properties, and a composite primary key is delimited with commas.
- Partition fields must be predefined; complex partition specs such as `PARTITIONED BY ( col_name2[:] col_type2 [ COMMENT col_comment2 ], ... )` are not supported.
-- For Spark 3.0, `CREATE TABLE USING table_store` is required.
+- For Spark 3.0, `CREATE TABLE USING tablestore` is required.
{{< /hint >}}
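The same DDL can be run from code through `spark.sql`. A minimal sketch, assuming the `tablestore` catalog registered above and mirroring the statements used in `SparkReadITCase`:

```scala
// Create a simple primary-key table via SQL from Scala.
spark.sql("USE tablestore")
spark.sql(
  """CREATE TABLE default.PkTable (
    |a BIGINT,
    |b STRING) USING tablestore
    |COMMENT 'table comment'
    |TBLPROPERTIES ('primary-key' = 'a')""".stripMargin)
```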
### Alter Table
@@ -130,29 +125,29 @@ SET TBLPROPERTIES ( key1=val1 )
- Change/add table properties
```sql
-ALTER TABLE table_store.default.OrderTable SET TBLPROPERTIES (
+ALTER TABLE tablestore.default.OrderTable SET TBLPROPERTIES (
'write-buffer-size'='256 MB'
)
```
- Remove a table property
```sql
-ALTER TABLE table_store.default.OrderTable UNSET TBLPROPERTIES ('write-buffer-size')
+ALTER TABLE tablestore.default.OrderTable UNSET TBLPROPERTIES ('write-buffer-size')
```
- Add a new column
```sql
-ALTER TABLE table_store.default.OrderTable ADD COLUMNS (buy_count INT)
+ALTER TABLE tablestore.default.OrderTable ADD COLUMNS (buy_count INT)
```
- Change column nullability
```sql
-ALTER TABLE table_store.default.OrderTable ALTER COLUMN coupon_info DROP NOT NULL
+ALTER TABLE tablestore.default.OrderTable ALTER COLUMN coupon_info DROP NOT NULL
```
- Change column comment
```sql
-ALTER TABLE table_store.default.OrderTable ALTER COLUMN buy_count COMMENT 'buy count'
+ALTER TABLE tablestore.default.OrderTable ALTER COLUMN buy_count COMMENT 'buy count'
```
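These alterations can be scripted the same way; a short sketch chaining two of the statements above:

```scala
// Add a column, then give it a comment, as in the examples above.
spark.sql("ALTER TABLE tablestore.default.OrderTable ADD COLUMNS (buy_count INT)")
spark.sql("ALTER TABLE tablestore.default.OrderTable ALTER COLUMN buy_count COMMENT 'buy count'")
```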
{{< hint info >}}
@@ -163,7 +158,7 @@ __Note:__
### Drop Table
```sql
-DROP TABLE table_store.default.OrderTable
+DROP TABLE tablestore.default.OrderTable
```
{{< hint warning >}}
__Attention__: Dropping a table deletes both its metadata and its files on disk.
@@ -172,13 +167,13 @@ __Attention__: Drop a table will delete both metadata and files on the disk.
### Create Namespace
```sql
-CREATE NAMESPACE [IF NOT EXISTS] table_store.bar
+CREATE NAMESPACE [IF NOT EXISTS] tablestore.bar
```
### Drop Namespace
```sql
-DROP NAMESPACE table_store.bar
+DROP NAMESPACE tablestore.bar
```
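Namespace DDL can likewise be driven from code; a sketch following `SparkReadITCase.testCreateAndDropNamespace`:

```scala
// Create and drop a namespace through the registered catalog.
spark.sql("USE tablestore")
spark.sql("CREATE NAMESPACE IF NOT EXISTS bar")
spark.sql("SHOW NAMESPACES").show()
spark.sql("DROP NAMESPACE bar")
```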
{{< hint warning >}}
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
index 1326ae27..93056d9f 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
@@ -83,8 +83,8 @@ public class SparkReadITCase {
assertThat(warehouse.delete()).isTrue();
warehousePath = new Path("file:" + warehouse);
spark = SparkSession.builder().master("local[2]").getOrCreate();
- spark.conf().set("spark.sql.catalog.table_store", SparkCatalog.class.getName());
- spark.conf().set("spark.sql.catalog.table_store.warehouse", warehousePath.toString());
+ spark.conf().set("spark.sql.catalog.tablestore", SparkCatalog.class.getName());
+ spark.conf().set("spark.sql.catalog.tablestore.warehouse", warehousePath.toString());
// flink sink
tablePath1 = new Path(warehousePath, "default.db/t1");
@@ -204,44 +204,42 @@ public class SparkReadITCase {
@Test
public void testNormal() {
- innerTestSimpleType(
- spark.read().format("tablestore").option("path", tablePath1.toString()).load());
+ innerTestSimpleType(spark.read().format("tablestore").load(tablePath1.toString()));
- innerTestNestedType(
- spark.read().format("tablestore").option("path", tablePath2.toString()).load());
+ innerTestNestedType(spark.read().format("tablestore").load(tablePath2.toString()));
}
@Test
public void testFilterPushDown() {
innerTestSimpleTypeFilterPushDown(
- spark.read().format("tablestore").option("path", tablePath1.toString()).load());
+ spark.read().format("tablestore").load(tablePath1.toString()));
innerTestNestedTypeFilterPushDown(
- spark.read().format("tablestore").option("path", tablePath2.toString()).load());
+ spark.read().format("tablestore").load(tablePath2.toString()));
}
@Test
public void testCatalogNormal() {
- innerTestSimpleType(spark.table("table_store.default.t1"));
+ innerTestSimpleType(spark.table("tablestore.default.t1"));
- innerTestNestedType(spark.table("table_store.default.t2"));
+ innerTestNestedType(spark.table("tablestore.default.t2"));
}
@Test
public void testCatalogFilterPushDown() {
- innerTestSimpleTypeFilterPushDown(spark.table("table_store.default.t1"));
+ innerTestSimpleTypeFilterPushDown(spark.table("tablestore.default.t1"));
- innerTestNestedTypeFilterPushDown(spark.table("table_store.default.t2"));
+ innerTestNestedTypeFilterPushDown(spark.table("tablestore.default.t2"));
}
@Test
public void testSetAndRemoveOption() {
- spark.sql("ALTER TABLE table_store.default.t1 SET TBLPROPERTIES('xyc' 'unknown1')");
+ spark.sql("ALTER TABLE tablestore.default.t1 SET TBLPROPERTIES('xyc' 'unknown1')");
Map<String, String> options = schema1().options();
assertThat(options).containsEntry("xyc", "unknown1");
- spark.sql("ALTER TABLE table_store.default.t1 UNSET TBLPROPERTIES('xyc')");
+ spark.sql("ALTER TABLE tablestore.default.t1 UNSET TBLPROPERTIES('xyc')");
options = schema1().options();
assertThat(options).doesNotContainKey("xyc");
@@ -249,7 +247,7 @@ public class SparkReadITCase {
assertThatThrownBy(
() ->
spark.sql(
- "ALTER TABLE table_store.default.t1 SET TBLPROPERTIES('primary-key' = 'a')"))
+ "ALTER TABLE tablestore.default.t1 SET TBLPROPERTIES('primary-key' = 'a')"))
.isInstanceOf(UnsupportedOperationException.class)
.hasMessageContaining("Alter primary key is not supported");
}
@@ -262,9 +260,9 @@ public class SparkReadITCase {
testHelper1.write(GenericRowData.of(5, 6L, StringData.fromString("3")));
testHelper1.commit();
- spark.sql("ALTER TABLE table_store.default.testAddColumn ADD COLUMN d STRING");
+ spark.sql("ALTER TABLE tablestore.default.testAddColumn ADD COLUMN d STRING");
- Dataset<Row> table = spark.table("table_store.default.testAddColumn");
+ Dataset<Row> table = spark.table("tablestore.default.testAddColumn");
List<Row> results = table.collectAsList();
assertThat(results.toString()).isEqualTo("[[1,2,1,null], [5,6,3,null]]");
@@ -283,8 +281,8 @@ public class SparkReadITCase {
testHelper1.write(GenericRowData.of(5, 6L, StringData.fromString("3")));
testHelper1.commit();
- spark.sql("ALTER TABLE table_store.default.testAlterColumnType ALTER COLUMN a TYPE BIGINT");
- innerTestSimpleType(spark.table("table_store.default.testAlterColumnType"));
+ spark.sql("ALTER TABLE tablestore.default.testAlterColumnType ALTER COLUMN a TYPE BIGINT");
+ innerTestSimpleType(spark.table("tablestore.default.testAlterColumnType"));
}
@Test
@@ -300,30 +298,30 @@ public class SparkReadITCase {
.isFalse();
// note: for Spark, it is illegal to change a nullable column to non-nullable
- spark.sql("ALTER TABLE table_store.default.t2 ALTER COLUMN a DROP NOT NULL");
+ spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN a DROP NOT NULL");
assertThat(fieldIsNullable(getField(schema2(), 0))).isTrue();
- spark.sql("ALTER TABLE table_store.default.t2 ALTER COLUMN b DROP NOT NULL");
+ spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN b DROP NOT NULL");
assertThat(fieldIsNullable(getField(schema2(), 1))).isTrue();
- spark.sql("ALTER TABLE table_store.default.t2 ALTER COLUMN c DROP NOT NULL");
+ spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN c DROP NOT NULL");
assertThat(fieldIsNullable(getField(schema2(), 2))).isTrue();
- spark.sql("ALTER TABLE table_store.default.t2 ALTER COLUMN c.c1 DROP NOT NULL");
+ spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN c.c1 DROP NOT NULL");
assertThat(fieldIsNullable(getNestedField(getField(schema2(), 2), 0))).isTrue();
- spark.sql("ALTER TABLE table_store.default.t2 ALTER COLUMN c.c1.c12 DROP NOT NULL");
+ spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN c.c1.c12 DROP NOT NULL");
assertThat(fieldIsNullable(getNestedField(getNestedField(getField(schema2(), 2), 0), 1)))
.isTrue();
}
@Test
public void testAlterPrimaryKeyNullability() {
- spark.sql("USE table_store");
+ spark.sql("USE tablestore");
spark.sql(
"CREATE TABLE default.testAlterPkNullability (\n"
+ "a BIGINT,\n"
- + "b STRING) USING table_store\n"
+ + "b STRING) USING tablestore\n"
+ "COMMENT 'table comment'\n"
+ "TBLPROPERTIES ('primary-key' = 'a')");
assertThatThrownBy(
@@ -339,11 +337,10 @@ public class SparkReadITCase {
public void testAlterTableColumnComment() {
assertThat(getField(schema1(), 0).description()).isNull();
- spark.sql("ALTER TABLE table_store.default.t1 ALTER COLUMN a COMMENT 'a new comment'");
+ spark.sql("ALTER TABLE tablestore.default.t1 ALTER COLUMN a COMMENT 'a new comment'");
assertThat(getField(schema1(), 0).description()).isEqualTo("a new comment");
- spark.sql(
- "ALTER TABLE table_store.default.t1 ALTER COLUMN a COMMENT 'yet another comment'");
+ spark.sql("ALTER TABLE tablestore.default.t1 ALTER COLUMN a COMMENT 'yet another comment'");
assertThat(getField(schema1(), 0).description()).isEqualTo("yet another comment");
assertThat(getField(schema2(), 2).description()).isEqualTo("comment about c");
@@ -356,13 +353,13 @@ public class SparkReadITCase {
.isNull();
spark.sql(
- "ALTER TABLE table_store.default.t2 ALTER COLUMN c COMMENT 'yet another comment about c'");
- spark.sql("ALTER TABLE table_store.default.t2 ALTER COLUMN c.c1 COMMENT 'a nested type'");
- spark.sql("ALTER TABLE table_store.default.t2 ALTER COLUMN c.c2 COMMENT 'a bigint type'");
+ "ALTER TABLE tablestore.default.t2 ALTER COLUMN c COMMENT 'yet another comment about c'");
+ spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN c.c1 COMMENT 'a nested type'");
+ spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN c.c2 COMMENT 'a bigint type'");
spark.sql(
- "ALTER TABLE table_store.default.t2 ALTER COLUMN c.c1.c11 COMMENT 'a double type'");
+ "ALTER TABLE tablestore.default.t2 ALTER COLUMN c.c1.c11 COMMENT 'a double type'");
spark.sql(
- "ALTER TABLE table_store.default.t2 ALTER COLUMN c.c1.c12 COMMENT 'a boolean array'");
+ "ALTER TABLE tablestore.default.t2 ALTER COLUMN c.c1.c12 COMMENT 'a boolean array'");
assertThat(getField(schema2(), 2).description()).isEqualTo("yet another comment about c");
assertThat(getNestedField(getField(schema2(), 2), 0).description())
@@ -377,11 +374,11 @@ public class SparkReadITCase {
@Test
public void testCreateTableWithNullablePk() {
- spark.sql("USE table_store");
+ spark.sql("USE tablestore");
spark.sql(
"CREATE TABLE default.PkTable (\n"
+ "a BIGINT,\n"
- + "b STRING) USING table_store\n"
+ + "b STRING) USING tablestore\n"
+ "COMMENT 'table comment'\n"
+ "TBLPROPERTIES ('primary-key' = 'a')");
Path tablePath = new Path(warehousePath, "default.db/PkTable");
@@ -391,14 +388,14 @@ public class SparkReadITCase {
@Test
public void testCreateTableWithInvalidPk() {
- spark.sql("USE table_store");
+ spark.sql("USE tablestore");
assertThatThrownBy(
() ->
spark.sql(
"CREATE TABLE default.PartitionedPkTable (\n"
+ "a BIGINT,\n"
+ "b STRING,\n"
- + "c DOUBLE) USING table_store\n"
+ + "c DOUBLE) USING tablestore\n"
+ "COMMENT 'table comment'\n"
+ "PARTITIONED BY (b)"
+ "TBLPROPERTIES ('primary-key' = 'a')"))
@@ -409,14 +406,14 @@ public class SparkReadITCase {
@Test
public void testCreateTableWithNonexistentPk() {
- spark.sql("USE table_store");
+ spark.sql("USE tablestore");
assertThatThrownBy(
() ->
spark.sql(
"CREATE TABLE default.PartitionedPkTable (\n"
+ "a BIGINT,\n"
+ "b STRING,\n"
- + "c DOUBLE) USING table_store\n"
+ + "c DOUBLE) USING tablestore\n"
+ "COMMENT 'table comment'\n"
+ "PARTITIONED BY (b)"
+ "TBLPROPERTIES ('primary-key' = 'd')"))
@@ -427,14 +424,14 @@ public class SparkReadITCase {
@Test
public void testCreateTableWithNonexistentPartition() {
- spark.sql("USE table_store");
+ spark.sql("USE tablestore");
assertThatThrownBy(
() ->
spark.sql(
"CREATE TABLE default.PartitionedPkTable (\n"
+ "a BIGINT,\n"
+ "b STRING,\n"
- + "c DOUBLE) USING table_store\n"
+ + "c DOUBLE) USING tablestore\n"
+ "COMMENT 'table comment'\n"
+ "PARTITIONED BY (d)"
+ "TBLPROPERTIES ('primary-key' = 'a')"))
@@ -454,7 +451,7 @@ public class SparkReadITCase {
private void innerTest(String tableName, boolean hasPk, boolean partitioned, boolean appendOnly)
throws Exception {
- spark.sql("USE table_store");
+ spark.sql("USE tablestore");
String ddlTemplate =
"CREATE TABLE default.%s (\n"
+ "order_id BIGINT NOT NULL comment 'order_id',\n"
@@ -462,7 +459,7 @@ public class SparkReadITCase {
+ "coupon_info ARRAY<STRING> NOT NULL COMMENT 'coupon_info',\n"
+ "order_amount DOUBLE NOT NULL COMMENT 'order_amount',\n"
+ "dt STRING NOT NULL COMMENT 'dt',\n"
- + "hh STRING NOT NULL COMMENT 'hh') USING table_store\n"
+ + "hh STRING NOT NULL COMMENT 'hh') USING tablestore\n"
+ "COMMENT 'table comment'\n"
+ "%s\n"
+ "TBLPROPERTIES (%s)";
@@ -590,8 +587,7 @@ public class SparkReadITCase {
BinaryStringData.fromString("12")));
testHelper.commit();
- Dataset<Row> dataset =
- spark.read().format("tablestore").option("path", tablePath.toString()).load();
+ Dataset<Row> dataset = spark.read().format("tablestore").load(tablePath.toString());
assertThat(dataset.select("order_id", "buyer_id", "dt").collectAsList().toString())
.isEqualTo("[[1,10,2022-07-20]]");
assertThat(dataset.select("coupon_info").collectAsList().toString())
@@ -601,19 +597,19 @@ public class SparkReadITCase {
assertThat(
spark.sql(
String.format(
- "SHOW TABLES IN table_store.default LIKE '%s'",
+ "SHOW TABLES IN tablestore.default LIKE '%s'",
tableName))
.select("namespace", "tableName")
.collectAsList()
.toString())
.isEqualTo(String.format("[[default,%s]]", tableName));
- spark.sql(String.format("DROP TABLE table_store.default.%s", tableName));
+ spark.sql(String.format("DROP TABLE tablestore.default.%s", tableName));
assertThat(
spark.sql(
String.format(
- "SHOW TABLES IN table_store.default LIKE '%s'",
+ "SHOW TABLES IN tablestore.default LIKE '%s'",
tableName))
.select("namespace", "tableName")
.collectAsList()
@@ -626,7 +622,7 @@ public class SparkReadITCase {
@Test
public void testCreateAndDropNamespace() {
// create namespace
- spark.sql("USE table_store");
+ spark.sql("USE tablestore");
spark.sql("CREATE NAMESPACE bar");
assertThatThrownBy(() -> spark.sql("CREATE NAMESPACE bar"))