Posted to commits@flink.apache.org by lz...@apache.org on 2022/06/13 09:50:49 UTC

[flink-table-store] branch master updated: [FLINK-27755] Introduce a filesystem catalog for table store

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 14dbb700 [FLINK-27755] Introduce a filesystem catalog for table store
14dbb700 is described below

commit 14dbb700f6d5222a03f6b6b8b0b17a1a5a6a4d9c
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Mon Jun 13 17:50:45 2022 +0800

    [FLINK-27755] Introduce a filesystem catalog for table store
    
    This closes #137
---
 .../connector/TableStoreConnectorFactory.java      |  32 ++
 .../org.apache.flink.table.factories.Factory       |   1 +
 .../store/connector/FileSystemCatalogITCase.java   | 111 +++++++
 flink-table-store-core/pom.xml                     |  35 +++
 .../store/file/catalog/FileSystemCatalog.java      | 305 ++++++++++++++++++
 .../store/file/catalog/TableStoreCatalog.java      | 224 ++++++++++++++
 .../file/catalog/TableStoreCatalogFactory.java     |  76 +++++
 .../flink/table/store/file/schema/Schema.java      |  14 +-
 .../table/store/file/schema/UpdateSchema.java      |  86 ++++--
 .../org.apache.flink.table.factories.Factory       |   2 +-
 .../store/file/catalog/FileSystemCatalogTest.java  |  40 +++
 .../store/file/catalog/TableStoreCatalogTest.java  | 340 +++++++++++++++++++++
 12 files changed, 1235 insertions(+), 31 deletions(-)
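
For context before the diff: this commit registers the catalog under the
factory identifier "table-store", so it can be created and used from a
TableEnvironment. A minimal quick-start sketch (the warehouse path is a
placeholder; the setup mirrors FileSystemCatalogITCase below):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class CatalogQuickStart {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.newInstance().build());
            // 'type' selects the catalog factory added in this commit;
            // 'warehouse' is the root directory for databases and tables
            tEnv.executeSql(
                    "CREATE CATALOG fs WITH ("
                            + "'type'='table-store', 'warehouse'='file:///tmp/warehouse')");
            tEnv.useCatalog("fs");
            tEnv.executeSql("CREATE TABLE T (a STRING, b STRING, c STRING)");
        }
    }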

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreConnectorFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreConnectorFactory.java
new file mode 100644
index 00000000..9c23f21e
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreConnectorFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.table.factories.DynamicTableFactory;
+
+import static org.apache.flink.table.store.file.catalog.TableStoreCatalogFactory.IDENTIFIER;
+
+/** A table store {@link DynamicTableFactory} to create sources and sinks. */
+public class TableStoreConnectorFactory extends AbstractTableStoreFactory {
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+}
diff --git a/flink-table-store-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table-store-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 726415ab..2de717ce 100644
--- a/flink-table-store-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/flink-table-store-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -14,3 +14,4 @@
 # limitations under the License.
 
 org.apache.flink.table.store.connector.TableStoreManagedFactory
+org.apache.flink.table.store.connector.TableStoreConnectorFactory
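
(Note: the new entry is picked up through Flink's ServiceLoader-based factory
discovery. A small sketch of the lookup, using the same
FactoryUtil.discoverFactory call that FileSystemCatalog#getFactory performs
later in this diff; the catalog factory and this connector factory share the
identifier "table-store" but implement different factory interfaces, so the
lookups do not clash:)

    import org.apache.flink.table.factories.DynamicTableFactory;
    import org.apache.flink.table.factories.FactoryUtil;

    DynamicTableFactory factory =
            FactoryUtil.discoverFactory(
                    Thread.currentThread().getContextClassLoader(),
                    DynamicTableFactory.class,
                    "table-store"); // resolves to TableStoreConnectorFactory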
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileSystemCatalogITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileSystemCatalogITCase.java
new file mode 100644
index 00000000..642d0ddd
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileSystemCatalogITCase.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.table.store.file.catalog.FileSystemCatalog;
+import org.apache.flink.table.store.file.utils.BlockingIterator;
+import org.apache.flink.table.store.kafka.KafkaTableTestBase;
+import org.apache.flink.types.Row;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for {@link FileSystemCatalog}. */
+public class FileSystemCatalogITCase extends KafkaTableTestBase {
+
+    @Before
+    public void before() throws IOException {
+        String path = TEMPORARY_FOLDER.newFolder().toURI().toString();
+        tEnv.executeSql(
+                String.format(
+                        "CREATE CATALOG fs WITH ('type'='table-store', 'warehouse'='%s')", path));
+        tEnv.useCatalog("fs");
+        env.setParallelism(1);
+    }
+
+    @Test
+    public void testWriteRead() throws Exception {
+        tEnv.executeSql("CREATE TABLE T (a STRING, b STRING, c STRING)");
+        innerTestWriteRead();
+    }
+
+    @Test
+    public void testLogWriteRead() throws Exception {
+        String topic = UUID.randomUUID().toString();
+        createTopicIfNotExists(topic, 1);
+
+        try {
+            tEnv.executeSql(
+                    String.format(
+                            "CREATE TABLE T (a STRING, b STRING, c STRING) WITH ("
+                                    + "'log.system'='kafka', "
+                                    + "'log.kafka.bootstrap.servers'='%s',"
+                                    + "'log.topic'='%s'"
+                                    + ")",
+                            getBootstrapServers(), topic));
+            innerTestWriteRead();
+        } finally {
+            deleteTopicIfExists(topic);
+        }
+    }
+
+    @Test
+    public void testLogWriteReadWithVirtual() throws Exception {
+        String topic = UUID.randomUUID().toString();
+        createTopicIfNotExists(topic, 1);
+
+        try {
+            tEnv.executeSql(
+                    String.format(
+                            "CREATE TABLE T ("
+                                    + "a STRING, "
+                                    + "b STRING, "
+                                    + "c STRING, "
+                                    + "d AS CAST(c as INT) + 1"
+                                    + ") WITH ("
+                                    + "'log.system'='kafka', "
+                                    + "'log.kafka.bootstrap.servers'='%s',"
+                                    + "'log.topic'='%s'"
+                                    + ")",
+                            getBootstrapServers(), topic));
+            BlockingIterator<Row, Row> iterator =
+                    BlockingIterator.of(tEnv.from("T").execute().collect());
+            tEnv.executeSql("INSERT INTO T VALUES ('1', '2', '3'), ('4', '5', '6')").await();
+            List<Row> result = iterator.collectAndClose(2);
+            assertThat(result)
+                    .containsExactlyInAnyOrder(Row.of("1", "2", "3", 4), Row.of("4", "5", "6", 7));
+        } finally {
+            deleteTopicIfExists(topic);
+        }
+    }
+
+    private void innerTestWriteRead() throws Exception {
+        BlockingIterator<Row, Row> iterator =
+                BlockingIterator.of(tEnv.from("T").execute().collect());
+        tEnv.executeSql("INSERT INTO T VALUES ('1', '2', '3'), ('4', '5', '6')").await();
+        List<Row> result = iterator.collectAndClose(2);
+        assertThat(result).containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
+    }
+}
diff --git a/flink-table-store-core/pom.xml b/flink-table-store-core/pom.xml
index b880507d..b039068f 100644
--- a/flink-table-store-core/pom.xml
+++ b/flink-table-store-core/pom.xml
@@ -89,6 +89,41 @@ under the License.
             <version>${flink.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>junit</groupId>
+                    <artifactId>junit</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>junit</groupId>
+                    <artifactId>junit</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit4.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
new file mode 100644
index 00000000..c772a867
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.catalog;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.util.function.RunnableWithException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+
+import static org.apache.flink.table.catalog.GenericInMemoryCatalogFactoryOptions.DEFAULT_DATABASE;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
+import static org.apache.flink.table.store.file.catalog.TableStoreCatalogFactory.IDENTIFIER;
+import static org.apache.flink.table.store.file.utils.FileUtils.safelyListFileStatus;
+
+/** A catalog implementation for {@link FileSystem}. */
+public class FileSystemCatalog extends TableStoreCatalog {
+
+    public static final String DB_SUFFIX = ".db";
+
+    public static final CatalogDatabaseImpl DUMMY_DATABASE =
+            new CatalogDatabaseImpl(Collections.emptyMap(), null);
+
+    private final FileSystem fs;
+    private final Path warehouse;
+
+    public FileSystemCatalog(String name, Path warehouse) {
+        this(name, warehouse, DEFAULT_DATABASE.defaultValue());
+    }
+
+    public FileSystemCatalog(String name, Path warehouse, String defaultDatabase) {
+        super(name, defaultDatabase);
+        this.warehouse = warehouse;
+        this.fs = uncheck(warehouse::getFileSystem);
+        uncheck(() -> createDatabase(defaultDatabase, DUMMY_DATABASE, true));
+    }
+
+    @Override
+    public Optional<Factory> getFactory() {
+        return Optional.of(
+                FactoryUtil.discoverFactory(classLoader(), DynamicTableFactory.class, IDENTIFIER));
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        List<String> databases = new ArrayList<>();
+        for (FileStatus status : uncheck(() -> safelyListFileStatus(warehouse))) {
+            Path path = status.getPath();
+            if (status.isDir() && isDatabase(path)) {
+                databases.add(database(path));
+            }
+        }
+        return databases;
+    }
+
+    @Override
+    public CatalogDatabase getDatabase(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+        return new CatalogDatabaseImpl(Collections.emptyMap(), "");
+    }
+
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException {
+        return uncheck(() -> fs.exists(databasePath(databaseName)));
+    }
+
+    @Override
+    public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+        if (database.getProperties().size() > 0) {
+            throw new UnsupportedOperationException(
+                    "Creating a database with properties is unsupported.");
+        }
+
+        if (database.getDescription().isPresent() && !database.getDescription().get().equals("")) {
+            throw new UnsupportedOperationException(
+                    "Creating a database with a description is unsupported.");
+        }
+
+        if (!ignoreIfExists && databaseExists(name)) {
+            throw new DatabaseAlreadyExistException(getName(), name);
+        }
+
+        uncheck(() -> fs.mkdirs(databasePath(name)));
+    }
+
+    @Override
+    public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
+            throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
+        if (!databaseExists(name)) {
+            if (ignoreIfNotExists) {
+                return;
+            }
+
+            throw new DatabaseNotExistException(getName(), name);
+        }
+
+        if (listTables(name).size() > 0) {
+            throw new DatabaseNotEmptyException(getName(), name);
+        }
+
+        uncheck(() -> fs.delete(databasePath(name), true));
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+
+        List<String> tables = new ArrayList<>();
+        for (FileStatus status : uncheck(() -> safelyListFileStatus(databasePath(databaseName)))) {
+            if (status.isDir() && tableExists(status.getPath())) {
+                tables.add(status.getPath().getName());
+            }
+        }
+        return tables;
+    }
+
+    @Override
+    public CatalogBaseTable getTable(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+        Path path = tablePath(tablePath);
+        Schema schema =
+                new SchemaManager(path)
+                        .latest()
+                        .orElseThrow(() -> new TableNotExistException(getName(), tablePath));
+
+        CatalogTable table = schema.toUpdateSchema().toCatalogTable();
+        // add path to source and sink
+        table.getOptions().put(PATH.key(), path.toString());
+        return table;
+    }
+
+    @Override
+    public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+        return tableExists(tablePath(tablePath));
+    }
+
+    private boolean tableExists(Path tablePath) {
+        return new SchemaManager(tablePath).listAllIds().size() > 0;
+    }
+
+    @Override
+    public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        Path path = tablePath(tablePath);
+        if (!tableExists(path)) {
+            if (ignoreIfNotExists) {
+                return;
+            }
+
+            throw new TableNotExistException(getName(), tablePath);
+        }
+
+        uncheck(() -> fs.delete(path, true));
+    }
+
+    @Override
+    public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
+            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+        if (!databaseExists(tablePath.getDatabaseName())) {
+            throw new DatabaseNotExistException(getName(), tablePath.getDatabaseName());
+        }
+
+        Path path = tablePath(tablePath);
+        if (tableExists(path)) {
+            if (ignoreIfExists) {
+                return;
+            }
+
+            throw new TableAlreadyExistException(getName(), tablePath);
+        }
+
+        commitTableChange(path, table);
+    }
+
+    @Override
+    public void alterTable(
+            ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        Path path = tablePath(tablePath);
+        if (!tableExists(path)) {
+            if (ignoreIfNotExists) {
+                return;
+            }
+
+            throw new TableNotExistException(getName(), tablePath);
+        }
+
+        commitTableChange(path, newTable);
+    }
+
+    private static <T> T uncheck(Callable<T> callable) {
+        try {
+            return callable.call();
+        } catch (Exception e) {
+            if (e instanceof RuntimeException) {
+                throw (RuntimeException) e;
+            }
+            throw new CatalogException(e);
+        }
+    }
+
+    private static void uncheck(RunnableWithException runnable) {
+        FileSystemCatalog.uncheck(
+                () -> {
+                    runnable.run();
+                    return null;
+                });
+    }
+
+    private static ClassLoader classLoader() {
+        return FileSystemCatalog.class.getClassLoader();
+    }
+
+    private static boolean isDatabase(Path path) {
+        return path.getName().endsWith(DB_SUFFIX);
+    }
+
+    private static String database(Path path) {
+        String name = path.getName();
+        return name.substring(0, name.length() - DB_SUFFIX.length());
+    }
+
+    private Path databasePath(String database) {
+        return new Path(warehouse, database + DB_SUFFIX);
+    }
+
+    private Path tablePath(ObjectPath objectPath) {
+        return new Path(databasePath(objectPath.getDatabaseName()), objectPath.getObjectName());
+    }
+
+    private void commitTableChange(Path tablePath, CatalogBaseTable baseTable) {
+        if (!(baseTable instanceof ResolvedCatalogTable)) {
+            throw new UnsupportedOperationException(
+                    "Only ResolvedCatalogTable is supported, but got: " + baseTable.getClass());
+        }
+        ResolvedCatalogTable table = (ResolvedCatalogTable) baseTable;
+        Map<String, String> options = table.getOptions();
+        if (options.containsKey(CONNECTOR.key())) {
+            throw new CatalogException(
+                    "Table Store Catalog only supports table store tables, not Flink connector: "
+                            + options.get(CONNECTOR.key()));
+        }
+
+        // remove table path
+        String specific = options.remove(PATH.key());
+        if (specific != null) {
+            if (!tablePath.equals(new Path(specific))) {
+                throw new IllegalArgumentException(
+                        "Illegal table path in table options: " + specific);
+            }
+            table = table.copy(options);
+        }
+
+        UpdateSchema updateSchema = UpdateSchema.fromCatalogTable(table);
+        uncheck(() -> new SchemaManager(tablePath).commitNewVersion(updateSchema));
+    }
+}
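
(A short, hedged usage sketch of the class above, outside of any SQL session;
the warehouse path is a placeholder and exception handling is elided:)

    import org.apache.flink.core.fs.Path;
    import org.apache.flink.table.store.file.catalog.FileSystemCatalog;

    public class FileSystemCatalogSketch {
        public static void main(String[] args) throws Exception {
            // the constructor eagerly creates the default database under the warehouse
            FileSystemCatalog catalog =
                    new FileSystemCatalog("fs", new Path("file:///tmp/warehouse"));
            catalog.createDatabase("db1", FileSystemCatalog.DUMMY_DATABASE, true);
            System.out.println(catalog.listDatabases());       // e.g. [default, db1]
            System.out.println(catalog.databaseExists("db1")); // true
        }
    }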
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/TableStoreCatalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/TableStoreCatalog.java
new file mode 100644
index 00000000..847c1b99
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/TableStoreCatalog.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.catalog;
+
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+
+import java.util.Collections;
+import java.util.List;
+
+/** Catalog for table store. */
+public abstract class TableStoreCatalog extends AbstractCatalog {
+
+    public TableStoreCatalog(String name, String defaultDatabase) {
+        super(name, defaultDatabase);
+    }
+
+    // --------------------- unsupported methods ----------------------------
+
+    @Override
+    public final void open() throws CatalogException {}
+
+    @Override
+    public final void close() throws CatalogException {}
+
+    @Override
+    public final void alterDatabase(
+            String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
+            throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public final void renameTable(
+            ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
+            throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public final List<String> listViews(String databaseName) throws CatalogException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public final List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
+            throws CatalogException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public final List<CatalogPartitionSpec> listPartitions(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public final List<CatalogPartitionSpec> listPartitionsByFilter(
+            ObjectPath tablePath, List<Expression> filters) throws CatalogException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public final CatalogPartition getPartition(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws PartitionNotExistException, CatalogException {
+        throw new PartitionNotExistException(getName(), tablePath, partitionSpec);
+    }
+
+    @Override
+    public final boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws CatalogException {
+        return false;
+    }
+
+    @Override
+    public final void createPartition(
+            ObjectPath tablePath,
+            CatalogPartitionSpec partitionSpec,
+            CatalogPartition partition,
+            boolean ignoreIfExists)
+            throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public final void dropPartition(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
+            throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public final void alterPartition(
+            ObjectPath tablePath,
+            CatalogPartitionSpec partitionSpec,
+            CatalogPartition newPartition,
+            boolean ignoreIfNotExists)
+            throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public final List<String> listFunctions(String dbName) throws CatalogException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public final CatalogFunction getFunction(ObjectPath functionPath)
+            throws FunctionNotExistException, CatalogException {
+        throw new FunctionNotExistException(getName(), functionPath);
+    }
+
+    @Override
+    public final boolean functionExists(ObjectPath functionPath) throws CatalogException {
+        return false;
+    }
+
+    @Override
+    public final void createFunction(
+            ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
+            throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public final void alterFunction(
+            ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)
+            throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public final void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
+            throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public final CatalogTableStatistics getTableStatistics(ObjectPath tablePath)
+            throws CatalogException {
+        return CatalogTableStatistics.UNKNOWN;
+    }
+
+    @Override
+    public final CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath)
+            throws CatalogException {
+        return CatalogColumnStatistics.UNKNOWN;
+    }
+
+    @Override
+    public final CatalogTableStatistics getPartitionStatistics(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
+        return CatalogTableStatistics.UNKNOWN;
+    }
+
+    @Override
+    public final CatalogColumnStatistics getPartitionColumnStatistics(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
+        return CatalogColumnStatistics.UNKNOWN;
+    }
+
+    @Override
+    public final void alterTableStatistics(
+            ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists)
+            throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public final void alterTableColumnStatistics(
+            ObjectPath tablePath,
+            CatalogColumnStatistics columnStatistics,
+            boolean ignoreIfNotExists)
+            throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public final void alterPartitionStatistics(
+            ObjectPath tablePath,
+            CatalogPartitionSpec partitionSpec,
+            CatalogTableStatistics partitionStatistics,
+            boolean ignoreIfNotExists)
+            throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public final void alterPartitionColumnStatistics(
+            ObjectPath tablePath,
+            CatalogPartitionSpec partitionSpec,
+            CatalogColumnStatistics columnStatistics,
+            boolean ignoreIfNotExists)
+            throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/TableStoreCatalogFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/TableStoreCatalogFactory.java
new file mode 100644
index 00000000..c14d0d0e
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/TableStoreCatalogFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.catalog;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
+
+/** Factory for {@link TableStoreCatalog}. */
+public class TableStoreCatalogFactory implements CatalogFactory {
+
+    public static final String IDENTIFIER = "table-store";
+
+    public static final ConfigOption<String> DEFAULT_DATABASE =
+            ConfigOptions.key("default-database").stringType().defaultValue("default");
+
+    public static final ConfigOption<String> WAREHOUSE =
+            ConfigOptions.key("warehouse")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The warehouse root path of the catalog.");
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(WAREHOUSE);
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(DEFAULT_DATABASE);
+        options.add(PROPERTY_VERSION);
+        return options;
+    }
+
+    @Override
+    public TableStoreCatalog createCatalog(Context context) {
+        FactoryUtil.CatalogFactoryHelper helper =
+                FactoryUtil.createCatalogFactoryHelper(this, context);
+        helper.validate();
+        ReadableConfig options = helper.getOptions();
+        return new FileSystemCatalog(
+                context.getName(), new Path(options.get(WAREHOUSE)), options.get(DEFAULT_DATABASE));
+    }
+}
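
(Putting the options together: 'warehouse' is required, 'default-database' is
optional and falls back to "default". A hedged SQL sketch, reusing the tEnv
from the quick-start near the top; the catalog name, path, and database name
are placeholders:)

    tEnv.executeSql(
            "CREATE CATALOG my_catalog WITH ("
                    + "'type'='table-store', "
                    + "'warehouse'='file:///tmp/warehouse', "
                    + "'default-database'='my_db')");
    tEnv.useCatalog("my_catalog");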
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/Schema.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/Schema.java
index 2afc641f..253227b4 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/Schema.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/Schema.java
@@ -18,14 +18,12 @@
 
 package org.apache.flink.table.store.file.schema;
 
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.MultisetType;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
@@ -193,16 +191,8 @@ public class Schema implements Serializable {
         return Objects.hash(fields, partitionKeys, primaryKeys, options, comment);
     }
 
-    public TableSchema getTableSchema() {
-        TableSchema.Builder builder = TableSchema.builder();
-        for (DataField field : fields) {
-            builder.field(
-                    field.name(), TypeConversions.fromLogicalToDataType(field.type().logicalType));
-        }
-        if (primaryKeys.size() > 0) {
-            builder.primaryKey(primaryKeys.toArray(new String[0]));
-        }
-        return builder.build();
+    public UpdateSchema toUpdateSchema() {
+        return new UpdateSchema(logicalRowType(), partitionKeys, primaryKeys, options, comment);
     }
 
     public static List<DataField> newFields(RowType rowType) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/UpdateSchema.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/UpdateSchema.java
index ed7ea727..29aa4840 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/UpdateSchema.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/UpdateSchema.java
@@ -18,14 +18,21 @@
 
 package org.apache.flink.table.store.file.schema;
 
-import org.apache.flink.table.catalog.ResolvedCatalogTable;
-import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.Schema;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
 
/** An update schema. */
 public class UpdateSchema {
@@ -40,22 +47,6 @@ public class UpdateSchema {
 
     private final String comment;
 
-    public static UpdateSchema fromCatalogTable(ResolvedCatalogTable catalogTable) {
-        ResolvedSchema schema = catalogTable.getResolvedSchema();
-        RowType rowType = (RowType) schema.toPhysicalRowDataType().getLogicalType();
-        List<String> primaryKeys = new ArrayList<>();
-        if (schema.getPrimaryKey().isPresent()) {
-            primaryKeys = schema.getPrimaryKey().get().getColumns();
-        }
-
-        return new UpdateSchema(
-                rowType,
-                catalogTable.getPartitionKeys(),
-                primaryKeys,
-                catalogTable.getOptions(),
-                catalogTable.getComment());
-    }
-
     public UpdateSchema(
             RowType rowType,
             List<String> partitionKeys,
@@ -104,4 +95,63 @@ public class UpdateSchema {
                 + comment
                 + '}';
     }
+
+    public CatalogTableImpl toCatalogTable() {
+        TableSchema schema;
+        Map<String, String> newOptions = new HashMap<>(options);
+
+        // try to read the full schema from the options;
+        // it is stored there when the table has virtual columns or a watermark
+        DescriptorProperties tableSchemaProps = new DescriptorProperties(true);
+        tableSchemaProps.putProperties(newOptions);
+        Optional<TableSchema> optional = tableSchemaProps.getOptionalTableSchema(Schema.SCHEMA);
+        if (optional.isPresent()) {
+            schema = optional.get();
+
+            // remove schema from options
+            DescriptorProperties removeProperties = new DescriptorProperties(false);
+            removeProperties.putTableSchema(SCHEMA, schema);
+            removeProperties.asMap().keySet().forEach(newOptions::remove);
+        } else {
+            TableSchema.Builder builder = TableSchema.builder();
+            for (RowType.RowField field : rowType.getFields()) {
+                builder.field(field.getName(), fromLogicalToDataType(field.getType()));
+            }
+            if (primaryKeys.size() > 0) {
+                builder.primaryKey(primaryKeys.toArray(new String[0]));
+            }
+
+            schema = builder.build();
+        }
+
+        return new CatalogTableImpl(schema, partitionKeys, newOptions, comment);
+    }
+
+    public static UpdateSchema fromCatalogTable(CatalogTable catalogTable) {
+        TableSchema schema = catalogTable.getSchema();
+        RowType rowType = (RowType) schema.toPhysicalRowDataType().getLogicalType();
+        List<String> primaryKeys = new ArrayList<>();
+        if (schema.getPrimaryKey().isPresent()) {
+            primaryKeys = schema.getPrimaryKey().get().getColumns();
+        }
+
+        Map<String, String> options = new HashMap<>(catalogTable.getOptions());
+
+        // Serialize virtual columns and watermarks into the options.
+        // Flink SQL needs them; the storage itself does not.
+        if (schema.getTableColumns().stream().anyMatch(c -> !c.isPhysical())
+                || schema.getWatermarkSpecs().size() > 0) {
+            DescriptorProperties tableSchemaProps = new DescriptorProperties(true);
+            tableSchemaProps.putTableSchema(
+                    org.apache.flink.table.descriptors.Schema.SCHEMA, schema);
+            options.putAll(tableSchemaProps.asMap());
+        }
+
+        return new UpdateSchema(
+                rowType,
+                catalogTable.getPartitionKeys(),
+                primaryKeys,
+                options,
+                catalogTable.getComment());
+    }
 }
diff --git a/flink-table-store-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table-store-core/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
similarity index 92%
copy from flink-table-store-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
copy to flink-table-store-core/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 726415ab..8e32399b 100644
--- a/flink-table-store-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/flink-table-store-core/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.table.store.connector.TableStoreManagedFactory
+org.apache.flink.table.store.file.catalog.TableStoreCatalogFactory
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/catalog/FileSystemCatalogTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/catalog/FileSystemCatalogTest.java
new file mode 100644
index 00000000..93b8439c
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/catalog/FileSystemCatalogTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.catalog;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.catalog.CatalogTest;
+
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+
+/** Test for {@link FileSystemCatalog}. */
+public class FileSystemCatalogTest extends TableStoreCatalogTest {
+
+    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+    @BeforeClass
+    public static void beforeClass() throws IOException {
+        String path = TEMPORARY_FOLDER.newFolder().toURI().toString();
+        catalog = new FileSystemCatalog(CatalogTest.TEST_CATALOG_NAME, new Path(path));
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/catalog/TableStoreCatalogTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/catalog/TableStoreCatalogTest.java
new file mode 100644
index 00000000..60c52b05
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/catalog/TableStoreCatalogTest.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.catalog;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTestBase;
+import org.apache.flink.table.catalog.CatalogTestUtil;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link TableStoreCatalog}. */
+public abstract class TableStoreCatalogTest extends CatalogTestBase {
+
+    @Override
+    protected boolean isGeneric() {
+        return false;
+    }
+
+    @Override
+    public CatalogDatabase createDb() {
+        return new CatalogDatabaseImpl(Collections.emptyMap(), "");
+    }
+
+    @Override
+    public CatalogTable createAnotherTable() {
+        // TODO: once schema changes are supported, switch this to createAnotherSchema
+        ResolvedSchema resolvedSchema = this.createSchema();
+        CatalogTable origin =
+                CatalogTable.of(
+                        Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
+                        "test comment",
+                        Collections.emptyList(),
+                        this.getBatchTableProperties());
+        return new ResolvedCatalogTable(origin, resolvedSchema);
+    }
+
+    @Override
+    public CatalogTable createAnotherPartitionedTable() {
+        // TODO: once schema changes are supported, switch this to createAnotherSchema
+        ResolvedSchema resolvedSchema = this.createSchema();
+        CatalogTable origin =
+                CatalogTable.of(
+                        Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
+                        "test comment",
+                        this.createPartitionKeys(),
+                        this.getBatchTableProperties());
+        return new ResolvedCatalogTable(origin, resolvedSchema);
+    }
+
+    @Override
+    public CatalogDatabase createAnotherDb() {
+        // Databases with properties or comments are not supported
+        return new CatalogDatabaseImpl(new HashMap<String, String>() {}, null);
+    }
+
+    @Override
+    public void testAlterTable() throws Exception {
+        catalog.createDatabase("db1", this.createDb(), false);
+        CatalogTable table = this.createTable();
+        catalog.createTable(this.path1, table, false);
+        CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(this.path1));
+        CatalogTable newTable = this.createAnotherTable();
+        catalog.alterTable(this.path1, newTable, false);
+        Assert.assertNotEquals(table, catalog.getTable(this.path1));
+        CatalogTestUtil.checkEquals(newTable, (CatalogTable) catalog.getTable(this.path1));
+        catalog.dropTable(this.path1, false);
+
+        // Views are not supported
+    }
+
+    @Test
+    public void testListTables() throws Exception {
+        catalog.createDatabase("db1", this.createDb(), false);
+        catalog.createTable(this.path1, this.createTable(), false);
+        catalog.createTable(this.path3, this.createTable(), false);
+        Assert.assertEquals(2L, catalog.listTables("db1").size());
+
+        // Views are not supported
+    }
+
+    @Override
+    public void testAlterTable_differentTypedTable() {
+        // TODO support this
+    }
+
+    @Test
+    public void testCreateFlinkTable() throws DatabaseAlreadyExistException {
+        // create a Flink table with an explicit 'connector' option
+        CatalogTable table = createTable();
+        HashMap<String, String> newOptions = new HashMap<>(table.getOptions());
+        newOptions.put("connector", "filesystem");
+        CatalogTable newTable = table.copy(newOptions);
+
+        catalog.createDatabase("db1", this.createDb(), false);
+
+        assertThatThrownBy(() -> catalog.createTable(this.path1, newTable, false))
+                .isInstanceOf(CatalogException.class)
+                .hasMessageContaining(
+                        "Table Store Catalog only supports table store tables, not Flink connector: filesystem");
+    }
+
+    // --------------------- unsupported methods ----------------------------
+
+    @Override
+    protected CatalogFunction createFunction() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected CatalogFunction createPythonFunction() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected CatalogFunction createAnotherFunction() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void testCreateView() {}
+
+    @Override
+    public void testAlterDb() {}
+
+    @Override
+    public void testAlterDb_DatabaseNotExistException() {}
+
+    @Override
+    public void testAlterDb_DatabaseNotExist_ignored() {}
+
+    @Override
+    public void testRenameTable_nonPartitionedTable() {}
+
+    @Override
+    public void testRenameTable_TableNotExistException_ignored() {}
+
+    @Override
+    public void testRenameTable_TableNotExistException() {}
+
+    @Override
+    public void testRenameTable_TableAlreadyExistException() {}
+
+    @Override
+    public void testCreateView_DatabaseNotExistException() {}
+
+    @Override
+    public void testCreateView_TableAlreadyExistException() {}
+
+    @Override
+    public void testCreateView_TableAlreadyExist_ignored() {}
+
+    @Override
+    public void testDropView() {}
+
+    @Override
+    public void testAlterView() {}
+
+    @Override
+    public void testAlterView_TableNotExistException() {}
+
+    @Override
+    public void testAlterView_TableNotExist_ignored() {}
+
+    @Override
+    public void testListView() {}
+
+    @Override
+    public void testRenameView() {}
+
+    @Override
+    public void testCreateFunction() {}
+
+    @Override
+    public void testCreatePythonFunction() {}
+
+    @Override
+    public void testCreateFunction_DatabaseNotExistException() {}
+
+    @Override
+    public void testCreateFunction_FunctionAlreadyExistException() {}
+
+    @Override
+    public void testAlterFunction() {}
+
+    @Override
+    public void testAlterPythonFunction() {}
+
+    @Override
+    public void testAlterFunction_FunctionNotExistException() {}
+
+    @Override
+    public void testAlterFunction_FunctionNotExist_ignored() {}
+
+    @Override
+    public void testListFunctions() {}
+
+    @Override
+    public void testListFunctions_DatabaseNotExistException() {}
+
+    @Override
+    public void testGetFunction_FunctionNotExistException() {}
+
+    @Override
+    public void testGetFunction_FunctionNotExistException_NoDb() {}
+
+    @Override
+    public void testDropFunction() {}
+
+    @Override
+    public void testDropFunction_FunctionNotExistException() {}
+
+    @Override
+    public void testDropFunction_FunctionNotExist_ignored() {}
+
+    @Override
+    public void testCreatePartition() {}
+
+    @Override
+    public void testCreatePartition_TableNotExistException() {}
+
+    @Override
+    public void testCreatePartition_TableNotPartitionedException() {}
+
+    @Override
+    public void testCreatePartition_PartitionSpecInvalidException() {}
+
+    @Override
+    public void testCreatePartition_PartitionAlreadyExistsException() {}
+
+    @Override
+    public void testCreatePartition_PartitionAlreadyExists_ignored() {}
+
+    @Override
+    public void testDropPartition() {}
+
+    @Override
+    public void testDropPartition_TableNotExist() {}
+
+    @Override
+    public void testDropPartition_TableNotPartitioned() {}
+
+    @Override
+    public void testDropPartition_PartitionSpecInvalid() {}
+
+    @Override
+    public void testDropPartition_PartitionNotExist() {}
+
+    @Override
+    public void testDropPartition_PartitionNotExist_ignored() {}
+
+    @Override
+    public void testAlterPartition() {}
+
+    @Override
+    public void testAlterPartition_TableNotExist() {}
+
+    @Override
+    public void testAlterPartition_TableNotPartitioned() {}
+
+    @Override
+    public void testAlterPartition_PartitionSpecInvalid() {}
+
+    @Override
+    public void testAlterPartition_PartitionNotExist() {}
+
+    @Override
+    public void testAlterPartition_PartitionNotExist_ignored() {}
+
+    @Override
+    public void testGetPartition_TableNotExist() {}
+
+    @Override
+    public void testGetPartition_TableNotPartitioned() {}
+
+    @Override
+    public void testGetPartition_PartitionSpecInvalid_invalidPartitionSpec() {}
+
+    @Override
+    public void testGetPartition_PartitionSpecInvalid_sizeNotEqual() {}
+
+    @Override
+    public void testGetPartition_PartitionNotExist() {}
+
+    @Override
+    public void testPartitionExists() {}
+
+    @Override
+    public void testListPartitionPartialSpec() {}
+
+    @Override
+    public void testGetTableStats_TableNotExistException() {}
+
+    @Override
+    public void testGetPartitionStats() {}
+
+    @Override
+    public void testAlterTableStats() {}
+
+    @Override
+    public void testAlterTableStats_partitionedTable() {}
+
+    @Override
+    public void testAlterPartitionTableStats() {}
+
+    @Override
+    public void testAlterTableStats_TableNotExistException() {}
+
+    @Override
+    public void testAlterTableStats_TableNotExistException_ignore() {}
+}