You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/22 04:08:56 UTC
[flink-table-store] branch release-0.2 updated: [FLINK-28535] Support create/drop namespace/table for SparkCatalog
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/release-0.2 by this push:
new 24fd3da3 [FLINK-28535] Support create/drop namespace/table for SparkCatalog
24fd3da3 is described below
commit 24fd3da3b94ba4271dbf9de77103c295ac0414c3
Author: Jane Chan <55...@users.noreply.github.com>
AuthorDate: Fri Jul 22 12:07:45 2022 +0800
[FLINK-28535] Support create/drop namespace/table for SparkCatalog
This closes #230
---
docs/content/docs/development/create-table.md | 17 +-
docs/content/docs/engines/spark.md | 93 +++++++++-
.../flink/table/store/spark/SparkCatalog.java | 189 +++++++++++++++------
.../table/store/spark/SimpleTableTestHelper.java | 14 +-
.../flink/table/store/spark/SparkReadITCase.java | 143 ++++++++++++++++
5 files changed, 380 insertions(+), 76 deletions(-)
diff --git a/docs/content/docs/development/create-table.md b/docs/content/docs/development/create-table.md
index 73385a41..baee2568 100644
--- a/docs/content/docs/development/create-table.md
+++ b/docs/content/docs/development/create-table.md
@@ -75,12 +75,9 @@ CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
```
{{< hint info >}}
-__Note:__ To ensure the uniqueness of the primary key, the
-primary key must contain the partition field.
-{{< /hint >}}
-
-{{< hint info >}}
-__Note:__ Metadata column is not supported yet.
+__Note:__
+- To ensure the uniqueness of the primary key, the primary key must contain the partition fields.
+- Metadata column is not supported yet.
{{< /hint >}}
This will create a directory under `${warehouse}/${database_name}.db/${table_name}`.
@@ -191,10 +188,11 @@ CREATE TABLE MyTable (
);
```
+{{< hint info >}}
__Note:__
- If users do not specify the bucket key explicitly, the primary key (if present) or the whole row is used as bucket key.
- Bucket key cannot be changed once the table is created. `ALTER TABLE SET ('bucket-key' = ...)` or `ALTER TABLE RESET ('bucket-key')` will throw an exception.
-
+{{< /hint >}}
The number of buckets is very important as it determines the
@@ -270,11 +268,12 @@ For example, the inputs:
Output:
- <1, 25.2, 10, 'This is a book'>
+{{< hint info >}}
__Note:__
- Partial update is only supported for table with primary key.
- Partial update is not supported for streaming consuming.
- It is best not to have NULL values in the fields; a NULL value will not overwrite existing data.
-
+{{< /hint >}}
## Append-only Table
@@ -295,10 +294,12 @@ CREATE TABLE IF NOT EXISTS T1 (
'bucket' = '1' --specify the total number of buckets
)
```
+{{< hint info >}}
__Note:__
- By definition, users cannot define primary keys on an append-only table.
- An append-only table is different from a changelog table that does not define primary keys.
For the latter, updating or deleting the whole row is accepted, although no primary key is present.
+{{< /hint >}}
### Query Append-only Table
diff --git a/docs/content/docs/engines/spark.md b/docs/content/docs/engines/spark.md
index 83531c1d..a2bac76c 100644
--- a/docs/content/docs/engines/spark.md
+++ b/docs/content/docs/engines/spark.md
@@ -88,22 +88,99 @@ OPTIONS (
SELECT * FROM table_store.default.myTable;
```
-## DDL
+## DDL Statements
-`ALTER TABLE ... SET TBLPROPERTIES`
+### Create Table
```sql
-ALTER TABLE table_store.default.myTable SET TBLPROPERTIES (
+CREATE TABLE [IF NOT EXISTS] table_identifier
+[ ( col_name1[:] col_type1 [ COMMENT col_comment1 ], ... ) ]
+[ USING table_store ]
+[ COMMENT table_comment ]
+[ PARTITIONED BY ( col_name1, col_name2, ... ) ]
+[ TBLPROPERTIES ( key1=val1, key2=val2, ... ) ]
+```
+For example, the following creates an order table partitioned by `dt, hh` whose primary key is `order_id, dt, hh` (the primary key must contain the partition fields).
+```sql
+CREATE TABLE table_store.default.OrderTable (
+ order_id BIGINT NOT NULL COMMENT 'biz order id',
+ buyer_id BIGINT NOT NULL COMMENT 'buyer id',
+ coupon_info ARRAY<STRING> NOT NULL COMMENT 'coupon info',
+ order_amount DOUBLE NOT NULL COMMENT 'order amount',
+ dt STRING NOT NULL COMMENT 'yyyy-MM-dd',
+ hh STRING NOT NULL COMMENT 'HH'
+) COMMENT 'my table'
+PARTITIONED BY (dt, hh)
+TBLPROPERTIES ('foo' = 'bar', 'primary-key' = 'order_id,dt,hh')
+```
+{{< hint info >}}
+__Note:__
+- The primary key is declared with the `primary-key` table property; a composite primary key is given as a comma-delimited list of column names.
+- Partition fields must be columns already defined in the table schema; complex partition clauses such as `PARTITIONED BY ( col_name2[:] col_type2 [ COMMENT col_comment2 ], ... )` are not supported.
+- For Spark 3.0, `CREATE TABLE USING table_store` is required.
+{{< /hint >}}
+
+### Alter Table
+```sql
+ALTER TABLE table_identifier
+SET TBLPROPERTIES ( key1=val1 )
+ | UNSET TBLPROPERTIES (key2)
+ | ADD COLUMNS ( col_name col_type [ , ... ] )
+ | { ALTER | CHANGE } COLUMN col_name { DROP NOT NULL | COMMENT 'new_comment' }
+```
+
+- Change/add table properties
+```sql
+ALTER TABLE table_store.default.OrderTable SET TBLPROPERTIES (
'write-buffer-size'='256 MB'
)
```
-`ALTER TABLE ... UNSET TBLPROPERTIES`
+- Remove a table property
```sql
-ALTER TABLE table_store.default.myTable UNSET TBLPROPERTIES ('write-buffer-size')
+ALTER TABLE table_store.default.OrderTable UNSET TBLPROPERTIES ('write-buffer-size')
```
-`ALTER TABLE ... ADD COLUMN`
+- Add a new column
```sql
-ALTER TABLE table_store.default.myTable
-ADD COLUMNS (new_column STRING)
+ALTER TABLE table_store.default.OrderTable ADD COLUMNS (buy_count INT)
```
+
+- Change column nullability
+```sql
+ALTER TABLE table_store.default.OrderTable ALTER COLUMN coupon_info DROP NOT NULL
+```
+
+- Change column comment
+```sql
+ALTER TABLE table_store.default.OrderTable ALTER COLUMN buy_count COMMENT 'buy count'
+```
+
+{{< hint info >}}
+__Note:__
+- Spark does not support changing a nullable column to a non-nullable column.
+{{< /hint >}}
+
+### Drop Table
+
+```sql
+DROP TABLE table_store.default.OrderTable
+```
+{{< hint warning >}}
+__Attention__: Dropping a table deletes both its metadata and its files on disk.
+{{< /hint >}}
+
+### Create Namespace
+
+```sql
+CREATE NAMESPACE [IF NOT EXISTS] table_store.bar
+```
+
+### Drop Namespace
+
+```sql
+DROP NAMESPACE table_store.bar
+```
+
+{{< hint warning >}}
+__Attention__: Dropping a namespace deletes the metadata and files of all tables under this namespace on disk.
+{{< /hint >}}
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
index 5afedea4..76e49a88 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
@@ -23,10 +23,14 @@ import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.store.file.catalog.Catalog;
import org.apache.flink.table.store.file.catalog.CatalogFactory;
import org.apache.flink.table.store.file.schema.SchemaChange;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
+import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.NamespaceChange;
import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
@@ -39,12 +43,15 @@ import org.apache.spark.sql.connector.catalog.TableChange.SetProperty;
import org.apache.spark.sql.connector.catalog.TableChange.UpdateColumnComment;
import org.apache.spark.sql.connector.catalog.TableChange.UpdateColumnNullability;
import org.apache.spark.sql.connector.catalog.TableChange.UpdateColumnType;
+import org.apache.spark.sql.connector.expressions.FieldReference;
+import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -54,6 +61,8 @@ import static org.apache.flink.table.store.spark.SparkTypeUtils.toFlinkType;
/** Spark {@link TableCatalog} for table store. */
public class SparkCatalog implements TableCatalog, SupportsNamespaces {
+ private static final String PRIMARY_KEY_IDENTIFIER = "primary-key";
+
private String name = null;
private Catalog catalog = null;
@@ -68,6 +77,20 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
return name;
}
+ @Override
+ public void createNamespace(String[] namespace, Map<String, String> metadata)
+ throws NamespaceAlreadyExistsException {
+ Preconditions.checkArgument(
+ isValidateNamespace(namespace),
+ "Namespace %s is not valid",
+ Arrays.toString(namespace));
+ try {
+ catalog.createDatabase(namespace[0], false);
+ } catch (Catalog.DatabaseAlreadyExistException e) {
+ throw new NamespaceAlreadyExistsException(namespace);
+ }
+ }
+
@Override
public String[][] listNamespaces() {
List<String> databases = catalog.listDatabases();
@@ -86,12 +109,67 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
if (!isValidateNamespace(namespace)) {
throw new NoSuchNamespaceException(namespace);
}
- return new String[0][];
+ if (catalog.databaseExists(namespace[0])) {
+ return new String[0][];
+ }
+ throw new NoSuchNamespaceException(namespace);
}
@Override
- public Map<String, String> loadNamespaceMetadata(String[] namespace) {
- return Collections.emptyMap();
+ public Map<String, String> loadNamespaceMetadata(String[] namespace)
+ throws NoSuchNamespaceException {
+ // Reject malformed namespaces before touching the catalog.
+ Preconditions.checkArgument(
+ isValidateNamespace(namespace),
+ "Namespace %s is not valid",
+ Arrays.toString(namespace));
+ if (!catalog.databaseExists(namespace[0])) {
+ throw new NoSuchNamespaceException(namespace);
+ }
+ // No namespace properties are kept, so an existing namespace always has empty metadata.
+ return Collections.emptyMap();
+ }
+
+ /**
+ * Drops a namespace from the catalog, recursively dropping all objects within the namespace.
+ * Only Spark 3.0, 3.1 and 3.2 call this single-argument variant; Spark 3.3+ calls {@link
+ * #dropNamespace(String[], boolean)} instead.
+ *
+ * <p>In those Spark versions the engine itself rejects a non-cascading {@code DROP NAMESPACE} of a
+ * non-empty namespace before calling the catalog. The catalog is therefore expected to drop
+ * recursively, so this delegates with {@code cascade = true}. Passing {@code false} would make
+ * {@code DROP NAMESPACE ... CASCADE} fail on any namespace that still contains tables.
+ *
+ * @param namespace a multi-part namespace
+ * @return true if the namespace was dropped
+ * @throws NoSuchNamespaceException if the namespace does not exist
+ */
+ public boolean dropNamespace(String[] namespace) throws NoSuchNamespaceException {
+ return dropNamespace(namespace, true);
+ }
+
+ /**
+ * Drops a namespace from the catalog, recursively dropping all objects within it when {@code
+ * cascade} is true. Spark 3.3+ calls this variant.
+ *
+ * <p>Declared without {@code @Override} because the two-argument variant only exists on the
+ * Spark 3.3+ {@code SupportsNamespaces} interface.
+ *
+ * @param namespace a multi-part namespace
+ * @param cascade when true, deletes all objects under the namespace
+ * @return true if the namespace was dropped
+ * @throws IllegalArgumentException if {@code isValidateNamespace} rejects {@code namespace}
+ * @throws NoSuchNamespaceException if the database does not exist
+ * @throws UnsupportedOperationException if the catalog reports the namespace as not empty
+ * (expected only when {@code cascade} is false)
+ */
+ public boolean dropNamespace(String[] namespace, boolean cascade)
+ throws NoSuchNamespaceException {
+ Preconditions.checkArgument(
+ isValidateNamespace(namespace),
+ "Namespace %s is not valid",
+ Arrays.toString(namespace));
+ try {
+ catalog.dropDatabase(namespace[0], false, cascade);
+ return true;
+ } catch (Catalog.DatabaseNotExistException e) {
+ throw new NoSuchNamespaceException(namespace);
+ } catch (Catalog.DatabaseNotEmptyException e) {
+ // Attach the catalog exception as the cause instead of discarding it.
+ throw new UnsupportedOperationException(
+ String.format("Namespace %s is not empty", Arrays.toString(namespace)), e);
+ }
}
@Override
@@ -100,7 +178,6 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
isValidateNamespace(namespace),
"Missing database in namespace: %s",
Arrays.toString(namespace));
-
try {
return catalog.listTables(namespace[0]).stream()
.map(table -> Identifier.of(namespace, table))
@@ -131,6 +208,36 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
}
}
+ /**
+ * Creates a table store table from a Spark {@code CREATE TABLE} statement and returns the newly
+ * loaded table.
+ *
+ * @throws TableAlreadyExistsException if a table with this identifier already exists
+ * @throws NoSuchNamespaceException if the identifier's database does not exist
+ */
+ @Override
+ public Table createTable(
+ Identifier ident,
+ StructType schema,
+ Transform[] partitions,
+ Map<String, String> properties)
+ throws TableAlreadyExistsException, NoSuchNamespaceException {
+ try {
+ ObjectPath path = objectPath(ident);
+ UpdateSchema updateSchema = toUpdateSchema(schema, partitions, properties);
+ catalog.createTable(path, updateSchema, false);
+ return loadTable(ident);
+ } catch (Catalog.TableAlreadyExistException alreadyExists) {
+ throw new TableAlreadyExistsException(ident);
+ } catch (Catalog.DatabaseNotExistException missingDatabase) {
+ throw new NoSuchNamespaceException(ident.namespace());
+ } catch (NoSuchTableException unexpected) {
+ throw new RuntimeException(unexpected);
+ }
+ }
+
+ /**
+ * Drops the table identified by {@code ident}.
+ *
+ * @return true if the catalog dropped the table; false if it raised {@code
+ * TableNotExistException} or {@code NoSuchTableException}
+ */
+ @Override
+ public boolean dropTable(Identifier ident) {
+ boolean dropped;
+ try {
+ catalog.dropTable(objectPath(ident), false);
+ dropped = true;
+ } catch (Catalog.TableNotExistException | NoSuchTableException notFound) {
+ dropped = false;
+ }
+ return dropped;
+ }
+
private SchemaChange toSchemaChange(TableChange change) {
if (change instanceof SetProperty) {
SetProperty set = (SetProperty) change;
@@ -162,6 +269,30 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
}
}
+ /**
+ * Converts the arguments of a Spark {@code CREATE TABLE} into a table store {@link UpdateSchema}.
+ *
+ * <ul>
+ * <li>Every partition transform must be an identity transform over a single column, i.e. a
+ * plain {@code PARTITIONED BY (col, ...)}. Bucket/time transforms are rejected instead of
+ * being silently treated as partitioning by their source column.
+ * <li>The primary key comes from the comma-separated {@code primary-key} property, which is
+ * removed from the stored options. Blank entries are dropped, so a missing or empty
+ * property yields no primary key rather than a single empty column name.
+ * <li>The table comment is taken from {@link TableCatalog#PROP_COMMENT} (empty if absent).
+ * </ul>
+ */
+ private UpdateSchema toUpdateSchema(
+ StructType schema, Transform[] partitions, Map<String, String> properties) {
+ Preconditions.checkArgument(
+ Arrays.stream(partitions).allMatch(SparkCatalog::isIdentityColumnTransform),
+ "Only identity partitioning on plain columns is supported, but got: %s",
+ Arrays.stream(partitions).map(Transform::describe).collect(Collectors.joining(", ")));
+ Map<String, String> normalizedProperties = new HashMap<>(properties);
+ normalizedProperties.remove(PRIMARY_KEY_IDENTIFIER);
+ return new UpdateSchema(
+ (RowType) toFlinkType(schema),
+ Arrays.stream(partitions)
+ .map(partition -> partition.references()[0].describe())
+ .collect(Collectors.toList()),
+ parsePrimaryKeys(properties.get(PRIMARY_KEY_IDENTIFIER)),
+ normalizedProperties,
+ properties.getOrDefault(TableCatalog.PROP_COMMENT, ""));
+ }
+
+ /** Returns whether {@code transform} is an identity transform over exactly one column. */
+ private static boolean isIdentityColumnTransform(Transform transform) {
+ NamedReference[] references = transform.references();
+ return "identity".equals(transform.name())
+ && references.length == 1
+ && references[0] instanceof FieldReference;
+ }
+
+ /** Splits a comma-separated primary key list, skipping blank entries; null yields no keys. */
+ private static List<String> parsePrimaryKeys(String primaryKeys) {
+ if (primaryKeys == null) {
+ return Collections.emptyList();
+ }
+ // String.split on "" returns [""], so blank entries must be filtered out explicitly.
+ return Arrays.stream(primaryKeys.split(","))
+ .map(String::trim)
+ .filter(key -> !key.isEmpty())
+ .collect(Collectors.toList());
+ }
+
private void validateAlterNestedField(String[] fieldNames) {
if (fieldNames.length > 1) {
throw new UnsupportedOperationException(
@@ -183,61 +314,11 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
// --------------------- unsupported methods ----------------------------
- @Override
- public void createNamespace(String[] namespace, Map<String, String> metadata) {
- throw new UnsupportedOperationException("Create namespace in Spark is not supported yet.");
- }
-
/** Namespace properties cannot be changed through this catalog; always throws. */
@Override
public void alterNamespace(String[] namespace, NamespaceChange... changes) {
String message = "Alter namespace in Spark is not supported yet.";
throw new UnsupportedOperationException(message);
}
- /**
- * Drop a namespace from the catalog, recursively dropping all objects within the namespace.
- * This interface implementation only supports the Spark 3.0, 3.1 and 3.2.
- *
- * <p>If the catalog implementation does not support this operation, it may throw {@link
- * UnsupportedOperationException}.
- *
- * @param namespace a multi-part namespace
- * @return true if the namespace was dropped
- * @throws UnsupportedOperationException If drop is not a supported operation
- */
- public boolean dropNamespace(String[] namespace) {
- return dropNamespace(namespace, true);
- }
-
- /**
- * Drop a namespace from the catalog with cascade mode, recursively dropping all objects within
- * the namespace if cascade is true. This interface implementation supports the Spark 3.3+.
- *
- * <p>If the catalog implementation does not support this operation, it may throw {@link
- * UnsupportedOperationException}.
- *
- * @param namespace a multi-part namespace
- * @param cascade When true, deletes all objects under the namespace
- * @return true if the namespace was dropped
- * @throws UnsupportedOperationException If drop is not a supported operation
- */
- public boolean dropNamespace(String[] namespace, boolean cascade) {
- throw new UnsupportedOperationException("Drop namespace in Spark is not supported yet.");
- }
-
- @Override
- public Table createTable(
- Identifier ident,
- StructType schema,
- Transform[] partitions,
- Map<String, String> properties) {
- throw new UnsupportedOperationException("Create table in Spark is not supported yet.");
- }
-
- @Override
- public boolean dropTable(Identifier ident) {
- throw new UnsupportedOperationException("Drop table in Spark is not supported yet.");
- }
-
@Override
public void renameTable(Identifier oldIdent, Identifier newIdent) {
throw new UnsupportedOperationException();
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
index 705a961a..f31eeaea 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.types.logical.RowType;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -42,17 +43,18 @@ public class SimpleTableTestHelper {
private final TableCommit commit;
+ /** Creates a helper for a table with neither partition keys nor primary keys. */
public SimpleTableTestHelper(Path path, RowType rowType) throws Exception {
+ this(
+ path,
+ rowType,
+ Collections.<String>emptyList(), // partition keys
+ Collections.<String>emptyList()); // primary keys
+ }
+
+ public SimpleTableTestHelper(
+ Path path, RowType rowType, List<String> partitionKeys, List<String> primaryKeys)
+ throws Exception {
Map<String, String> options = new HashMap<>();
// orc is shaded, can not find shaded classes in ide
options.put(CoreOptions.FILE_FORMAT.key(), "avro");
new SchemaManager(path)
.commitNewVersion(
- new UpdateSchema(
- rowType,
- Collections.emptyList(),
- Collections.emptyList(),
- options,
- ""));
+ new UpdateSchema(rowType, partitionKeys, primaryKeys, options, ""));
Configuration conf = Configuration.fromMap(options);
conf.setString("path", path.toString());
FileStoreTable table = FileStoreTableFactory.create(conf);
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
index cfc15e1e..7c24b427 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
@@ -22,6 +22,9 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.store.file.schema.ArrayDataType;
+import org.apache.flink.table.store.file.schema.AtomicDataType;
import org.apache.flink.table.store.file.schema.DataField;
import org.apache.flink.table.store.file.schema.RowDataType;
import org.apache.flink.table.store.file.schema.TableSchema;
@@ -39,6 +42,9 @@ import org.apache.commons.io.FileUtils;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -50,6 +56,7 @@ import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** ITCase for spark reader. */
public class SparkReadITCase {
@@ -337,6 +344,142 @@ public class SparkReadITCase {
.isEqualTo("a boolean array");
}
+ /**
+ * Creates a table with Spark SQL DDL, verifies the schema persisted under the warehouse, writes a
+ * row and reads it back through Spark, and finally drops the table.
+ */
+ @Test
+ public void testCreateAndDropTable() throws Exception {
+ spark.sql("USE table_store");
+ // Partitioned by (dt, hh); the primary key is passed via the 'primary-key' table property.
+ String ddl =
+ "CREATE TABLE default.MyTable (\n"
+ + "order_id BIGINT NOT NULL comment 'biz order id',\n"
+ + "buyer_id BIGINT NOT NULL COMMENT 'buyer id',\n"
+ + "coupon_info ARRAY<STRING> NOT NULL COMMENT 'coupon info',\n"
+ + "order_amount DOUBLE NOT NULL COMMENT 'order amount',\n"
+ + "dt STRING NOT NULL COMMENT 'yyyy-MM-dd',\n"
+ + "hh STRING NOT NULL COMMENT 'HH') USING table_store\n"
+ + "COMMENT 'my table'\n"
+ + "PARTITIONED BY (dt, hh)\n"
+ + "TBLPROPERTIES ('foo' = 'bar', 'primary-key' = 'order_id,dt,hh')";
+ spark.sql(ddl);
+ // Re-creating the same table, or creating it in a namespace that does not exist, must fail.
+ assertThatThrownBy(() -> spark.sql(ddl))
+ .isInstanceOf(TableAlreadyExistsException.class)
+ .hasMessageContaining("Table default.MyTable already exists");
+ assertThatThrownBy(() -> spark.sql(ddl.replace("default", "foo")))
+ .isInstanceOf(NoSuchNamespaceException.class)
+ .hasMessageContaining("Namespace 'foo' not found");
+
+ // Load the schema written to the warehouse and compare it with the DDL above.
+ Path tablePath = new Path(warehousePath, "default.db/MyTable");
+ TableSchema schema = FileStoreTableFactory.create(tablePath).schema();
+ assertThat(schema.fields())
+ .containsExactly(
+ new DataField(
+ 0,
+ "order_id",
+ new AtomicDataType(new BigIntType(false)),
+ "biz order id"),
+ new DataField(
+ 1,
+ "buyer_id",
+ new AtomicDataType(new BigIntType(false)),
+ "buyer id"),
+ new DataField(
+ 2,
+ "coupon_info",
+ new ArrayDataType(
+ false,
+ new AtomicDataType(
+ new VarCharType(true, VarCharType.MAX_LENGTH))),
+ "coupon info"),
+ new DataField(
+ 3,
+ "order_amount",
+ new AtomicDataType(new DoubleType(false)),
+ "order amount"),
+ new DataField(
+ 4,
+ "dt",
+ new AtomicDataType(new VarCharType(false, VarCharType.MAX_LENGTH)),
+ "yyyy-MM-dd"),
+ new DataField(
+ 5,
+ "hh",
+ new AtomicDataType(new VarCharType(false, VarCharType.MAX_LENGTH)),
+ "HH"));
+ // 'primary-key' must become the schema's primary keys rather than stay a plain option.
+ assertThat(schema.options()).containsEntry("foo", "bar");
+ assertThat(schema.options()).doesNotContainKey("primary-key");
+ assertThat(schema.primaryKeys()).containsExactly("order_id", "dt", "hh");
+ assertThat(schema.trimmedPrimaryKeys()).containsOnly("order_id");
+ assertThat(schema.partitionKeys()).containsExactly("dt", "hh");
+ assertThat(schema.comment()).isEqualTo("my table");
+
+ // Write one row through the table store API, then read it back through Spark.
+ SimpleTableTestHelper testHelper =
+ new SimpleTableTestHelper(
+ tablePath,
+ schema.logicalRowType(),
+ Arrays.asList("dt", "hh"),
+ Arrays.asList("order_id", "dt", "hh"));
+ testHelper.write(
+ GenericRowData.of(
+ 1L,
+ 10L,
+ new GenericArrayData(
+ new BinaryStringData[] {
+ BinaryStringData.fromString("loyalty_discount"),
+ BinaryStringData.fromString("shipping_discount")
+ }),
+ 199.0d,
+ BinaryStringData.fromString("2022-07-20"),
+ BinaryStringData.fromString("12")));
+ testHelper.commit();
+
+ Dataset<Row> dataset =
+ spark.read().format("tablestore").option("path", tablePath.toString()).load();
+ assertThat(dataset.select("order_id", "buyer_id", "dt").collectAsList().toString())
+ .isEqualTo("[[1,10,2022-07-20]]");
+ assertThat(dataset.select("coupon_info").collectAsList().toString())
+ .isEqualTo("[[WrappedArray(loyalty_discount, shipping_discount)]]");
+
+ // test drop table
+ assertThat(
+ spark.sql("SHOW TABLES IN table_store.default LIKE 'MyTable'")
+ .select("namespace", "tableName")
+ .collectAsList()
+ .toString())
+ .isEqualTo("[[default,MyTable]]");
+
+ spark.sql("DROP TABLE table_store.default.MyTable");
+
+ assertThat(
+ spark.sql("SHOW TABLES IN table_store.default LIKE 'MyTable'")
+ .select("namespace", "tableName")
+ .collectAsList()
+ .toString())
+ .isEqualTo("[]");
+
+ // Dropping the table must also delete its directory under the warehouse.
+ assertThat(new File(tablePath.toUri())).doesNotExist();
+ }
+
+ /** Creates a namespace through Spark SQL, checks it is listed and on disk, then drops it. */
+ @Test
+ public void testCreateAndDropNamespace() {
+ spark.sql("USE table_store");
+ Path namespacePath = new Path(warehousePath, "bar.db");
+ File namespaceDir = new File(namespacePath.toUri());
+
+ // create namespace; creating it a second time must fail
+ spark.sql("CREATE NAMESPACE bar");
+ assertThatThrownBy(() -> spark.sql("CREATE NAMESPACE bar"))
+ .isInstanceOf(NamespaceAlreadyExistsException.class)
+ .hasMessageContaining("Namespace 'bar' already exists");
+ assertThat(spark.sql("SHOW NAMESPACES").collectAsList().toString())
+ .isEqualTo("[[bar], [default]]");
+ assertThat(namespaceDir).exists();
+
+ // drop namespace; it must vanish from the listing and from disk
+ spark.sql("DROP NAMESPACE bar");
+ assertThat(spark.sql("SHOW NAMESPACES").collectAsList().toString())
+ .isEqualTo("[[default]]");
+ assertThat(namespaceDir).doesNotExist();
+ }
+
/** Returns the schema of the table loaded from {@code tablePath1}. */
private TableSchema schema1() {
return FileStoreTableFactory
.create(tablePath1)
.schema();
}