Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/22 04:08:56 UTC

[flink-table-store] branch release-0.2 updated: [FLINK-28535] Support create/drop namespace/table for SparkCatalog

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/release-0.2 by this push:
     new 24fd3da3 [FLINK-28535] Support create/drop namespace/table for SparkCatalog
24fd3da3 is described below

commit 24fd3da3b94ba4271dbf9de77103c295ac0414c3
Author: Jane Chan <55...@users.noreply.github.com>
AuthorDate: Fri Jul 22 12:07:45 2022 +0800

    [FLINK-28535] Support create/drop namespace/table for SparkCatalog
    
    This closes #230
---
 docs/content/docs/development/create-table.md      |  17 +-
 docs/content/docs/engines/spark.md                 |  93 +++++++++-
 .../flink/table/store/spark/SparkCatalog.java      | 189 +++++++++++++++------
 .../table/store/spark/SimpleTableTestHelper.java   |  14 +-
 .../flink/table/store/spark/SparkReadITCase.java   | 143 ++++++++++++++++
 5 files changed, 380 insertions(+), 76 deletions(-)

diff --git a/docs/content/docs/development/create-table.md b/docs/content/docs/development/create-table.md
index 73385a41..baee2568 100644
--- a/docs/content/docs/development/create-table.md
+++ b/docs/content/docs/development/create-table.md
@@ -75,12 +75,9 @@ CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
 ```
 
 {{< hint info >}}
-__Note:__ To ensure the uniqueness of the primary key, the
-primary key must contain the partition field.
-{{< /hint >}}
-
-{{< hint info >}}
-__Note:__ Metadata column is not supported yet.
+__Note:__
+- To ensure the uniqueness of the primary key, the primary key must contain all partition fields (see the sketch below).
+- Metadata columns are not supported yet.
 {{< /hint >}}
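+
+For instance, a minimal sketch of a valid definition (table and column names are illustrative): since the table below is partitioned by `dt`, its primary key must include `dt`.
+
+```sql
+CREATE TABLE MyOrders (
+    order_id BIGINT,
+    dt STRING,
+    PRIMARY KEY (order_id, dt) NOT ENFORCED
+) PARTITIONED BY (dt);
+```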
 
 This will create a directory under `${warehouse}/${database_name}.db/${table_name}`.
@@ -191,10 +188,11 @@ CREATE TABLE MyTable (
 );
 ```
 
+{{< hint info >}}
 __Note:__
 - If users do not specify the bucket key explicitly, the primary key (if present) or the whole row is used as the bucket key.
 - Bucket key cannot be changed once the table is created (see the sketch below). `ALTER TABLE SET ('bucket-key' = ...)` or `ALTER TABLE RESET ('bucket-key')` will throw an exception.
-
+{{< /hint >}}
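+
+A sketch of pinning the bucket key at creation time, assuming the `bucket-key` option named above (columns are illustrative):
+
+```sql
+CREATE TABLE MyUsers (
+    user_id BIGINT,
+    item_id BIGINT,
+    behavior STRING
+) WITH ('bucket-key' = 'user_id');
+```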
 
 
 The number of buckets is very important as it determines the
@@ -270,11 +268,12 @@ For example, the inputs:
 Output:
 - <1, 25.2, 10, 'This is a book'>
 
+{{< hint info >}}
 __Note:__
 - Partial update is only supported for tables with a primary key (see the sketch below).
 - Partial update is not supported for streaming consumption.
 - It is best not to have NULL values in the fields, as NULL will not overwrite existing data.
-
+{{< /hint >}}
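+
+A minimal sketch of such a table, assuming partial update is enabled through the `merge-engine` table option and mirroring the four-field example above:
+
+```sql
+CREATE TABLE MyPartialTable (
+    pk BIGINT,
+    price DOUBLE,
+    qty INT,
+    title STRING,
+    PRIMARY KEY (pk) NOT ENFORCED
+) WITH ('merge-engine' = 'partial-update');
+```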
 
 ## Append-only Table
 
@@ -295,10 +294,12 @@ CREATE TABLE IF NOT EXISTS T1 (
     'bucket' = '1' --specify the total number of buckets
 )
 ```
+{{< hint info >}}
 __Note:__
 - By definition, users cannot define primary keys on an append-only table.
 - An append-only table is different from a change-log table that does not define primary keys.
   For the latter, updating or deleting whole rows is accepted even though no primary key is present.
+{{< /hint >}}
 
 ### Query Append-only Table
 
diff --git a/docs/content/docs/engines/spark.md b/docs/content/docs/engines/spark.md
index 83531c1d..a2bac76c 100644
--- a/docs/content/docs/engines/spark.md
+++ b/docs/content/docs/engines/spark.md
@@ -88,22 +88,99 @@ OPTIONS (
 SELECT * FROM table_store.default.myTable;
 ```
 
-## DDL
+## DDL Statements
 
-`ALTER TABLE ... SET TBLPROPERTIES`
+### Create Table
 ```sql
-ALTER TABLE table_store.default.myTable SET TBLPROPERTIES (
+CREATE TABLE [IF NOT EXISTS] table_identifier 
+[ ( col_name1[:] col_type1 [ COMMENT col_comment1 ], ... ) ]
+[ USING table_store ]    
+[ COMMENT table_comment ]
+[ PARTITIONED BY ( col_name1, col_name2, ... ) ]
+[ TBLPROPERTIES ( key1=val1, key2=val2, ... ) ]       
+```
+For example, the following creates an order table partitioned by `dt, hh`, with the composite primary key `order_id,dt,hh` (the primary key must contain the partition fields).
+```sql
+CREATE TABLE table_store.default.OrderTable (
+    order_id BIGINT NOT NULL COMMENT 'biz order id',
+    buyer_id BIGINT NOT NULL COMMENT 'buyer id',
+    coupon_info ARRAY<STRING> NOT NULL COMMENT 'coupon info',
+    order_amount DOUBLE NOT NULL COMMENT 'order amount',
+    dt STRING NOT NULL COMMENT 'yyyy-MM-dd',
+    hh STRING NOT NULL COMMENT 'HH'
+) COMMENT 'my table'
+PARTITIONED BY (dt, hh)
+TBLPROPERTIES ('foo' = 'bar', 'primary-key' = 'order_id,dt,hh')
+```
+{{< hint info >}}
+__Note:__
+- The primary key is declared through the `primary-key` table property; a composite primary key is delimited with commas.
+- Partition fields must be predefined in the column list; complex partitions such as `PARTITIONED BY ( col_name2[:] col_type2 [ COMMENT col_comment2 ], ... )` are not supported.
+- For Spark 3.0, the `USING table_store` clause is required (see the sketch below).
+{{< /hint >}}
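+
+A minimal sketch for Spark 3.0, where the `USING table_store` clause cannot be omitted (table and columns are illustrative):
+
+```sql
+CREATE TABLE table_store.default.MiniTable (
+    id BIGINT NOT NULL,
+    dt STRING NOT NULL
+) USING table_store
+PARTITIONED BY (dt)
+TBLPROPERTIES ('primary-key' = 'id,dt')
+```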
+
+### Alter Table
+```sql
+ALTER TABLE table_identifier
+SET TBLPROPERTIES ( key1=val1 )
+    | UNSET TBLPROPERTIES ( key2 )
+    | ADD COLUMNS ( col_name col_type [ , ... ] )
+    | { ALTER | CHANGE } COLUMN col_name { DROP NOT NULL | COMMENT 'new_comment' }
+```
+
+- Change/add table properties
+```sql
+ALTER TABLE table_store.default.OrderTable SET TBLPROPERTIES (
     'write-buffer-size'='256 MB'
 )
 ```
 
-`ALTER TABLE ... UNSET TBLPROPERTIES`
+- Remove a table property
 ```sql
-ALTER TABLE table_store.default.myTable UNSET TBLPROPERTIES ('write-buffer-size')
+ALTER TABLE table_store.default.OrderTable UNSET TBLPROPERTIES ('write-buffer-size')
 ```
 
-`ALTER TABLE ... ADD COLUMN`
+- Add a new column
 ```sql
-ALTER TABLE table_store.default.myTable
-ADD COLUMNS (new_column STRING)
+ALTER TABLE table_store.default.OrderTable ADD COLUMNS (buy_count INT)
 ```
+
+- Change column nullability
+```sql
+ALTER TABLE table_store.default.OrderTable ALTER COLUMN coupon_info DROP NOT NULL
+```
+
+- Change column comment
+```sql
+ALTER TABLE table_store.default.OrderTable ALTER COLUMN buy_count COMMENT 'buy count'
+```
+
+{{< hint info >}}
+__Note:__
+- Spark does not support changing a nullable column to a non-null column (see the sketch below).
+{{< /hint >}}
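+
+For example, the reverse of the nullability change above is expected to fail (a sketch, assuming Spark's standard `ALTER COLUMN ... SET NOT NULL` syntax):
+
+```sql
+-- Expected to throw: tightening nullability is not supported
+ALTER TABLE table_store.default.OrderTable ALTER COLUMN buy_count SET NOT NULL
+```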
+
+### Drop Table
+
+```sql
+DROP TABLE table_store.default.OrderTable
+```
+{{< hint warning >}}
+__Attention__: Dropping a table will delete both its metadata and its files on disk.
+{{< /hint >}}
+
+### Create Namespace
+
+```sql
+CREATE NAMESPACE [IF NOT EXISTS] table_store.bar
+```
+
+### Drop Namespace
+
+```sql
+DROP NAMESPACE table_store.bar
+```
+
+{{< hint warning >}}
+__Attention__: Dropping a namespace will delete the metadata and files of all tables under the namespace on disk (see the cascade sketch below).
+{{< /hint >}}
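+
+For Spark 3.3+, which forwards a cascade flag to the catalog, a non-empty namespace can be dropped explicitly (a sketch, assuming Spark's standard `CASCADE` clause; on Spark 3.0-3.2 dropping a non-empty namespace fails instead):
+
+```sql
+DROP NAMESPACE table_store.bar CASCADE
+```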
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
index 5afedea4..76e49a88 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
@@ -23,10 +23,14 @@ import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.store.file.catalog.Catalog;
 import org.apache.flink.table.store.file.catalog.CatalogFactory;
 import org.apache.flink.table.store.file.schema.SchemaChange;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.NamespaceChange;
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
@@ -39,12 +43,15 @@ import org.apache.spark.sql.connector.catalog.TableChange.SetProperty;
 import org.apache.spark.sql.connector.catalog.TableChange.UpdateColumnComment;
 import org.apache.spark.sql.connector.catalog.TableChange.UpdateColumnNullability;
 import org.apache.spark.sql.connector.catalog.TableChange.UpdateColumnType;
+import org.apache.spark.sql.connector.expressions.FieldReference;
+import org.apache.spark.sql.connector.expressions.NamedReference;
 import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -54,6 +61,8 @@ import static org.apache.flink.table.store.spark.SparkTypeUtils.toFlinkType;
 /** Spark {@link TableCatalog} for table store. */
 public class SparkCatalog implements TableCatalog, SupportsNamespaces {
 
+    private static final String PRIMARY_KEY_IDENTIFIER = "primary-key";
+
     private String name = null;
     private Catalog catalog = null;
 
@@ -68,6 +77,20 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
         return name;
     }
 
+    @Override
+    public void createNamespace(String[] namespace, Map<String, String> metadata)
+            throws NamespaceAlreadyExistsException {
+        Preconditions.checkArgument(
+                isValidateNamespace(namespace),
+                "Namespace %s is not valid",
+                Arrays.toString(namespace));
+        try {
+            catalog.createDatabase(namespace[0], false);
+        } catch (Catalog.DatabaseAlreadyExistException e) {
+            throw new NamespaceAlreadyExistsException(namespace);
+        }
+    }
+
     @Override
     public String[][] listNamespaces() {
         List<String> databases = catalog.listDatabases();
@@ -86,12 +109,67 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
         if (!isValidateNamespace(namespace)) {
             throw new NoSuchNamespaceException(namespace);
         }
-        return new String[0][];
+        if (catalog.databaseExists(namespace[0])) {
+            return new String[0][];
+        }
+        throw new NoSuchNamespaceException(namespace);
     }
 
     @Override
-    public Map<String, String> loadNamespaceMetadata(String[] namespace) {
-        return Collections.emptyMap();
+    public Map<String, String> loadNamespaceMetadata(String[] namespace)
+            throws NoSuchNamespaceException {
+        Preconditions.checkArgument(
+                isValidateNamespace(namespace),
+                "Namespace %s is not valid",
+                Arrays.toString(namespace));
+        if (catalog.databaseExists(namespace[0])) {
+            return Collections.emptyMap();
+        }
+        throw new NoSuchNamespaceException(namespace);
+    }
+
+    /**
+     * Drop a namespace from the catalog, recursively dropping all objects within the namespace.
+     * This interface implementation only supports Spark 3.0, 3.1 and 3.2.
+     *
+     * <p>If the catalog implementation does not support this operation, it may throw {@link
+     * UnsupportedOperationException}.
+     *
+     * @param namespace a multi-part namespace
+     * @return true if the namespace was dropped
+     * @throws UnsupportedOperationException If drop is not a supported operation
+     */
+    public boolean dropNamespace(String[] namespace) throws NoSuchNamespaceException {
+        return dropNamespace(namespace, false);
+    }
+
+    /**
+     * Drop a namespace from the catalog with cascade mode, recursively dropping all objects within
+     * the namespace if cascade is true. This interface implementation supports Spark 3.3+.
+     *
+     * <p>If the catalog implementation does not support this operation, it may throw {@link
+     * UnsupportedOperationException}.
+     *
+     * @param namespace a multi-part namespace
+     * @param cascade When true, deletes all objects under the namespace
+     * @return true if the namespace was dropped
+     * @throws UnsupportedOperationException If drop is not a supported operation
+     */
+    public boolean dropNamespace(String[] namespace, boolean cascade)
+            throws NoSuchNamespaceException {
+        Preconditions.checkArgument(
+                isValidateNamespace(namespace),
+                "Namespace %s is not valid",
+                Arrays.toString(namespace));
+        try {
+            catalog.dropDatabase(namespace[0], false, cascade);
+            return true;
+        } catch (Catalog.DatabaseNotExistException e) {
+            throw new NoSuchNamespaceException(namespace);
+        } catch (Catalog.DatabaseNotEmptyException e) {
+            throw new UnsupportedOperationException(
+                    String.format("Namespace %s is not empty", Arrays.toString(namespace)));
+        }
     }
 
     @Override
@@ -100,7 +178,6 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
                 isValidateNamespace(namespace),
                 "Missing database in namespace: %s",
                 Arrays.toString(namespace));
-
         try {
             return catalog.listTables(namespace[0]).stream()
                     .map(table -> Identifier.of(namespace, table))
@@ -131,6 +208,36 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
         }
     }
 
+    @Override
+    public Table createTable(
+            Identifier ident,
+            StructType schema,
+            Transform[] partitions,
+            Map<String, String> properties)
+            throws TableAlreadyExistsException, NoSuchNamespaceException {
+        try {
+            catalog.createTable(
+                    objectPath(ident), toUpdateSchema(schema, partitions, properties), false);
+            return loadTable(ident);
+        } catch (Catalog.TableAlreadyExistException e) {
+            throw new TableAlreadyExistsException(ident);
+        } catch (Catalog.DatabaseNotExistException e) {
+            throw new NoSuchNamespaceException(ident.namespace());
+        } catch (NoSuchTableException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public boolean dropTable(Identifier ident) {
+        try {
+            catalog.dropTable(objectPath(ident), false);
+            return true;
+        } catch (Catalog.TableNotExistException | NoSuchTableException e) {
+            return false;
+        }
+    }
+
     private SchemaChange toSchemaChange(TableChange change) {
         if (change instanceof SetProperty) {
             SetProperty set = (SetProperty) change;
@@ -162,6 +269,30 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
         }
     }
 
+    private UpdateSchema toUpdateSchema(
+            StructType schema, Transform[] partitions, Map<String, String> properties) {
+        Preconditions.checkArgument(
+                Arrays.stream(partitions)
+                        .allMatch(
+                                partition -> {
+                                    NamedReference[] references = partition.references();
+                                    return references.length == 1
+                                            && references[0] instanceof FieldReference;
+                                }));
+        Map<String, String> normalizedProperties = new HashMap<>(properties);
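+        // 'primary-key' is consumed below as the primary key fields of the schema
+        // and must not leak into the table options.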
+        normalizedProperties.remove(PRIMARY_KEY_IDENTIFIER);
+        return new UpdateSchema(
+                (RowType) toFlinkType(schema),
+                Arrays.stream(partitions)
+                        .map(partition -> partition.references()[0].describe())
+                        .collect(Collectors.toList()),
+                Arrays.stream(properties.getOrDefault(PRIMARY_KEY_IDENTIFIER, "").split(","))
+                        .map(String::trim)
+                        // "".split(",") yields [""], so filter out empty keys when the
+                        // 'primary-key' property is absent
+                        .filter(key -> !key.isEmpty())
+                        .collect(Collectors.toList()),
+                normalizedProperties,
+                properties.getOrDefault(TableCatalog.PROP_COMMENT, ""));
+    }
+
     private void validateAlterNestedField(String[] fieldNames) {
         if (fieldNames.length > 1) {
             throw new UnsupportedOperationException(
@@ -183,61 +314,11 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
 
     // --------------------- unsupported methods ----------------------------
 
-    @Override
-    public void createNamespace(String[] namespace, Map<String, String> metadata) {
-        throw new UnsupportedOperationException("Create namespace in Spark is not supported yet.");
-    }
-
     @Override
     public void alterNamespace(String[] namespace, NamespaceChange... changes) {
         throw new UnsupportedOperationException("Alter namespace in Spark is not supported yet.");
     }
 
-    /**
-     * Drop a namespace from the catalog, recursively dropping all objects within the namespace.
-     * This interface implementation only supports the Spark 3.0, 3.1 and 3.2.
-     *
-     * <p>If the catalog implementation does not support this operation, it may throw {@link
-     * UnsupportedOperationException}.
-     *
-     * @param namespace a multi-part namespace
-     * @return true if the namespace was dropped
-     * @throws UnsupportedOperationException If drop is not a supported operation
-     */
-    public boolean dropNamespace(String[] namespace) {
-        return dropNamespace(namespace, true);
-    }
-
-    /**
-     * Drop a namespace from the catalog with cascade mode, recursively dropping all objects within
-     * the namespace if cascade is true. This interface implementation supports the Spark 3.3+.
-     *
-     * <p>If the catalog implementation does not support this operation, it may throw {@link
-     * UnsupportedOperationException}.
-     *
-     * @param namespace a multi-part namespace
-     * @param cascade When true, deletes all objects under the namespace
-     * @return true if the namespace was dropped
-     * @throws UnsupportedOperationException If drop is not a supported operation
-     */
-    public boolean dropNamespace(String[] namespace, boolean cascade) {
-        throw new UnsupportedOperationException("Drop namespace in Spark is not supported yet.");
-    }
-
-    @Override
-    public Table createTable(
-            Identifier ident,
-            StructType schema,
-            Transform[] partitions,
-            Map<String, String> properties) {
-        throw new UnsupportedOperationException("Create table in Spark is not supported yet.");
-    }
-
-    @Override
-    public boolean dropTable(Identifier ident) {
-        throw new UnsupportedOperationException("Drop table in Spark is not supported yet.");
-    }
-
     @Override
     public void renameTable(Identifier oldIdent, Identifier newIdent) {
         throw new UnsupportedOperationException();
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
index 705a961a..f31eeaea 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.types.logical.RowType;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
@@ -42,17 +43,18 @@ public class SimpleTableTestHelper {
     private final TableCommit commit;
 
     public SimpleTableTestHelper(Path path, RowType rowType) throws Exception {
+        this(path, rowType, Collections.emptyList(), Collections.emptyList());
+    }
+
+    public SimpleTableTestHelper(
+            Path path, RowType rowType, List<String> partitionKeys, List<String> primaryKeys)
+            throws Exception {
         Map<String, String> options = new HashMap<>();
         // orc is shaded; cannot find the shaded classes in the IDE
         options.put(CoreOptions.FILE_FORMAT.key(), "avro");
         new SchemaManager(path)
                 .commitNewVersion(
-                        new UpdateSchema(
-                                rowType,
-                                Collections.emptyList(),
-                                Collections.emptyList(),
-                                options,
-                                ""));
+                        new UpdateSchema(rowType, partitionKeys, primaryKeys, options, ""));
         Configuration conf = Configuration.fromMap(options);
         conf.setString("path", path.toString());
         FileStoreTable table = FileStoreTableFactory.create(conf);
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
index cfc15e1e..7c24b427 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
@@ -22,6 +22,9 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.GenericArrayData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.store.file.schema.ArrayDataType;
+import org.apache.flink.table.store.file.schema.AtomicDataType;
 import org.apache.flink.table.store.file.schema.DataField;
 import org.apache.flink.table.store.file.schema.RowDataType;
 import org.apache.flink.table.store.file.schema.TableSchema;
@@ -39,6 +42,9 @@ import org.apache.commons.io.FileUtils;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -50,6 +56,7 @@ import java.util.List;
 import java.util.Map;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** ITCase for spark reader. */
 public class SparkReadITCase {
@@ -337,6 +344,142 @@ public class SparkReadITCase {
                 .isEqualTo("a boolean array");
     }
 
+    @Test
+    public void testCreateAndDropTable() throws Exception {
+        spark.sql("USE table_store");
+        String ddl =
+                "CREATE TABLE default.MyTable (\n"
+                        + "order_id BIGINT NOT NULL comment 'biz order id',\n"
+                        + "buyer_id BIGINT NOT NULL COMMENT 'buyer id',\n"
+                        + "coupon_info ARRAY<STRING> NOT NULL COMMENT 'coupon info',\n"
+                        + "order_amount DOUBLE NOT NULL COMMENT 'order amount',\n"
+                        + "dt STRING NOT NULL COMMENT 'yyyy-MM-dd',\n"
+                        + "hh STRING NOT NULL COMMENT 'HH') USING table_store\n"
+                        + "COMMENT 'my table'\n"
+                        + "PARTITIONED BY (dt, hh)\n"
+                        + "TBLPROPERTIES ('foo' = 'bar', 'primary-key' = 'order_id,dt,hh')";
+        spark.sql(ddl);
+        assertThatThrownBy(() -> spark.sql(ddl))
+                .isInstanceOf(TableAlreadyExistsException.class)
+                .hasMessageContaining("Table default.MyTable already exists");
+        assertThatThrownBy(() -> spark.sql(ddl.replace("default", "foo")))
+                .isInstanceOf(NoSuchNamespaceException.class)
+                .hasMessageContaining("Namespace 'foo' not found");
+
+        Path tablePath = new Path(warehousePath, "default.db/MyTable");
+        TableSchema schema = FileStoreTableFactory.create(tablePath).schema();
+        assertThat(schema.fields())
+                .containsExactly(
+                        new DataField(
+                                0,
+                                "order_id",
+                                new AtomicDataType(new BigIntType(false)),
+                                "biz order id"),
+                        new DataField(
+                                1,
+                                "buyer_id",
+                                new AtomicDataType(new BigIntType(false)),
+                                "buyer id"),
+                        new DataField(
+                                2,
+                                "coupon_info",
+                                new ArrayDataType(
+                                        false,
+                                        new AtomicDataType(
+                                                new VarCharType(true, VarCharType.MAX_LENGTH))),
+                                "coupon info"),
+                        new DataField(
+                                3,
+                                "order_amount",
+                                new AtomicDataType(new DoubleType(false)),
+                                "order amount"),
+                        new DataField(
+                                4,
+                                "dt",
+                                new AtomicDataType(new VarCharType(false, VarCharType.MAX_LENGTH)),
+                                "yyyy-MM-dd"),
+                        new DataField(
+                                5,
+                                "hh",
+                                new AtomicDataType(new VarCharType(false, VarCharType.MAX_LENGTH)),
+                                "HH"));
+        assertThat(schema.options()).containsEntry("foo", "bar");
+        assertThat(schema.options()).doesNotContainKey("primary-key");
+        assertThat(schema.primaryKeys()).containsExactly("order_id", "dt", "hh");
+        assertThat(schema.trimmedPrimaryKeys()).containsOnly("order_id");
+        assertThat(schema.partitionKeys()).containsExactly("dt", "hh");
+        assertThat(schema.comment()).isEqualTo("my table");
+
+        SimpleTableTestHelper testHelper =
+                new SimpleTableTestHelper(
+                        tablePath,
+                        schema.logicalRowType(),
+                        Arrays.asList("dt", "hh"),
+                        Arrays.asList("order_id", "dt", "hh"));
+        testHelper.write(
+                GenericRowData.of(
+                        1L,
+                        10L,
+                        new GenericArrayData(
+                                new BinaryStringData[] {
+                                    BinaryStringData.fromString("loyalty_discount"),
+                                    BinaryStringData.fromString("shipping_discount")
+                                }),
+                        199.0d,
+                        BinaryStringData.fromString("2022-07-20"),
+                        BinaryStringData.fromString("12")));
+        testHelper.commit();
+
+        Dataset<Row> dataset =
+                spark.read().format("tablestore").option("path", tablePath.toString()).load();
+        assertThat(dataset.select("order_id", "buyer_id", "dt").collectAsList().toString())
+                .isEqualTo("[[1,10,2022-07-20]]");
+        assertThat(dataset.select("coupon_info").collectAsList().toString())
+                .isEqualTo("[[WrappedArray(loyalty_discount, shipping_discount)]]");
+
+        // test drop table
+        assertThat(
+                        spark.sql("SHOW TABLES IN table_store.default LIKE 'MyTable'")
+                                .select("namespace", "tableName")
+                                .collectAsList()
+                                .toString())
+                .isEqualTo("[[default,MyTable]]");
+
+        spark.sql("DROP TABLE table_store.default.MyTable");
+
+        assertThat(
+                        spark.sql("SHOW TABLES IN table_store.default LIKE 'MyTable'")
+                                .select("namespace", "tableName")
+                                .collectAsList()
+                                .toString())
+                .isEqualTo("[]");
+
+        assertThat(new File(tablePath.toUri())).doesNotExist();
+    }
+
+    @Test
+    public void testCreateAndDropNamespace() {
+        // create namespace
+        spark.sql("USE table_store");
+        spark.sql("CREATE NAMESPACE bar");
+
+        assertThatThrownBy(() -> spark.sql("CREATE NAMESPACE bar"))
+                .isInstanceOf(NamespaceAlreadyExistsException.class)
+                .hasMessageContaining("Namespace 'bar' already exists");
+
+        assertThat(spark.sql("SHOW NAMESPACES").collectAsList().toString())
+                .isEqualTo("[[bar], [default]]");
+
+        Path nsPath = new Path(warehousePath, "bar.db");
+        assertThat(new File(nsPath.toUri())).exists();
+
+        // drop namespace
+        spark.sql("DROP NAMESPACE bar");
+        assertThat(spark.sql("SHOW NAMESPACES").collectAsList().toString())
+                .isEqualTo("[[default]]");
+        assertThat(new File(nsPath.toUri())).doesNotExist();
+    }
+
     private TableSchema schema1() {
         return FileStoreTableFactory.create(tablePath1).schema();
     }