Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/01 07:45:09 UTC

[flink-table-store] branch master updated: [FLINK-28339] Introduce SparkCatalog in table store

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new e303ec7a [FLINK-28339] Introduce SparkCatalog in table store
e303ec7a is described below

commit e303ec7a913601389dc323d9dc719f9c937f6ee2
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Fri Jul 1 15:45:04 2022 +0800

    [FLINK-28339] Introduce SparkCatalog in table store
    
    This closes #185
---
 .../flink/table/store/connector/FlinkCatalog.java  |   2 +-
 .../table/store/connector/FlinkCatalogFactory.java |  51 +------
 .../flink/table/store/file/catalog/Catalog.java    |  16 +-
 .../table/store/file/catalog/CatalogFactory.java   |  55 +++++++
 .../store/file/catalog/FileSystemCatalog.java      |   2 +-
 .../table/store/table/FileStoreTableFactory.java   |  23 ++-
 .../apache/flink/table/store/hive/HiveCatalog.java |   2 +-
 .../flink/table/store/spark/SparkCatalog.java      | 161 +++++++++++++++++++++
 .../flink/table/store/spark/SparkReadITCase.java   |  97 ++++++++-----
 9 files changed, 311 insertions(+), 98 deletions(-)
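
For context, the new SparkCatalog is wired in through Spark's catalog plugin configuration,
as exercised by the updated SparkReadITCase below. A minimal usage sketch follows; the
catalog name "table_store" and the warehouse path are illustrative values taken from the
test, not something the commit itself mandates:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class SparkCatalogUsageSketch {
        public static void main(String[] args) {
            // Register the table store catalog under the name "table_store" and point it
            // at a warehouse directory that already contains tables written by the Flink
            // side (see SimpleTableTestHelper in the test changes below).
            SparkSession spark =
                    SparkSession.builder()
                            .master("local[2]")
                            .config(
                                    "spark.sql.catalog.table_store",
                                    "org.apache.flink.table.store.spark.SparkCatalog")
                            .config(
                                    "spark.sql.catalog.table_store.warehouse",
                                    "file:/tmp/table-store-warehouse")
                            .getOrCreate();

            // A table stored under <warehouse>/default.db/t1 becomes visible as
            // table_store.default.t1 through the catalog.
            Dataset<Row> rows = spark.table("table_store.default.t1");
            rows.show();
        }
    }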

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
index 56840aa3..6077e57d 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
@@ -140,7 +140,7 @@ public class FlinkCatalog extends AbstractCatalog {
             throws TableNotExistException, CatalogException {
         TableSchema schema;
         try {
-            schema = catalog.getTable(tablePath);
+            schema = catalog.getTableSchema(tablePath);
         } catch (Catalog.TableNotExistException e) {
             throw new TableNotExistException(getName(), e.tablePath());
         }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java
index 6021260f..8ff157da 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java
@@ -23,30 +23,15 @@ import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.store.file.catalog.CatalogFactory;
-import org.apache.flink.table.store.file.catalog.FileSystemCatalogFactory;
-import org.apache.flink.util.Preconditions;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
-import java.util.ServiceLoader;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 /** Factory for {@link FlinkCatalog}. */
 public class FlinkCatalogFactory implements org.apache.flink.table.factories.CatalogFactory {
 
     public static final String IDENTIFIER = "table-store";
 
-    public static final ConfigOption<String> METASTORE =
-            ConfigOptions.key("metastore")
-                    .stringType()
-                    .defaultValue(FileSystemCatalogFactory.IDENTIFIER);
-    private static final ConfigOption<String> WAREHOUSE =
-            ConfigOptions.key("warehouse")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("The warehouse root path of catalog.");
     public static final ConfigOption<String> DEFAULT_DATABASE =
             ConfigOptions.key("default-database").stringType().defaultValue("default");
 
@@ -74,41 +59,7 @@ public class FlinkCatalogFactory implements org.apache.flink.table.factories.Cat
     }
 
     public static FlinkCatalog createCatalog(String catalogName, ReadableConfig options) {
-        // manual validation
-        // because different catalog types may have different options
-        // we can't list them all in the optionalOptions() method
-        String warehouse =
-                Preconditions.checkNotNull(
-                        options.get(WAREHOUSE),
-                        "Table store '" + WAREHOUSE.key() + "' path must be set");
-
-        String metastore = options.get(METASTORE);
-        List<CatalogFactory> factories = new ArrayList<>();
-        ServiceLoader.load(CatalogFactory.class, Thread.currentThread().getContextClassLoader())
-                .iterator()
-                .forEachRemaining(
-                        f -> {
-                            if (f.identifier().equals(metastore)) {
-                                factories.add(f);
-                            }
-                        });
-        if (factories.size() != 1) {
-            throw new RuntimeException(
-                    "Found "
-                            + factories.size()
-                            + " classes implementing "
-                            + CatalogFactory.class.getName()
-                            + " with metastore "
-                            + metastore
-                            + ". They are:\n"
-                            + factories.stream()
-                                    .map(t -> t.getClass().getName())
-                                    .collect(Collectors.joining("\n")));
-        }
-
         return new FlinkCatalog(
-                factories.get(0).create(warehouse, options),
-                catalogName,
-                options.get(DEFAULT_DATABASE));
+                CatalogFactory.createCatalog(options), catalogName, options.get(DEFAULT_DATABASE));
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
index 65c099f4..a7db7694 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
@@ -22,6 +22,8 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
 
 import java.util.List;
 
@@ -92,10 +94,22 @@ public interface Catalog extends AutoCloseable {
      * Return a {@link TableSchema} identified by the given {@link ObjectPath}.
      *
      * @param tablePath Path of the table
+     * @return The requested table schema
+     * @throws TableNotExistException if the target does not exist
+     */
+    TableSchema getTableSchema(ObjectPath tablePath) throws TableNotExistException;
+
+    /**
+     * Return a {@link FileStoreTable} identified by the given {@link ObjectPath}.
+     *
+     * @param tablePath Path of the table
      * @return The requested table
      * @throws TableNotExistException if the target does not exist
      */
-    TableSchema getTable(ObjectPath tablePath) throws TableNotExistException;
+    default FileStoreTable getTable(ObjectPath tablePath) throws TableNotExistException {
+        TableSchema tableSchema = getTableSchema(tablePath);
+        return FileStoreTableFactory.create(getTableLocation(tablePath), tableSchema);
+    }
 
     /**
      * Check if a table exists in this catalog.
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java
index 5e93a3ed..403c8be0 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java
@@ -18,7 +18,15 @@
 
 package org.apache.flink.table.store.file.catalog;
 
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ServiceLoader;
+import java.util.stream.Collectors;
 
 /** Factory to create {@link Catalog}. Each factory should have a unique identifier. */
 public interface CatalogFactory {
@@ -26,4 +34,51 @@ public interface CatalogFactory {
     String identifier();
 
     Catalog create(String warehouse, ReadableConfig options);
+
+    ConfigOption<String> METASTORE =
+            ConfigOptions.key("metastore")
+                    .stringType()
+                    .defaultValue(FileSystemCatalogFactory.IDENTIFIER);
+
+    ConfigOption<String> WAREHOUSE =
+            ConfigOptions.key("warehouse")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The warehouse root path of catalog.");
+
+    static Catalog createCatalog(ReadableConfig options) {
+        // manual validation
+        // because different catalog types may have different options
+        // we can't list them all in the optionalOptions() method
+        String warehouse =
+                Preconditions.checkNotNull(
+                        options.get(WAREHOUSE),
+                        "Table store '" + WAREHOUSE.key() + "' path must be set");
+
+        String metastore = options.get(METASTORE);
+        List<CatalogFactory> factories = new ArrayList<>();
+        ServiceLoader.load(CatalogFactory.class, Thread.currentThread().getContextClassLoader())
+                .iterator()
+                .forEachRemaining(
+                        f -> {
+                            if (f.identifier().equals(metastore)) {
+                                factories.add(f);
+                            }
+                        });
+        if (factories.size() != 1) {
+            throw new RuntimeException(
+                    "Found "
+                            + factories.size()
+                            + " classes implementing "
+                            + CatalogFactory.class.getName()
+                            + " with metastore "
+                            + metastore
+                            + ". They are:\n"
+                            + factories.stream()
+                                    .map(t -> t.getClass().getName())
+                                    .collect(Collectors.joining("\n")));
+        }
+
+        return factories.get(0).create(warehouse, options);
+    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
index 451ad8f5..6c16366a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
@@ -106,7 +106,7 @@ public class FileSystemCatalog extends AbstractCatalog {
     }
 
     @Override
-    public TableSchema getTable(ObjectPath tablePath) throws TableNotExistException {
+    public TableSchema getTableSchema(ObjectPath tablePath) throws TableNotExistException {
         Path path = getTableLocation(tablePath);
         return new SchemaManager(path)
                 .latest()
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
index e94c38ff..82a1a039 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
@@ -25,8 +25,6 @@ import org.apache.flink.table.store.file.WriteMode;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.TableSchema;
 
-import java.util.HashMap;
-import java.util.Map;
 import java.util.UUID;
 
 import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
@@ -46,9 +44,8 @@ public class FileStoreTableFactory {
 
     public static FileStoreTable create(Configuration conf, String user) {
         Path tablePath = FileStoreOptions.path(conf);
-        SchemaManager schemaManager = new SchemaManager(tablePath);
         TableSchema tableSchema =
-                schemaManager
+                new SchemaManager(tablePath)
                         .latest()
                         .orElseThrow(
                                 () ->
@@ -56,13 +53,23 @@ public class FileStoreTableFactory {
                                                 "Schema file not found in location "
                                                         + tablePath
                                                         + ". Please create table first."));
+        return create(tablePath, tableSchema, conf, user);
+    }
 
+    public static FileStoreTable create(Path tablePath, TableSchema tableSchema) {
+        return create(tablePath, tableSchema, new Configuration(), UUID.randomUUID().toString());
+    }
+
+    public static FileStoreTable create(
+            Path tablePath, TableSchema tableSchema, Configuration dynamicOptions, String user) {
         // merge dynamic options into schema.options
-        Map<String, String> newOptions = new HashMap<>(tableSchema.options());
-        newOptions.putAll(conf.toMap());
-        tableSchema = tableSchema.copy(newOptions);
+        Configuration newOptions = Configuration.fromMap(tableSchema.options());
+        dynamicOptions.toMap().forEach(newOptions::setString);
+        newOptions.set(PATH, tablePath.toString());
+        tableSchema = tableSchema.copy(newOptions.toMap());
 
-        if (conf.get(FileStoreOptions.WRITE_MODE) == WriteMode.APPEND_ONLY) {
+        SchemaManager schemaManager = new SchemaManager(tablePath);
+        if (newOptions.get(FileStoreOptions.WRITE_MODE) == WriteMode.APPEND_ONLY) {
             return new AppendOnlyFileStoreTable(tablePath, schemaManager, tableSchema, user);
         } else {
             if (tableSchema.primaryKeys().isEmpty()) {
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
index 50a3f635..1ad4327d 100644
--- a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
@@ -149,7 +149,7 @@ public class HiveCatalog extends AbstractCatalog {
     }
 
     @Override
-    public TableSchema getTable(ObjectPath tablePath) throws TableNotExistException {
+    public TableSchema getTableSchema(ObjectPath tablePath) throws TableNotExistException {
         if (isTableStoreTableNotExisted(tablePath)) {
             throw new TableNotExistException(tablePath);
         }
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
new file mode 100644
index 00000000..ae8e2f51
--- /dev/null
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.store.file.catalog.Catalog;
+import org.apache.flink.table.store.file.catalog.CatalogFactory;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.NamespaceChange;
+import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/** Spark {@link TableCatalog} for table store. */
+public class SparkCatalog implements TableCatalog, SupportsNamespaces {
+
+    private String name = null;
+    private Catalog catalog = null;
+
+    @Override
+    public void initialize(String name, CaseInsensitiveStringMap options) {
+        this.name = name;
+        this.catalog = CatalogFactory.createCatalog(Configuration.fromMap(options));
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public String[][] listNamespaces() {
+        List<String> databases = catalog.listDatabases();
+        String[][] namespaces = new String[databases.size()][];
+        for (int i = 0; i < databases.size(); i++) {
+            namespaces[i] = new String[] {databases.get(i)};
+        }
+        return namespaces;
+    }
+
+    @Override
+    public String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceException {
+        if (namespace.length == 0) {
+            return listNamespaces();
+        }
+        if (!isValidateNamespace(namespace)) {
+            throw new NoSuchNamespaceException(namespace);
+        }
+        return new String[0][];
+    }
+
+    @Override
+    public Map<String, String> loadNamespaceMetadata(String[] namespace) {
+        return Collections.emptyMap();
+    }
+
+    @Override
+    public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException {
+        Preconditions.checkArgument(
+                isValidateNamespace(namespace),
+                "Missing database in namespace: %s",
+                Arrays.toString(namespace));
+
+        try {
+            return catalog.listTables(namespace[0]).stream()
+                    .map(table -> Identifier.of(namespace, table))
+                    .toArray(Identifier[]::new);
+        } catch (Catalog.DatabaseNotExistException e) {
+            throw new NoSuchNamespaceException(namespace);
+        }
+    }
+
+    @Override
+    public Table loadTable(Identifier ident) throws NoSuchTableException {
+        if (!isValidateNamespace(ident.namespace())) {
+            throw new NoSuchTableException(ident);
+        }
+
+        try {
+            return new SparkTable(
+                    catalog.getTable(new ObjectPath(ident.namespace()[0], ident.name())));
+        } catch (Catalog.TableNotExistException e) {
+            throw new NoSuchTableException(ident);
+        }
+    }
+
+    @Override
+    public void renameTable(Identifier oldIdent, Identifier newIdent) {
+        throw new UnsupportedOperationException();
+    }
+
+    private boolean isValidateNamespace(String[] namespace) {
+        return namespace.length == 1;
+    }
+
+    // --------------------- unsupported methods ----------------------------
+
+    @Override
+    public void createNamespace(String[] namespace, Map<String, String> metadata) {
+        throw new UnsupportedOperationException("Create namespace in Spark is not supported yet.");
+    }
+
+    @Override
+    public void alterNamespace(String[] namespace, NamespaceChange... changes) {
+        throw new UnsupportedOperationException("Alter namespace in Spark is not supported yet.");
+    }
+
+    @Override
+    public boolean dropNamespace(String[] namespace) {
+        throw new UnsupportedOperationException("Drop namespace in Spark is not supported yet.");
+    }
+
+    @Override
+    public Table createTable(
+            Identifier ident,
+            StructType schema,
+            Transform[] partitions,
+            Map<String, String> properties) {
+        throw new UnsupportedOperationException("Create table in Spark is not supported yet.");
+    }
+
+    @Override
+    public Table alterTable(Identifier ident, TableChange... changes) {
+        throw new UnsupportedOperationException("Alter table in Spark is not supported yet.");
+    }
+
+    @Override
+    public boolean dropTable(Identifier ident) {
+        throw new UnsupportedOperationException("Drop table in Spark is not supported yet.");
+    }
+}
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
index 08de5ce5..26567405 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
@@ -27,15 +27,16 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.types.RowKind;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 
@@ -44,48 +45,84 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** ITCase for spark reader. */
 public class SparkReadITCase {
 
+    private static File warehouse = null;
+
     private static SparkSession spark = null;
 
-    @TempDir java.nio.file.Path tempDir;
+    private static Path tablePath1;
 
-    private Path path;
+    private static Path tablePath2;
 
-    private SimpleTableTestHelper testHelper;
+    @BeforeAll
+    public static void startMetastoreAndSpark() throws Exception {
+        warehouse = File.createTempFile("warehouse", null);
+        assertThat(warehouse.delete()).isTrue();
+        Path warehousePath = new Path("file:" + warehouse);
+        spark = SparkSession.builder().master("local[2]").getOrCreate();
+        spark.conf().set("spark.sql.catalog.table_store", SparkCatalog.class.getName());
+        spark.conf().set("spark.sql.catalog.table_store.warehouse", warehousePath.toString());
+
+        // flink sink
+        tablePath1 = new Path(warehousePath, "default.db/t1");
+        SimpleTableTestHelper testHelper1 = createTestHelper(tablePath1);
+        testHelper1.write(GenericRowData.of(1, 2L, StringData.fromString("1")));
+        testHelper1.write(GenericRowData.of(3, 4L, StringData.fromString("2")));
+        testHelper1.write(GenericRowData.of(5, 6L, StringData.fromString("3")));
+        testHelper1.write(GenericRowData.ofKind(RowKind.DELETE, 3, 4L, StringData.fromString("2")));
+        testHelper1.commit();
+
+        tablePath2 = new Path(warehousePath, "default.db/t2");
+        SimpleTableTestHelper testHelper2 = createTestHelper(tablePath2);
+        testHelper2.write(GenericRowData.of(1, 2L, StringData.fromString("1")));
+        testHelper2.write(GenericRowData.of(3, 4L, StringData.fromString("2")));
+        testHelper2.commit();
+        testHelper2.write(GenericRowData.of(5, 6L, StringData.fromString("3")));
+        testHelper2.write(GenericRowData.of(7, 8L, StringData.fromString("4")));
+        testHelper2.commit();
+    }
 
-    @BeforeEach
-    public void beforeEach() throws Exception {
-        this.path = new Path(tempDir.toUri().toString(), "my_table");
+    private static SimpleTableTestHelper createTestHelper(Path tablePath) throws Exception {
         RowType rowType =
                 new RowType(
                         Arrays.asList(
                                 new RowType.RowField("a", new IntType()),
                                 new RowType.RowField("b", new BigIntType()),
                                 new RowType.RowField("c", new VarCharType())));
-        testHelper = new SimpleTableTestHelper(path, rowType);
-    }
-
-    @BeforeAll
-    public static void startMetastoreAndSpark() {
-        spark = SparkSession.builder().master("local[2]").getOrCreate();
+        return new SimpleTableTestHelper(tablePath, rowType);
     }
 
     @AfterAll
-    public static void stopMetastoreAndSpark() {
+    public static void stopMetastoreAndSpark() throws IOException {
+        if (warehouse != null && warehouse.exists()) {
+            FileUtils.deleteDirectory(warehouse);
+        }
         spark.stop();
         spark = null;
     }
 
     @Test
-    public void testNormal() throws Exception {
-        testHelper.write(GenericRowData.of(1, 2L, StringData.fromString("1")));
-        testHelper.write(GenericRowData.of(3, 4L, StringData.fromString("2")));
-        testHelper.write(GenericRowData.of(5, 6L, StringData.fromString("3")));
-        testHelper.write(GenericRowData.ofKind(RowKind.DELETE, 3, 4L, StringData.fromString("2")));
-        testHelper.commit();
+    public void testNormal() {
+        innerTestNormal(
+                spark.read().format("tablestore").option("path", tablePath1.toString()).load());
+    }
 
-        Dataset<Row> dataset =
-                spark.read().format("tablestore").option("path", path.toString()).load();
+    @Test
+    public void testFilterPushDown() {
+        innerTestFilterPushDown(
+                spark.read().format("tablestore").option("path", tablePath2.toString()).load());
+    }
 
+    @Test
+    public void testCatalogNormal() {
+        innerTestNormal(spark.table("table_store.default.t1"));
+    }
+
+    @Test
+    public void testCatalogFilterPushDown() {
+        innerTestFilterPushDown(spark.table("table_store.default.t2"));
+    }
+
+    private void innerTestNormal(Dataset<Row> dataset) {
         List<Row> results = dataset.collectAsList();
         assertThat(results.toString()).isEqualTo("[[1,2,1], [5,6,3]]");
 
@@ -96,19 +133,7 @@ public class SparkReadITCase {
         assertThat(results.toString()).isEqualTo("[[8]]");
     }
 
-    @Test
-    public void testFilterPushDown() throws Exception {
-        testHelper.write(GenericRowData.of(1, 2L, StringData.fromString("1")));
-        testHelper.write(GenericRowData.of(3, 4L, StringData.fromString("2")));
-        testHelper.commit();
-
-        testHelper.write(GenericRowData.of(5, 6L, StringData.fromString("3")));
-        testHelper.write(GenericRowData.of(7, 8L, StringData.fromString("4")));
-        testHelper.commit();
-
-        Dataset<Row> dataset =
-                spark.read().format("tablestore").option("path", path.toString()).load();
-
+    private void innerTestFilterPushDown(Dataset<Row> dataset) {
         List<Row> results = dataset.filter("a < 4").select("a", "c").collectAsList();
         assertThat(results.toString()).isEqualTo("[[1,1], [3,2]]");
     }