You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/06/24 09:00:48 UTC

[flink-table-store] branch master updated: [FLINK-28219] Refactor TableStoreCatalog: Introduce a dedicated Catalog for table store

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new bdcf482  [FLINK-28219] Refactor TableStoreCatalog: Introduce a dedicated Catalog for table store
bdcf482 is described below

commit bdcf482531ac33b77476b7af651ef216e53525de
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Fri Jun 24 17:00:43 2022 +0800

    [FLINK-28219] Refactor TableStoreCatalog: Introduce a dedicated Catalog for table store
    
    This closes #173
---
 .../flink/table/store/connector/FlinkCatalog.java  | 190 ++++++++-
 .../table/store/connector/FlinkCatalogFactory.java |  22 +-
 .../connector/TableStoreConnectorFactory.java      |   2 +-
 .../org.apache.flink.table.factories.Factory       |   1 +
 .../store/connector/FileSystemCatalogITCase.java   |   3 +-
 .../table/store/connector/FlinkCatalogTest.java    | 411 ++++++++++++++++++++
 flink-table-store-core/pom.xml                     |  48 ---
 .../flink/table/store/file/catalog/Catalog.java    | 249 ++++++++++++
 .../store/file/catalog/FileSystemCatalog.java      | 177 ++-------
 .../org.apache.flink.table.factories.Factory       |  16 -
 .../store/file/catalog/FileSystemCatalogTest.java  |  40 --
 .../store/file/catalog/TableStoreCatalogTest.java  | 425 ---------------------
 12 files changed, 908 insertions(+), 676 deletions(-)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/TableStoreCatalog.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
similarity index 50%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/TableStoreCatalog.java
rename to flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
index 847c1b9..232015e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/TableStoreCatalog.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
@@ -16,29 +16,208 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.file.catalog;
+package org.apache.flink.table.store.connector;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
 import org.apache.flink.table.catalog.CatalogFunction;
 import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
 import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.store.file.catalog.Catalog;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static org.apache.flink.table.store.connector.FlinkCatalogFactory.IDENTIFIER;
+import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
 
 /** Catalog for table store. */
-public abstract class TableStoreCatalog extends AbstractCatalog {
+public class FlinkCatalog extends AbstractCatalog {
+
+    private final Catalog catalog;
 
-    public TableStoreCatalog(String name, String defaultDatabase) {
+    public FlinkCatalog(Catalog catalog, String name, String defaultDatabase) {
         super(name, defaultDatabase);
+        this.catalog = catalog;
+        try {
+            this.catalog.createDatabase(defaultDatabase, true);
+        } catch (Catalog.DatabaseAlreadyExistException ignore) {
+        }
+    }
+
+    @VisibleForTesting
+    Catalog catalog() {
+        return catalog;
+    }
+
+    @Override
+    public Optional<Factory> getFactory() {
+        return Optional.of(
+                FactoryUtil.discoverFactory(classLoader(), DynamicTableFactory.class, IDENTIFIER));
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        return catalog.listDatabases();
+    }
+
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException {
+        return catalog.databaseExists(databaseName);
+    }
+
+    @Override
+    public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+        if (database != null) {
+            if (!database.getProperties().isEmpty()) {
+                throw new UnsupportedOperationException(
+                        "Create database with properties is unsupported.");
+            }
+
+            if (database.getDescription().isPresent()
+                    && !database.getDescription().get().equals("")) {
+                throw new UnsupportedOperationException(
+                        "Create database with description is unsupported.");
+            }
+        }
+
+        try {
+            catalog.createDatabase(name, ignoreIfExists);
+        } catch (Catalog.DatabaseAlreadyExistException e) { // translate, keeping the cause
+            throw new DatabaseAlreadyExistException(getName(), e.database(), e);
+        }
+    }
+
+    @Override
+    public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
+            throws DatabaseNotEmptyException, DatabaseNotExistException, CatalogException {
+        try {
+            catalog.dropDatabase(name, ignoreIfNotExists, cascade);
+        } catch (Catalog.DatabaseNotEmptyException e) { // translate, keeping the cause
+            throw new DatabaseNotEmptyException(getName(), e.database(), e);
+        } catch (Catalog.DatabaseNotExistException e) { // translate, keeping the cause
+            throw new DatabaseNotExistException(getName(), e.database(), e);
+        }
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        try {
+            return catalog.listTables(databaseName);
+        } catch (Catalog.DatabaseNotExistException e) { // translate, keeping the cause
+            throw new DatabaseNotExistException(getName(), e.database(), e);
+        }
+    }
+
+    @Override
+    public CatalogBaseTable getTable(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+        TableSchema schema;
+        try {
+            schema = catalog.getTable(tablePath);
+        } catch (Catalog.TableNotExistException e) { // translate, keeping the cause
+            throw new TableNotExistException(getName(), e.tablePath(), e);
+        }
+
+        CatalogTable table = schema.toUpdateSchema().toCatalogTable();
+        // add path to source and sink
+        table.getOptions().put(PATH.key(), catalog.getTableLocation(tablePath).toString());
+        return table;
+    }
+
+    @Override
+    public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+        return catalog.tableExists(tablePath);
+    }
+
+    @Override
+    public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        try {
+            catalog.dropTable(tablePath, ignoreIfNotExists);
+        } catch (Catalog.TableNotExistException e) { // translate, keeping the cause
+            throw new TableNotExistException(getName(), e.tablePath(), e);
+        }
+    }
+
+    @Override
+    public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
+            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+        try {
+            catalog.createTable(tablePath, convertTableToSchema(tablePath, table), ignoreIfExists);
+        } catch (Catalog.TableAlreadyExistException e) { // translate, keeping the cause
+            throw new TableAlreadyExistException(getName(), e.tablePath(), e);
+        } catch (Catalog.DatabaseNotExistException e) { // translate, keeping the cause
+            throw new DatabaseNotExistException(getName(), e.database(), e);
+        }
+    }
+
+    @Override
+    public void alterTable(
+            ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        try {
+            catalog.alterTable(
+                    tablePath, convertTableToSchema(tablePath, newTable), ignoreIfNotExists);
+        } catch (Catalog.TableNotExistException e) { // translate, keeping the cause
+            throw new TableNotExistException(getName(), e.tablePath(), e);
+        }
+    }
+
+    private static ClassLoader classLoader() {
+        return FlinkCatalog.class.getClassLoader();
+    }
+
+    private UpdateSchema convertTableToSchema(ObjectPath tablePath, CatalogBaseTable baseTable) {
+        if (!(baseTable instanceof CatalogTable)) {
+            throw new UnsupportedOperationException(
+                    "Only support CatalogTable, but is: " + baseTable.getClass());
+        }
+        CatalogTable table = (CatalogTable) baseTable;
+        Map<String, String> options = new java.util.HashMap<>(table.getOptions());
+        if (options.containsKey(CONNECTOR.key())) {
+            throw new CatalogException(
+                    "Table Store Catalog only supports table store tables, not Flink connector: "
+                            + options.get(CONNECTOR.key()));
+        }
+
+        // remove table path from the private copy only; the caller's options map is not mutated
+        String specific = options.remove(PATH.key());
+        if (specific != null) {
+            if (!catalog.getTableLocation(tablePath).equals(new Path(specific))) {
+                throw new IllegalArgumentException(
+                        "Illegal table path in table options: " + specific);
+            }
+            table = table.copy(options);
+        }
+
+        return UpdateSchema.fromCatalogTable(table);
     }
 
     // --------------------- unsupported methods ----------------------------
@@ -49,6 +228,11 @@ public abstract class TableStoreCatalog extends AbstractCatalog {
     @Override
     public final void close() throws CatalogException {}
 
+    @Override
+    public CatalogDatabase getDatabase(String databaseName) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public final void alterDatabase(
             String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/TableStoreCatalogFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java
similarity index 75%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/TableStoreCatalogFactory.java
rename to flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java
index c14d0d0..6937e88 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/TableStoreCatalogFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.file.catalog;
+package org.apache.flink.table.store.connector;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
@@ -24,14 +24,15 @@ import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.factories.CatalogFactory;
 import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.store.file.catalog.Catalog;
 
 import java.util.HashSet;
 import java.util.Set;
 
 import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
 
-/** Factory for {@link TableStoreCatalog}. */
-public class TableStoreCatalogFactory implements CatalogFactory {
+/** Factory for {@link FlinkCatalog}. */
+public class FlinkCatalogFactory implements CatalogFactory {
 
     public static final String IDENTIFIER = "table-store";
 
@@ -65,12 +66,21 @@ public class TableStoreCatalogFactory implements CatalogFactory {
     }
 
     @Override
-    public TableStoreCatalog createCatalog(Context context) {
+    public FlinkCatalog createCatalog(Context context) {
         FactoryUtil.CatalogFactoryHelper helper =
                 FactoryUtil.createCatalogFactoryHelper(this, context);
         helper.validate();
         ReadableConfig options = helper.getOptions();
-        return new FileSystemCatalog(
-                context.getName(), new Path(options.get(WAREHOUSE)), options.get(DEFAULT_DATABASE));
+        return createCatalog(
+                new Path(options.get(WAREHOUSE)), context.getName(), options.get(DEFAULT_DATABASE));
+    }
+
+    public static FlinkCatalog createCatalog(Path warehouse, String catalogName) {
+        return createCatalog(warehouse, catalogName, DEFAULT_DATABASE.defaultValue());
+    }
+
+    public static FlinkCatalog createCatalog(
+            Path warehouse, String catalogName, String defaultDatabase) {
+        return new FlinkCatalog(Catalog.create(warehouse), catalogName, defaultDatabase);
     }
 }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreConnectorFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreConnectorFactory.java
index 9c23f21..e5a5a4e 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreConnectorFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreConnectorFactory.java
@@ -20,7 +20,7 @@ package org.apache.flink.table.store.connector;
 
 import org.apache.flink.table.factories.DynamicTableFactory;
 
-import static org.apache.flink.table.store.file.catalog.TableStoreCatalogFactory.IDENTIFIER;
+import static org.apache.flink.table.store.connector.FlinkCatalogFactory.IDENTIFIER;
 
 /** A table store {@link DynamicTableFactory} to create source and sink. */
 public class TableStoreConnectorFactory extends AbstractTableStoreFactory {
diff --git a/flink-table-store-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table-store-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 2de717c..daec216 100644
--- a/flink-table-store-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/flink-table-store-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -15,3 +15,4 @@
 
 org.apache.flink.table.store.connector.TableStoreManagedFactory
 org.apache.flink.table.store.connector.TableStoreConnectorFactory
+org.apache.flink.table.store.connector.FlinkCatalogFactory
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileSystemCatalogITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileSystemCatalogITCase.java
index 642d0dd..01e8421 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileSystemCatalogITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileSystemCatalogITCase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.store.connector;
 
-import org.apache.flink.table.store.file.catalog.FileSystemCatalog;
 import org.apache.flink.table.store.file.utils.BlockingIterator;
 import org.apache.flink.table.store.kafka.KafkaTableTestBase;
 import org.apache.flink.types.Row;
@@ -32,7 +31,7 @@ import java.util.UUID;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** ITCase for {@link FileSystemCatalog}. */
+/** ITCase for {@link FlinkCatalog}. */
 public class FileSystemCatalogITCase extends KafkaTableTestBase {
 
     @Before
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FlinkCatalogTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FlinkCatalogTest.java
new file mode 100644
index 0000000..79b8153
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FlinkCatalogTest.java
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link FlinkCatalog}. */
+public class FlinkCatalogTest {
+
+    @Rule public ExpectedException exception = ExpectedException.none();
+
+    private final ObjectPath path1 = new ObjectPath("db1", "t1");
+    private final ObjectPath path3 = new ObjectPath("db1", "t2");
+    private final ObjectPath nonExistDbPath = ObjectPath.fromString("non.exist");
+    private final ObjectPath nonExistObjectPath = ObjectPath.fromString("db1.nonexist");
+    private Catalog catalog;
+
+    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+    @Before
+    public void beforeEach() throws IOException {
+        String path = TEMPORARY_FOLDER.newFolder().toURI().toString();
+        catalog = FlinkCatalogFactory.createCatalog(new Path(path), "test-catalog");
+    }
+
+    private ResolvedSchema createSchema() {
+        return new ResolvedSchema(
+                Arrays.asList(
+                        Column.physical("first", DataTypes.STRING()),
+                        Column.physical("second", DataTypes.INT()),
+                        Column.physical("third", DataTypes.STRING())),
+                Collections.emptyList(),
+                null);
+    }
+
+    private CatalogTable createStreamingTable() {
+        ResolvedSchema resolvedSchema = this.createSchema();
+        CatalogTable origin =
+                CatalogTable.of(
+                        Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
+                        "test comment",
+                        Collections.emptyList(),
+                        this.getStreamingTableProperties());
+        return new ResolvedCatalogTable(origin, resolvedSchema);
+    }
+
+    private List<String> createPartitionKeys() {
+        return Arrays.asList("second", "third");
+    }
+
+    private Map<String, String> getBatchTableProperties() {
+        return new HashMap<String, String>() {
+            {
+                this.put("is_streaming", "false");
+            }
+        };
+    }
+
+    private Map<String, String> getStreamingTableProperties() {
+        return new HashMap<String, String>() {
+            {
+                this.put("is_streaming", "true");
+            }
+        };
+    }
+
+    private CatalogTable createAnotherTable() {
+        // TODO support change schema, modify it to createAnotherSchema
+        ResolvedSchema resolvedSchema = this.createSchema();
+        CatalogTable origin =
+                CatalogTable.of(
+                        Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
+                        "test comment",
+                        Collections.emptyList(),
+                        this.getBatchTableProperties());
+        return new ResolvedCatalogTable(origin, resolvedSchema);
+    }
+
+    private CatalogTable createAnotherPartitionedTable() {
+        // TODO support change schema, modify it to createAnotherSchema
+        ResolvedSchema resolvedSchema = this.createSchema();
+        CatalogTable origin =
+                CatalogTable.of(
+                        Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
+                        "test comment",
+                        this.createPartitionKeys(),
+                        this.getBatchTableProperties());
+        return new ResolvedCatalogTable(origin, resolvedSchema);
+    }
+
+    private CatalogTable createTable() {
+        ResolvedSchema resolvedSchema = this.createSchema();
+        CatalogTable origin =
+                CatalogTable.of(
+                        Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
+                        "test comment",
+                        Collections.emptyList(),
+                        this.getBatchTableProperties());
+        return new ResolvedCatalogTable(origin, resolvedSchema);
+    }
+
+    private CatalogTable createPartitionedTable() {
+        ResolvedSchema resolvedSchema = this.createSchema();
+        CatalogTable origin =
+                CatalogTable.of(
+                        Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
+                        "test comment",
+                        this.createPartitionKeys(),
+                        this.getBatchTableProperties());
+        return new ResolvedCatalogTable(origin, resolvedSchema);
+    }
+
+    @Test
+    public void testAlterTable() throws Exception {
+        catalog.createDatabase(path1.getDatabaseName(), null, false);
+        CatalogTable table = this.createTable();
+        catalog.createTable(this.path1, table, false);
+        checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
+        CatalogTable newTable = this.createAnotherTable();
+        catalog.alterTable(this.path1, newTable, false);
+        Assert.assertNotEquals(table, catalog.getTable(this.path1));
+        checkEquals(path1, newTable, (CatalogTable) catalog.getTable(this.path1));
+        catalog.dropTable(this.path1, false);
+
+        // Not support views
+    }
+
+    @Test
+    public void testListTables() throws Exception {
+        catalog.createDatabase(path1.getDatabaseName(), null, false);
+        catalog.createTable(this.path1, this.createTable(), false);
+        catalog.createTable(this.path3, this.createTable(), false);
+        Assert.assertEquals(2L, catalog.listTables("db1").size());
+
+        // Not support views
+    }
+
+    @Test
+    public void testAlterTable_differentTypedTable() {
+        // TODO support this
+    }
+
+    @Test
+    public void testCreateFlinkTable() {
+        // create a flink table
+        CatalogTable table = createTable();
+        HashMap<String, String> newOptions = new HashMap<>(table.getOptions());
+        newOptions.put("connector", "filesystem");
+        CatalogTable newTable = table.copy(newOptions);
+
+        assertThatThrownBy(() -> catalog.createTable(this.path1, newTable, false))
+                .isInstanceOf(CatalogException.class)
+                .hasMessageContaining(
+                        "Table Store Catalog only supports table store tables, not Flink connector: filesystem");
+    }
+
+    @Test
+    public void testCreateTable_Streaming() throws Exception {
+        catalog.createDatabase(path1.getDatabaseName(), null, false);
+        CatalogTable table = createStreamingTable();
+        catalog.createTable(path1, table, false);
+        checkEquals(path1, table, (CatalogTable) catalog.getTable(path1));
+    }
+
+    @Test
+    public void testAlterPartitionedTable() throws Exception {
+        catalog.createDatabase(path1.getDatabaseName(), null, false);
+        CatalogTable table = this.createPartitionedTable();
+        catalog.createTable(this.path1, table, false);
+        checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
+        CatalogTable newTable = this.createAnotherPartitionedTable();
+        catalog.alterTable(this.path1, newTable, false);
+        checkEquals(path1, newTable, (CatalogTable) catalog.getTable(this.path1));
+    }
+
+    @Test
+    public void testCreateTable_Batch() throws Exception {
+        catalog.createDatabase(path1.getDatabaseName(), null, false);
+        CatalogTable table = this.createTable();
+        catalog.createTable(this.path1, table, false);
+        CatalogBaseTable tableCreated = catalog.getTable(this.path1);
+        checkEquals(path1, table, (CatalogTable) tableCreated);
+        Assert.assertEquals("test comment", tableCreated.getDescription().get());
+        List<String> tables = catalog.listTables("db1");
+        Assert.assertEquals(1L, tables.size());
+        Assert.assertEquals(this.path1.getObjectName(), tables.get(0));
+        catalog.dropTable(this.path1, false);
+    }
+
+    @Test
+    public void testCreateTable_TableAlreadyExist_ignored() throws Exception {
+        catalog.createDatabase(path1.getDatabaseName(), null, false);
+        CatalogTable table = this.createTable();
+        catalog.createTable(this.path1, table, false);
+        checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
+        catalog.createTable(this.path1, this.createAnotherTable(), true);
+        checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
+    }
+
+    @Test
+    public void testCreatePartitionedTable_Batch() throws Exception {
+        catalog.createDatabase(path1.getDatabaseName(), null, false);
+        CatalogTable table = this.createPartitionedTable();
+        catalog.createTable(this.path1, table, false);
+        checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
+        List<String> tables = catalog.listTables("db1");
+        Assert.assertEquals(1L, tables.size());
+        Assert.assertEquals(this.path1.getObjectName(), tables.get(0));
+    }
+
+    @Test
+    public void testDropDb_DatabaseNotEmptyException() throws Exception {
+        catalog.createDatabase(path1.getDatabaseName(), null, false);
+        catalog.createTable(this.path1, this.createTable(), false);
+        this.exception.expect(DatabaseNotEmptyException.class);
+        this.exception.expectMessage("Database db1 in catalog test-catalog is not empty");
+        catalog.dropDatabase("db1", true, false);
+    }
+
+    @Test
+    public void testTableExists() throws Exception {
+        catalog.createDatabase(path1.getDatabaseName(), null, false);
+        Assert.assertFalse(catalog.tableExists(this.path1));
+        catalog.createTable(this.path1, this.createTable(), false);
+        Assert.assertTrue(catalog.tableExists(this.path1));
+    }
+
+    @Test
+    public void testAlterTable_TableNotExist_ignored() throws Exception {
+        catalog.alterTable(this.nonExistObjectPath, this.createTable(), true);
+        Assert.assertFalse(catalog.tableExists(this.nonExistObjectPath));
+    }
+
+    @Test
+    public void testDropTable_TableNotExist_ignored() throws Exception {
+        catalog.dropTable(this.nonExistObjectPath, true);
+    }
+
+    @Test
+    public void testCreateTable_TableAlreadyExistException() throws Exception {
+        catalog.createDatabase(path1.getDatabaseName(), null, false);
+        catalog.createTable(this.path1, this.createTable(), false);
+        this.exception.expect(TableAlreadyExistException.class);
+        this.exception.expectMessage("Table (or view) db1.t1 already exists in Catalog");
+        catalog.createTable(this.path1, this.createTable(), false);
+    }
+
+    @Test
+    public void testDropTable_nonPartitionedTable() throws Exception {
+        catalog.createDatabase(path1.getDatabaseName(), null, false);
+        catalog.createTable(this.path1, this.createTable(), false);
+        Assert.assertTrue(catalog.tableExists(this.path1));
+        catalog.dropTable(this.path1, false);
+        Assert.assertFalse(catalog.tableExists(this.path1));
+    }
+
+    @Test
+    public void testGetTable_TableNotExistException() throws Exception {
+        this.exception.expect(TableNotExistException.class);
+        this.exception.expectMessage("Table (or view) db1.nonexist does not exist in Catalog");
+        catalog.getTable(this.nonExistObjectPath);
+    }
+
+    @Test
+    public void testDbExists() throws Exception {
+        catalog.createDatabase(path1.getDatabaseName(), null, false);
+        catalog.createTable(this.path1, this.createTable(), false);
+        Assert.assertTrue(catalog.databaseExists("db1"));
+    }
+
+    @Test
+    public void testDropDb_DatabaseNotExist_Ignore() throws Exception {
+        catalog.dropDatabase("db1", true, false);
+    }
+
+    @Test
+    public void testAlterTable_TableNotExistException() throws Exception {
+        this.exception.expect(TableNotExistException.class);
+        this.exception.expectMessage("Table (or view) non.exist does not exist in Catalog");
+        catalog.alterTable(this.nonExistDbPath, this.createTable(), false);
+    }
+
+    @Test
+    public void testDropTable_TableNotExistException() throws Exception {
+        this.exception.expect(TableNotExistException.class);
+        this.exception.expectMessage("Table (or view) non.exist does not exist in Catalog");
+        catalog.dropTable(this.nonExistDbPath, false);
+    }
+
+    @Test
+    public void testCreateDb_Database() throws Exception {
+        catalog.createDatabase(path1.getDatabaseName(), null, false);
+        List<String> dbs = catalog.listDatabases();
+        assertThat(dbs).hasSize(2);
+        assertThat(new HashSet<>(dbs))
+                .isEqualTo(
+                        new HashSet<>(
+                                Arrays.asList(
+                                        path1.getDatabaseName(), catalog.getDefaultDatabase())));
+    }
+
+    @Test
+    public void testCreateDb_DatabaseAlreadyExistException() throws Exception {
+        catalog.createDatabase(path1.getDatabaseName(), null, false);
+
+        assertThatThrownBy(() -> catalog.createDatabase(path1.getDatabaseName(), null, false))
+                .isInstanceOf(DatabaseAlreadyExistException.class)
+                .hasMessage("Database db1 already exists in Catalog test-catalog.");
+    }
+
+    @Test
+    public void testCreateDb_DatabaseWithPropertiesException() {
+        CatalogDatabaseImpl database =
+                new CatalogDatabaseImpl(Collections.singletonMap("haa", "ccc"), null);
+        assertThatThrownBy(() -> catalog.createDatabase(path1.getDatabaseName(), database, false))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage("Create database with properties is unsupported.");
+    }
+
+    @Test
+    public void testCreateDb_DatabaseWithCommentException() {
+        CatalogDatabaseImpl database = new CatalogDatabaseImpl(Collections.emptyMap(), "haha");
+        assertThatThrownBy(() -> catalog.createDatabase(path1.getDatabaseName(), database, false))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage("Create database with description is unsupported.");
+    }
+
+    @Test
+    public void testCreateTable_DatabaseNotExistException() {
+        assertThat(catalog.databaseExists(path1.getDatabaseName())).isFalse();
+
+        assertThatThrownBy(() -> catalog.createTable(nonExistObjectPath, createTable(), false))
+                .isInstanceOf(DatabaseNotExistException.class)
+                .hasMessage("Database db1 does not exist in Catalog test-catalog.");
+    }
+
+    @Test
+    public void testDropDb_DatabaseNotExistException() {
+        assertThatThrownBy(() -> catalog.dropDatabase(path1.getDatabaseName(), false, false))
+                .isInstanceOf(DatabaseNotExistException.class)
+                .hasMessage("Database db1 does not exist in Catalog test-catalog.");
+    }
+
+    private void checkEquals(ObjectPath path, CatalogTable t1, CatalogTable t2) {
+        Path tablePath = ((FlinkCatalog) catalog).catalog().getTableLocation(path);
+        Map<String, String> options = new HashMap<>(t1.getOptions());
+        options.put("path", tablePath.toString());
+        t1 = ((ResolvedCatalogTable) t1).copy(options);
+        checkEquals(t1, t2);
+    }
+
+    private static void checkEquals(CatalogTable t1, CatalogTable t2) {
+        assertThat(t2.getTableKind()).isEqualTo(t1.getTableKind());
+        assertThat(t2.getSchema()).isEqualTo(t1.getSchema());
+        assertThat(t2.getComment()).isEqualTo(t1.getComment());
+        assertThat(t2.getPartitionKeys()).isEqualTo(t1.getPartitionKeys());
+        assertThat(t2.isPartitioned()).isEqualTo(t1.isPartitioned());
+        assertThat(t2.getOptions()).isEqualTo(t1.getOptions());
+    }
+}
diff --git a/flink-table-store-core/pom.xml b/flink-table-store-core/pom.xml
index e371d9e..b880507 100644
--- a/flink-table-store-core/pom.xml
+++ b/flink-table-store-core/pom.xml
@@ -89,54 +89,6 @@ under the License.
             <version>${flink.version}</version>
             <scope>test</scope>
         </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-common</artifactId>
-            <version>${flink.version}</version>
-            <type>test-jar</type>
-            <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>junit</groupId>
-                    <artifactId>junit</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-api-java</artifactId>
-            <version>${flink.version}</version>
-            <type>test-jar</type>
-            <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>junit</groupId>
-                    <artifactId>junit</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <version>${junit4.version}</version>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.junit.vintage</groupId>
-            <artifactId>junit-vintage-engine</artifactId>
-            <version>${junit5.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>junit</groupId>
-                    <artifactId>junit</artifactId>
-                </exclusion>
-            </exclusions>
-            <scope>test</scope>
-        </dependency>
     </dependencies>
 
     <build>
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
new file mode 100644
index 0000000..01b2366
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.catalog;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+
+import java.util.List;
+
+/**
+ * This interface is responsible for reading and writing metadata such as database/table from a
+ * table store catalog.
+ */
+public interface Catalog extends AutoCloseable {
+
+    /**
+     * Get the names of all databases in this catalog.
+     *
+     * @return a list of the names of all databases
+     */
+    List<String> listDatabases();
+
+    /**
+     * Check if a database exists in this catalog.
+     *
+     * @param databaseName Name of the database
+     * @return true if the given database exists in the catalog, false otherwise
+     */
+    boolean databaseExists(String databaseName);
+
+    /**
+     * Create a database.
+     *
+     * @param name Name of the database to be created
+     * @param ignoreIfExists Flag to specify behavior when a database with the given name already
+     *     exists: if set to false, throw a DatabaseAlreadyExistException, if set to true, do
+     *     nothing.
+     * @throws DatabaseAlreadyExistException if the given database already exists and ignoreIfExists
+     *     is false
+     */
+    void createDatabase(String name, boolean ignoreIfExists) throws DatabaseAlreadyExistException;
+
+    /**
+     * Drop a database.
+     *
+     * @param name Name of the database to be dropped.
+     * @param ignoreIfNotExists Flag to specify behavior when the database does not exist: if set to
+     *     false, throw an exception, if set to true, do nothing.
+     * @param cascade Flag to specify behavior when the database contains table or function: if set
+     *     to true, delete all tables and functions in the database and then delete the database, if
+     *     set to false, throw an exception.
+     * @throws DatabaseNotEmptyException if the given database is not empty and cascade is false
+     */
+    void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
+            throws DatabaseNotExistException, DatabaseNotEmptyException;
+
+    /**
+     * Get names of all tables under this database. An empty list is returned if none exists.
+     *
+     * @return a list of the names of all tables in this database
+     * @throws DatabaseNotExistException if the database does not exist
+     */
+    List<String> listTables(String databaseName) throws DatabaseNotExistException;
+
+    /**
+     * Return the table location path identified by the given {@link ObjectPath}.
+     *
+     * @param tablePath Path of the table
+     * @return The requested table location
+     */
+    Path getTableLocation(ObjectPath tablePath);
+
+    /**
+     * Return a {@link TableSchema} identified by the given {@link ObjectPath}.
+     *
+     * @param tablePath Path of the table
+     * @return The requested table
+     * @throws TableNotExistException if the target does not exist
+     */
+    TableSchema getTable(ObjectPath tablePath) throws TableNotExistException;
+
+    /**
+     * Check if a table exists in this catalog.
+     *
+     * @param tablePath Path of the table
+     * @return true if the given table exists in the catalog, false otherwise
+     */
+    boolean tableExists(ObjectPath tablePath);
+
+    /**
+     * Drop a table.
+     *
+     * @param tablePath Path of the table to be dropped
+     * @param ignoreIfNotExists Flag to specify behavior when the table does not exist: if set to
+     *     false, throw an exception, if set to true, do nothing.
+     * @throws TableNotExistException if the table does not exist
+     */
+    void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws TableNotExistException;
+
+    /**
+     * Create a new table.
+     *
+     * @param tablePath path of the table to be created
+     * @param tableSchema the table definition
+     * @param ignoreIfExists flag to specify behavior when a table already exists at the given path:
+     *     if set to false, it throws a TableAlreadyExistException, if set to true, do nothing.
+     * @throws TableAlreadyExistException if table already exists and ignoreIfExists is false
+     * @throws DatabaseNotExistException if the database in tablePath doesn't exist
+     */
+    void createTable(ObjectPath tablePath, UpdateSchema tableSchema, boolean ignoreIfExists)
+            throws TableAlreadyExistException, DatabaseNotExistException;
+
+    /**
+     * Modify an existing table.
+     *
+     * @param tablePath path of the table to be modified
+     * @param newTableSchema the new table definition
+     * @param ignoreIfNotExists flag to specify behavior when the table does not exist: if set to
+     *     false, throw an exception, if set to true, do nothing.
+     * @throws TableNotExistException if the table does not exist
+     */
+    void alterTable(ObjectPath tablePath, UpdateSchema newTableSchema, boolean ignoreIfNotExists)
+            throws TableNotExistException;
+
+    /** Exception for trying to drop a database that is not empty. */
+    class DatabaseNotEmptyException extends Exception {
+        private static final String MSG = "Database %s is not empty.";
+
+        private final String database;
+
+        public DatabaseNotEmptyException(String database, Throwable cause) {
+            super(String.format(MSG, database), cause);
+            this.database = database;
+        }
+
+        public DatabaseNotEmptyException(String database) {
+            this(database, null);
+        }
+
+        public String database() {
+            return database;
+        }
+    }
+
+    /** Exception for trying to create a database that already exists. */
+    class DatabaseAlreadyExistException extends Exception {
+        private static final String MSG = "Database %s already exists.";
+
+        private final String database;
+
+        public DatabaseAlreadyExistException(String database, Throwable cause) {
+            super(String.format(MSG, database), cause);
+            this.database = database;
+        }
+
+        public DatabaseAlreadyExistException(String database) {
+            this(database, null);
+        }
+
+        public String database() {
+            return database;
+        }
+    }
+
+    /** Exception for trying to operate on a database that doesn't exist. */
+    class DatabaseNotExistException extends Exception {
+        private static final String MSG = "Database %s does not exist.";
+
+        private final String database;
+
+        public DatabaseNotExistException(String database, Throwable cause) {
+            super(String.format(MSG, database), cause);
+            this.database = database;
+        }
+
+        public DatabaseNotExistException(String database) {
+            this(database, null);
+        }
+
+        public String database() {
+            return database;
+        }
+    }
+
+    /** Exception for trying to create a table that already exists. */
+    class TableAlreadyExistException extends Exception {
+
+        private static final String MSG = "Table %s already exists.";
+
+        private final ObjectPath tablePath;
+
+        public TableAlreadyExistException(ObjectPath tablePath) {
+            this(tablePath, null);
+        }
+
+        public TableAlreadyExistException(ObjectPath tablePath, Throwable cause) {
+            super(String.format(MSG, tablePath.getFullName()), cause);
+            this.tablePath = tablePath;
+        }
+
+        public ObjectPath tablePath() {
+            return tablePath;
+        }
+    }
+
+    /** Exception for trying to operate on a table that doesn't exist. */
+    class TableNotExistException extends Exception {
+
+        private static final String MSG = "Table %s does not exist.";
+
+        private final ObjectPath tablePath;
+
+        public TableNotExistException(ObjectPath tablePath) {
+            this(tablePath, null);
+        }
+
+        public TableNotExistException(ObjectPath tablePath, Throwable cause) {
+            super(String.format(MSG, tablePath.getFullName()), cause);
+            this.tablePath = tablePath;
+        }
+
+        public ObjectPath tablePath() {
+            return tablePath;
+        }
+    }
+
+    /** Create a {@link Catalog} from warehouse path. */
+    static Catalog create(Path warehouse) {
+        return new FileSystemCatalog(warehouse);
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
index f31a00d..d4a451e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
@@ -22,69 +22,32 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.CatalogDatabase;
-import org.apache.flink.table.catalog.CatalogDatabaseImpl;
-import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.ResolvedCatalogTable;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
-import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.flink.table.factories.DynamicTableFactory;
-import org.apache.flink.table.factories.Factory;
-import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.schema.UpdateSchema;
-import org.apache.flink.util.function.RunnableWithException;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
-import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.Callable;
 
-import static org.apache.flink.table.catalog.GenericInMemoryCatalogFactoryOptions.DEFAULT_DATABASE;
-import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
-import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
-import static org.apache.flink.table.store.file.catalog.TableStoreCatalogFactory.IDENTIFIER;
 import static org.apache.flink.table.store.file.utils.FileUtils.safelyListFileStatus;
 
 /** A catalog implementation for {@link FileSystem}. */
-public class FileSystemCatalog extends TableStoreCatalog {
+public class FileSystemCatalog implements Catalog {
 
     public static final String DB_SUFFIX = ".db";
 
-    public static final CatalogDatabaseImpl DUMMY_DATABASE =
-            new CatalogDatabaseImpl(Collections.emptyMap(), null);
-
     private final FileSystem fs;
     private final Path warehouse;
 
-    public FileSystemCatalog(String name, Path warehouse) {
-        this(name, warehouse, DEFAULT_DATABASE.defaultValue());
-    }
-
-    public FileSystemCatalog(String name, Path warehouse, String defaultDatabase) {
-        super(name, defaultDatabase);
+    public FileSystemCatalog(Path warehouse) {
         this.warehouse = warehouse;
         this.fs = uncheck(warehouse::getFileSystem);
-        uncheck(() -> createDatabase(defaultDatabase, DUMMY_DATABASE, true));
     }
 
     @Override
-    public Optional<Factory> getFactory() {
-        return Optional.of(
-                FactoryUtil.discoverFactory(classLoader(), DynamicTableFactory.class, IDENTIFIER));
-    }
-
-    @Override
-    public List<String> listDatabases() throws CatalogException {
+    public List<String> listDatabases() {
         List<String> databases = new ArrayList<>();
         for (FileStatus status : uncheck(() -> safelyListFileStatus(warehouse))) {
             Path path = status.getPath();
@@ -96,62 +59,44 @@ public class FileSystemCatalog extends TableStoreCatalog {
     }
 
     @Override
-    public CatalogDatabase getDatabase(String databaseName)
-            throws DatabaseNotExistException, CatalogException {
-        if (!databaseExists(databaseName)) {
-            throw new DatabaseNotExistException(getName(), databaseName);
-        }
-        return new CatalogDatabaseImpl(Collections.emptyMap(), "");
-    }
-
-    @Override
-    public boolean databaseExists(String databaseName) throws CatalogException {
+    public boolean databaseExists(String databaseName) {
         return uncheck(() -> fs.exists(databasePath(databaseName)));
     }
 
     @Override
-    public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
-            throws DatabaseAlreadyExistException, CatalogException {
-        if (database.getProperties().size() > 0) {
-            throw new UnsupportedOperationException(
-                    "Create database with properties is unsupported.");
-        }
-
-        if (database.getDescription().isPresent() && !database.getDescription().get().equals("")) {
-            throw new UnsupportedOperationException(
-                    "Create database with description is unsupported.");
-        }
-
-        if (!ignoreIfExists && databaseExists(name)) {
-            throw new DatabaseAlreadyExistException(getName(), name);
+    public void createDatabase(String name, boolean ignoreIfExists)
+            throws DatabaseAlreadyExistException {
+        if (databaseExists(name)) {
+            if (ignoreIfExists) {
+                return;
+            }
+            throw new DatabaseAlreadyExistException(name);
         }
-
         uncheck(() -> fs.mkdirs(databasePath(name)));
     }
 
     @Override
     public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
-            throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
+            throws DatabaseNotExistException, DatabaseNotEmptyException {
         if (!databaseExists(name)) {
             if (ignoreIfNotExists) {
                 return;
             }
 
-            throw new DatabaseNotExistException(getName(), name);
+            throw new DatabaseNotExistException(name);
         }
 
-        if (listTables(name).size() > 0) {
-            throw new DatabaseNotEmptyException(getName(), name);
+        if (!cascade && listTables(name).size() > 0) {
+            throw new DatabaseNotEmptyException(name);
         }
 
         uncheck(() -> fs.delete(databasePath(name), true));
     }
 
     @Override
-    public List<String> listTables(String databaseName)
-            throws DatabaseNotExistException, CatalogException {
+    public List<String> listTables(String databaseName) throws DatabaseNotExistException {
         if (!databaseExists(databaseName)) {
-            throw new DatabaseNotExistException(getName(), databaseName);
+            throw new DatabaseNotExistException(databaseName);
         }
 
         List<String> tables = new ArrayList<>();
@@ -164,22 +109,20 @@ public class FileSystemCatalog extends TableStoreCatalog {
     }
 
     @Override
-    public CatalogBaseTable getTable(ObjectPath tablePath)
-            throws TableNotExistException, CatalogException {
+    public Path getTableLocation(ObjectPath tablePath) {
+        return tablePath(tablePath);
+    }
+
+    @Override
+    public TableSchema getTable(ObjectPath tablePath) throws TableNotExistException {
         Path path = tablePath(tablePath);
-        TableSchema tableSchema =
-                new SchemaManager(path)
-                        .latest()
-                        .orElseThrow(() -> new TableNotExistException(getName(), tablePath));
-
-        CatalogTable table = tableSchema.toUpdateSchema().toCatalogTable();
-        // add path to source and sink
-        table.getOptions().put(PATH.key(), path.toString());
-        return table;
+        return new SchemaManager(path)
+                .latest()
+                .orElseThrow(() -> new TableNotExistException(tablePath));
     }
 
     @Override
-    public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+    public boolean tableExists(ObjectPath tablePath) {
         return tableExists(tablePath(tablePath));
     }
 
@@ -189,24 +132,24 @@ public class FileSystemCatalog extends TableStoreCatalog {
 
     @Override
     public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
-            throws TableNotExistException, CatalogException {
+            throws TableNotExistException {
         Path path = tablePath(tablePath);
         if (!tableExists(path)) {
             if (ignoreIfNotExists) {
                 return;
             }
 
-            throw new TableNotExistException(getName(), tablePath);
+            throw new TableNotExistException(tablePath);
         }
 
         uncheck(() -> fs.delete(path, true));
     }
 
     @Override
-    public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
-            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+    public void createTable(ObjectPath tablePath, UpdateSchema table, boolean ignoreIfExists)
+            throws TableAlreadyExistException, DatabaseNotExistException {
         if (!databaseExists(tablePath.getDatabaseName())) {
-            throw new DatabaseNotExistException(getName(), tablePath.getDatabaseName());
+            throw new DatabaseNotExistException(tablePath.getDatabaseName());
         }
 
         Path path = tablePath(tablePath);
@@ -215,23 +158,22 @@ public class FileSystemCatalog extends TableStoreCatalog {
                 return;
             }
 
-            throw new TableAlreadyExistException(getName(), tablePath);
+            throw new TableAlreadyExistException(tablePath);
         }
 
         commitTableChange(path, table);
     }
 
     @Override
-    public void alterTable(
-            ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
-            throws TableNotExistException, CatalogException {
+    public void alterTable(ObjectPath tablePath, UpdateSchema newTable, boolean ignoreIfNotExists)
+            throws TableNotExistException {
         Path path = tablePath(tablePath);
         if (!tableExists(path)) {
             if (ignoreIfNotExists) {
                 return;
             }
 
-            throw new TableNotExistException(getName(), tablePath);
+            throw new TableNotExistException(tablePath);
         }
 
         commitTableChange(path, newTable);
@@ -241,25 +183,10 @@ public class FileSystemCatalog extends TableStoreCatalog {
         try {
             return callable.call();
         } catch (Exception e) {
-            if (e instanceof RuntimeException) {
-                throw (RuntimeException) e;
-            }
-            throw new CatalogException(e);
+            throw new RuntimeException(e);
         }
     }
 
-    private static void uncheck(RunnableWithException runnable) {
-        FileSystemCatalog.uncheck(
-                () -> {
-                    runnable.run();
-                    return null;
-                });
-    }
-
-    private static ClassLoader classLoader() {
-        return FileSystemCatalog.class.getClassLoader();
-    }
-
     private static boolean isDatabase(Path path) {
         return path.getName().endsWith(DB_SUFFIX);
     }
@@ -278,30 +205,10 @@ public class FileSystemCatalog extends TableStoreCatalog {
         return new Path(databasePath(objectPath.getDatabaseName()), objectPath.getObjectName());
     }
 
-    private void commitTableChange(Path tablePath, CatalogBaseTable baseTable) {
-        if (!(baseTable instanceof ResolvedCatalogTable)) {
-            throw new UnsupportedOperationException(
-                    "Only support ResolvedCatalogTable, but is: " + baseTable.getClass());
-        }
-        ResolvedCatalogTable table = (ResolvedCatalogTable) baseTable;
-        Map<String, String> options = table.getOptions();
-        if (options.containsKey(CONNECTOR.key())) {
-            throw new CatalogException(
-                    "Table Store Catalog only supports table store tables, not Flink connector: "
-                            + options.get(CONNECTOR.key()));
-        }
-
-        // remove table path
-        String specific = options.remove(PATH.key());
-        if (specific != null) {
-            if (!tablePath.equals(new Path(specific))) {
-                throw new IllegalArgumentException(
-                        "Illegal table path in table options: " + specific);
-            }
-            table = table.copy(options);
-        }
-
-        UpdateSchema updateSchema = UpdateSchema.fromCatalogTable(table);
-        uncheck(() -> new SchemaManager(tablePath).commitNewVersion(updateSchema));
+    private void commitTableChange(Path tablePath, UpdateSchema table) {
+        uncheck(() -> new SchemaManager(tablePath).commitNewVersion(table));
     }
+
+    @Override
+    public void close() throws Exception {}
 }
diff --git a/flink-table-store-core/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table-store-core/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
deleted file mode 100644
index 8e32399..0000000
--- a/flink-table-store-core/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.flink.table.store.file.catalog.TableStoreCatalogFactory
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/catalog/FileSystemCatalogTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/catalog/FileSystemCatalogTest.java
deleted file mode 100644
index 93b8439..0000000
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/catalog/FileSystemCatalogTest.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.file.catalog;
-
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.catalog.CatalogTest;
-
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.IOException;
-
-/** Test for {@link FileSystemCatalog}. */
-public class FileSystemCatalogTest extends TableStoreCatalogTest {
-
-    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
-
-    @BeforeClass
-    public static void beforeEach() throws IOException {
-        String path = TEMPORARY_FOLDER.newFolder().toURI().toString();
-        catalog = new FileSystemCatalog(CatalogTest.TEST_CATALOG_NAME, new Path(path));
-    }
-}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/catalog/TableStoreCatalogTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/catalog/TableStoreCatalogTest.java
deleted file mode 100644
index 83f13f1..0000000
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/catalog/TableStoreCatalogTest.java
+++ /dev/null
@@ -1,425 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.file.catalog;
-
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.CatalogDatabase;
-import org.apache.flink.table.catalog.CatalogDatabaseImpl;
-import org.apache.flink.table.catalog.CatalogFunction;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogTestBase;
-import org.apache.flink.table.catalog.CatalogTestUtil;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.ResolvedCatalogTable;
-import org.apache.flink.table.catalog.ResolvedSchema;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-/** Test for {@link TableStoreCatalog}. */
-public abstract class TableStoreCatalogTest extends CatalogTestBase {
-
-    @Override
-    protected boolean isGeneric() {
-        return false;
-    }
-
-    @Override
-    protected Map<String, String> getBatchTableProperties() {
-        return new HashMap<String, String>() {
-            {
-                this.put("is_streaming", "false");
-            }
-        };
-    }
-
-    @Override
-    protected Map<String, String> getStreamingTableProperties() {
-        return new HashMap<String, String>() {
-            {
-                this.put("is_streaming", "true");
-            }
-        };
-    }
-
-    @Override
-    public CatalogDatabase createDb() {
-        return new CatalogDatabaseImpl(Collections.emptyMap(), "");
-    }
-
-    @Override
-    public CatalogTable createAnotherTable() {
-        // TODO support change schema, modify it to createAnotherSchema
-        ResolvedSchema resolvedSchema = this.createSchema();
-        CatalogTable origin =
-                CatalogTable.of(
-                        Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
-                        "test comment",
-                        Collections.emptyList(),
-                        this.getBatchTableProperties());
-        return new ResolvedCatalogTable(origin, resolvedSchema);
-    }
-
-    @Override
-    public CatalogTable createAnotherPartitionedTable() {
-        // TODO support change schema, modify it to createAnotherSchema
-        ResolvedSchema resolvedSchema = this.createSchema();
-        CatalogTable origin =
-                CatalogTable.of(
-                        Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
-                        "test comment",
-                        this.createPartitionKeys(),
-                        this.getBatchTableProperties());
-        return new ResolvedCatalogTable(origin, resolvedSchema);
-    }
-
-    @Override
-    public CatalogDatabase createAnotherDb() {
-        // Not support database with properties or comment
-        return new CatalogDatabaseImpl(new HashMap<String, String>() {}, null);
-    }
-
-    @Override
-    public void testAlterTable() throws Exception {
-        catalog.createDatabase("db1", this.createDb(), false);
-        CatalogTable table = this.createTable();
-        catalog.createTable(this.path1, table, false);
-        checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
-        CatalogTable newTable = this.createAnotherTable();
-        catalog.alterTable(this.path1, newTable, false);
-        Assert.assertNotEquals(table, catalog.getTable(this.path1));
-        checkEquals(path1, newTable, (CatalogTable) catalog.getTable(this.path1));
-        catalog.dropTable(this.path1, false);
-
-        // Not support views
-    }
-
-    @Test
-    public void testListTables() throws Exception {
-        catalog.createDatabase("db1", this.createDb(), false);
-        catalog.createTable(this.path1, this.createTable(), false);
-        catalog.createTable(this.path3, this.createTable(), false);
-        Assert.assertEquals(2L, catalog.listTables("db1").size());
-
-        // Not support views
-    }
-
-    @Override
-    public void testAlterTable_differentTypedTable() {
-        // TODO support this
-    }
-
-    @Test
-    public void testCreateFlinkTable() throws DatabaseAlreadyExistException {
-        // create a flink table
-        CatalogTable table = createTable();
-        HashMap<String, String> newOptions = new HashMap<>(table.getOptions());
-        newOptions.put("connector", "filesystem");
-        CatalogTable newTable = table.copy(newOptions);
-
-        catalog.createDatabase("db1", this.createDb(), false);
-
-        assertThatThrownBy(() -> catalog.createTable(this.path1, newTable, false))
-                .isInstanceOf(CatalogException.class)
-                .hasMessageContaining(
-                        "Table Store Catalog only supports table store tables, not Flink connector: filesystem");
-    }
-
-    @Test
-    public void testCreateTable_Streaming() throws Exception {
-        catalog.createDatabase("db1", createDb(), false);
-        CatalogTable table = createStreamingTable();
-        catalog.createTable(path1, table, false);
-        checkEquals(path1, table, (CatalogTable) catalog.getTable(path1));
-    }
-
-    @Test
-    public void testAlterPartitionedTable() throws Exception {
-        catalog.createDatabase("db1", this.createDb(), false);
-        CatalogTable table = this.createPartitionedTable();
-        catalog.createTable(this.path1, table, false);
-        checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
-        CatalogTable newTable = this.createAnotherPartitionedTable();
-        catalog.alterTable(this.path1, newTable, false);
-        checkEquals(path1, newTable, (CatalogTable) catalog.getTable(this.path1));
-    }
-
-    @Test
-    public void testCreateTable_Batch() throws Exception {
-        catalog.createDatabase("db1", this.createDb(), false);
-        CatalogTable table = this.createTable();
-        catalog.createTable(this.path1, table, false);
-        CatalogBaseTable tableCreated = catalog.getTable(this.path1);
-        checkEquals(path1, table, (CatalogTable) tableCreated);
-        Assert.assertEquals("test comment", tableCreated.getDescription().get());
-        List<String> tables = catalog.listTables("db1");
-        Assert.assertEquals(1L, tables.size());
-        Assert.assertEquals(this.path1.getObjectName(), tables.get(0));
-        catalog.dropTable(this.path1, false);
-    }
-
-    @Test
-    public void testCreateTable_TableAlreadyExist_ignored() throws Exception {
-        catalog.createDatabase("db1", this.createDb(), false);
-        CatalogTable table = this.createTable();
-        catalog.createTable(this.path1, table, false);
-        checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
-        catalog.createTable(this.path1, this.createAnotherTable(), true);
-        checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
-    }
-
-    @Test
-    public void testCreatePartitionedTable_Batch() throws Exception {
-        catalog.createDatabase("db1", this.createDb(), false);
-        CatalogTable table = this.createPartitionedTable();
-        catalog.createTable(this.path1, table, false);
-        checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
-        List<String> tables = catalog.listTables("db1");
-        Assert.assertEquals(1L, tables.size());
-        Assert.assertEquals(this.path1.getObjectName(), tables.get(0));
-    }
-
-    private void checkEquals(ObjectPath path, CatalogTable t1, CatalogTable t2) {
-        Path tablePath = ((FileSystemCatalog) catalog).tablePath(path);
-        Map<String, String> options = new HashMap<>(t1.getOptions());
-        options.put("path", tablePath.toString());
-        t1 = ((ResolvedCatalogTable) t1).copy(options);
-        CatalogTestUtil.checkEquals(t1, t2);
-    }
-
-    // --------------------- unsupported methods ----------------------------
-
-    @Override
-    protected CatalogFunction createFunction() {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected CatalogFunction createPythonFunction() {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected CatalogFunction createAnotherFunction() {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void testCreateView() {}
-
-    @Override
-    public void testAlterDb() {}
-
-    @Override
-    public void testAlterDb_DatabaseNotExistException() {}
-
-    @Override
-    public void testAlterDb_DatabaseNotExist_ignored() {}
-
-    @Override
-    public void testRenameTable_nonPartitionedTable() {}
-
-    @Override
-    public void testRenameTable_TableNotExistException_ignored() {}
-
-    @Override
-    public void testRenameTable_TableNotExistException() {}
-
-    @Override
-    public void testRenameTable_TableAlreadyExistException() {}
-
-    @Override
-    public void testCreateView_DatabaseNotExistException() {}
-
-    @Override
-    public void testCreateView_TableAlreadyExistException() {}
-
-    @Override
-    public void testCreateView_TableAlreadyExist_ignored() {}
-
-    @Override
-    public void testDropView() {}
-
-    @Override
-    public void testAlterView() {}
-
-    @Override
-    public void testAlterView_TableNotExistException() {}
-
-    @Override
-    public void testAlterView_TableNotExist_ignored() {}
-
-    @Override
-    public void testListView() {}
-
-    @Override
-    public void testRenameView() {}
-
-    @Override
-    public void testCreateFunction() {}
-
-    @Override
-    public void testCreatePythonFunction() {}
-
-    @Override
-    public void testCreateFunction_DatabaseNotExistException() {}
-
-    @Override
-    public void testCreateFunction_FunctionAlreadyExistException() {}
-
-    @Override
-    public void testAlterFunction() {}
-
-    @Override
-    public void testAlterPythonFunction() {}
-
-    @Override
-    public void testAlterFunction_FunctionNotExistException() {}
-
-    @Override
-    public void testAlterFunction_FunctionNotExist_ignored() {}
-
-    @Override
-    public void testListFunctions() {}
-
-    @Override
-    public void testListFunctions_DatabaseNotExistException() {}
-
-    @Override
-    public void testGetFunction_FunctionNotExistException() {}
-
-    @Override
-    public void testGetFunction_FunctionNotExistException_NoDb() {}
-
-    @Override
-    public void testDropFunction() {}
-
-    @Override
-    public void testDropFunction_FunctionNotExistException() {}
-
-    @Override
-    public void testDropFunction_FunctionNotExist_ignored() {}
-
-    @Override
-    public void testCreatePartition() {}
-
-    @Override
-    public void testCreatePartition_TableNotExistException() {}
-
-    @Override
-    public void testCreatePartition_TableNotPartitionedException() {}
-
-    @Override
-    public void testCreatePartition_PartitionSpecInvalidException() {}
-
-    @Override
-    public void testCreatePartition_PartitionAlreadyExistsException() {}
-
-    @Override
-    public void testCreatePartition_PartitionAlreadyExists_ignored() {}
-
-    @Override
-    public void testDropPartition() {}
-
-    @Override
-    public void testDropPartition_TableNotExist() {}
-
-    @Override
-    public void testDropPartition_TableNotPartitioned() {}
-
-    @Override
-    public void testDropPartition_PartitionSpecInvalid() {}
-
-    @Override
-    public void testDropPartition_PartitionNotExist() {}
-
-    @Override
-    public void testDropPartition_PartitionNotExist_ignored() {}
-
-    @Override
-    public void testAlterPartition() {}
-
-    @Override
-    public void testAlterPartition_TableNotExist() {}
-
-    @Override
-    public void testAlterPartition_TableNotPartitioned() {}
-
-    @Override
-    public void testAlterPartition_PartitionSpecInvalid() {}
-
-    @Override
-    public void testAlterPartition_PartitionNotExist() {}
-
-    @Override
-    public void testAlterPartition_PartitionNotExist_ignored() {}
-
-    @Override
-    public void testGetPartition_TableNotExist() {}
-
-    @Override
-    public void testGetPartition_TableNotPartitioned() {}
-
-    @Override
-    public void testGetPartition_PartitionSpecInvalid_invalidPartitionSpec() {}
-
-    @Override
-    public void testGetPartition_PartitionSpecInvalid_sizeNotEqual() {}
-
-    @Override
-    public void testGetPartition_PartitionNotExist() {}
-
-    @Override
-    public void testPartitionExists() {}
-
-    @Override
-    public void testListPartitionPartialSpec() {}
-
-    @Override
-    public void testGetTableStats_TableNotExistException() {}
-
-    @Override
-    public void testGetPartitionStats() {}
-
-    @Override
-    public void testAlterTableStats() {}
-
-    @Override
-    public void testAlterTableStats_partitionedTable() {}
-
-    @Override
-    public void testAlterPartitionTableStats() {}
-
-    @Override
-    public void testAlterTableStats_TableNotExistException() {}
-
-    @Override
-    public void testAlterTableStats_TableNotExistException_ignore() {}
-}