You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/06/13 09:50:49 UTC
[flink-table-store] branch master updated: [FLINK-27755] Introduce a filesystem catalog for table store
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 14dbb700 [FLINK-27755] Introduce a filesystem catalog for table store
14dbb700 is described below
commit 14dbb700f6d5222a03f6b6b8b0b17a1a5a6a4d9c
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Mon Jun 13 17:50:45 2022 +0800
[FLINK-27755] Introduce a filesystem catalog for table store
This closes #137
---
.../connector/TableStoreConnectorFactory.java | 32 ++
.../org.apache.flink.table.factories.Factory | 1 +
.../store/connector/FileSystemCatalogITCase.java | 111 +++++++
flink-table-store-core/pom.xml | 35 +++
.../store/file/catalog/FileSystemCatalog.java | 305 ++++++++++++++++++
.../store/file/catalog/TableStoreCatalog.java | 224 ++++++++++++++
.../file/catalog/TableStoreCatalogFactory.java | 76 +++++
.../flink/table/store/file/schema/Schema.java | 14 +-
.../table/store/file/schema/UpdateSchema.java | 86 ++++--
.../org.apache.flink.table.factories.Factory | 2 +-
.../store/file/catalog/FileSystemCatalogTest.java | 40 +++
.../store/file/catalog/TableStoreCatalogTest.java | 340 +++++++++++++++++++++
12 files changed, 1235 insertions(+), 31 deletions(-)
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreConnectorFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreConnectorFactory.java
new file mode 100644
index 00000000..9c23f21e
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreConnectorFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.table.factories.DynamicTableFactory;
+
+import static org.apache.flink.table.store.file.catalog.TableStoreCatalogFactory.IDENTIFIER;
+
+/**
+ * A table store {@link DynamicTableFactory} to create source and sink.
+ *
+ * <p>It reports the same identifier as {@code TableStoreCatalogFactory} ({@code IDENTIFIER} is
+ * statically imported from there). The source/sink creation logic itself is presumably inherited
+ * from {@code AbstractTableStoreFactory}, which is not shown in this file.
+ */
+public class TableStoreConnectorFactory extends AbstractTableStoreFactory {
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+}
diff --git a/flink-table-store-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table-store-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 726415ab..2de717ce 100644
--- a/flink-table-store-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/flink-table-store-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -14,3 +14,4 @@
# limitations under the License.
org.apache.flink.table.store.connector.TableStoreManagedFactory
+org.apache.flink.table.store.connector.TableStoreConnectorFactory
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileSystemCatalogITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileSystemCatalogITCase.java
new file mode 100644
index 00000000..642d0ddd
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileSystemCatalogITCase.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.table.store.file.catalog.FileSystemCatalog;
+import org.apache.flink.table.store.file.utils.BlockingIterator;
+import org.apache.flink.table.store.kafka.KafkaTableTestBase;
+import org.apache.flink.types.Row;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * ITCase for {@link FileSystemCatalog}.
+ *
+ * <p>Every test runs against a fresh catalog named {@code fs} whose warehouse is a new temporary
+ * folder, creates a table {@code T}, inserts two rows and checks that a streaming read returns
+ * them.
+ */
+public class FileSystemCatalogITCase extends KafkaTableTestBase {
+
+    @Before
+    public void before() throws IOException {
+        String warehouse = TEMPORARY_FOLDER.newFolder().toURI().toString();
+        String createCatalog =
+                String.format(
+                        "CREATE CATALOG fs WITH ('type'='table-store', 'warehouse'='%s')",
+                        warehouse);
+        tEnv.executeSql(createCatalog);
+        tEnv.useCatalog("fs");
+        env.setParallelism(1);
+    }
+
+    @Test
+    public void testWriteRead() throws Exception {
+        tEnv.executeSql("CREATE TABLE T (a STRING, b STRING, c STRING)");
+        assertWriteRead(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
+    }
+
+    @Test
+    public void testLogWriteRead() throws Exception {
+        writeReadOnLogTable(
+                "a STRING, b STRING, c STRING", Row.of("1", "2", "3"), Row.of("4", "5", "6"));
+    }
+
+    @Test
+    public void testLogWriteReadWithVirtual() throws Exception {
+        // "d" is a computed column, so the rows read back carry a fourth value: CAST(c as INT) + 1
+        writeReadOnLogTable(
+                "a STRING, b STRING, c STRING, d AS CAST(c as INT) + 1",
+                Row.of("1", "2", "3", 4),
+                Row.of("4", "5", "6", 7));
+    }
+
+    /**
+     * Creates a Kafka topic, creates table {@code T} with the given column list and that topic as
+     * its log system, runs the write/read check and finally deletes the topic again.
+     */
+    private void writeReadOnLogTable(String columns, Row... expected) throws Exception {
+        String topic = UUID.randomUUID().toString();
+        createTopicIfNotExists(topic, 1);
+
+        try {
+            String createTable =
+                    String.format(
+                            "CREATE TABLE T (%s) WITH ("
+                                    + "'log.system'='kafka', "
+                                    + "'log.kafka.bootstrap.servers'='%s',"
+                                    + "'log.topic'='%s'"
+                                    + ")",
+                            columns, getBootstrapServers(), topic);
+            tEnv.executeSql(createTable);
+            assertWriteRead(expected);
+        } finally {
+            deleteTopicIfExists(topic);
+        }
+    }
+
+    /**
+     * Starts reading {@code T} before inserting the two fixed rows, then asserts that the first
+     * two collected rows equal {@code expected} in any order.
+     */
+    private void assertWriteRead(Row... expected) throws Exception {
+        BlockingIterator<Row, Row> rows = BlockingIterator.of(tEnv.from("T").execute().collect());
+        tEnv.executeSql("INSERT INTO T VALUES ('1', '2', '3'), ('4', '5', '6')").await();
+        List<Row> collected = rows.collectAndClose(2);
+        assertThat(collected).containsExactlyInAnyOrder(expected);
+    }
+}
diff --git a/flink-table-store-core/pom.xml b/flink-table-store-core/pom.xml
index b880507d..b039068f 100644
--- a/flink-table-store-core/pom.xml
+++ b/flink-table-store-core/pom.xml
@@ -89,6 +89,41 @@ under the License.
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java</artifactId>
+ <version>${flink.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit4.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
new file mode 100644
index 00000000..c772a867
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.catalog;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.util.function.RunnableWithException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+
+import static org.apache.flink.table.catalog.GenericInMemoryCatalogFactoryOptions.DEFAULT_DATABASE;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
+import static org.apache.flink.table.store.file.catalog.TableStoreCatalogFactory.IDENTIFIER;
+import static org.apache.flink.table.store.file.utils.FileUtils.safelyListFileStatus;
+
+/**
+ * A catalog implementation for {@link FileSystem}.
+ *
+ * <p>Each database is a directory named {@code <database>.db} directly under the warehouse path.
+ * Each table is a directory inside its database directory; it counts as existing once its {@link
+ * SchemaManager} lists at least one schema id.
+ */
+public class FileSystemCatalog extends TableStoreCatalog {
+
+    /** Suffix appended to a database name to form its directory name. */
+    public static final String DB_SUFFIX = ".db";
+
+    /** Database without properties or description, used to create the default database. */
+    public static final CatalogDatabaseImpl DUMMY_DATABASE =
+            new CatalogDatabaseImpl(Collections.emptyMap(), null);
+
+    private final FileSystem fs;
+    private final Path warehouse;
+
+    /**
+     * Creates a catalog whose default database is the default value of {@code
+     * GenericInMemoryCatalogFactoryOptions.DEFAULT_DATABASE}.
+     */
+    public FileSystemCatalog(String name, Path warehouse) {
+        this(name, warehouse, DEFAULT_DATABASE.defaultValue());
+    }
+
+    /**
+     * Creates a catalog rooted at {@code warehouse} and creates the directory of the default
+     * database (an already existing one is accepted).
+     *
+     * @throws CatalogException if resolving the file system or creating the directory raises a
+     *     checked exception
+     */
+    public FileSystemCatalog(String name, Path warehouse, String defaultDatabase) {
+        super(name, defaultDatabase);
+        this.warehouse = warehouse;
+        this.fs = uncheck(warehouse::getFileSystem);
+        uncheck(() -> createDatabase(defaultDatabase, DUMMY_DATABASE, true));
+    }
+
+    /** Discovers the {@link DynamicTableFactory} registered under {@code IDENTIFIER}. */
+    @Override
+    public Optional<Factory> getFactory() {
+        return Optional.of(
+                FactoryUtil.discoverFactory(classLoader(), DynamicTableFactory.class, IDENTIFIER));
+    }
+
+    /** Lists the directories under the warehouse that end with {@link #DB_SUFFIX}. */
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        List<String> databases = new ArrayList<>();
+        for (FileStatus status : uncheck(() -> safelyListFileStatus(warehouse))) {
+            Path path = status.getPath();
+            if (status.isDir() && isDatabase(path)) {
+                databases.add(database(path));
+            }
+        }
+        return databases;
+    }
+
+    /**
+     * Returns an empty database descriptor; nothing but the existence of a database is stored.
+     *
+     * @throws DatabaseNotExistException if the database directory does not exist
+     */
+    @Override
+    public CatalogDatabase getDatabase(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+        return new CatalogDatabaseImpl(Collections.emptyMap(), "");
+    }
+
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException {
+        return uncheck(() -> fs.exists(databasePath(databaseName)));
+    }
+
+    /**
+     * Creates the directory of the database.
+     *
+     * @throws UnsupportedOperationException if {@code database} carries properties or a non-empty
+     *     description, neither of which can be stored
+     * @throws DatabaseAlreadyExistException if the database exists and {@code ignoreIfExists} is
+     *     false
+     */
+    @Override
+    public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+        if (!database.getProperties().isEmpty()) {
+            throw new UnsupportedOperationException(
+                    "Create database with properties is unsupported.");
+        }
+
+        if (database.getDescription().isPresent() && !database.getDescription().get().equals("")) {
+            throw new UnsupportedOperationException(
+                    "Create database with description is unsupported.");
+        }
+
+        if (!ignoreIfExists && databaseExists(name)) {
+            throw new DatabaseAlreadyExistException(getName(), name);
+        }
+
+        uncheck(() -> fs.mkdirs(databasePath(name)));
+    }
+
+    /**
+     * Recursively deletes the directory of the database.
+     *
+     * @param cascade if true, a database that still contains tables is dropped together with its
+     *     tables; if false, such a database is rejected
+     * @throws DatabaseNotExistException if the database does not exist and {@code
+     *     ignoreIfNotExists} is false
+     * @throws DatabaseNotEmptyException if the database contains tables and {@code cascade} is
+     *     false
+     */
+    @Override
+    public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
+            throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
+        if (!databaseExists(name)) {
+            if (ignoreIfNotExists) {
+                return;
+            }
+
+            throw new DatabaseNotExistException(getName(), name);
+        }
+
+        // Only a non-cascading drop has to refuse a database that still holds tables.
+        if (!cascade && !listTables(name).isEmpty()) {
+            throw new DatabaseNotEmptyException(getName(), name);
+        }
+
+        uncheck(() -> fs.delete(databasePath(name), true));
+    }
+
+    /**
+     * Lists the sub-directories of the database directory that are tables.
+     *
+     * @throws DatabaseNotExistException if the database directory does not exist
+     */
+    @Override
+    public List<String> listTables(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+
+        List<String> tables = new ArrayList<>();
+        for (FileStatus status : uncheck(() -> safelyListFileStatus(databasePath(databaseName)))) {
+            if (status.isDir() && tableExists(status.getPath())) {
+                tables.add(status.getPath().getName());
+            }
+        }
+        return tables;
+    }
+
+    /**
+     * Loads the latest schema of the table and converts it to a catalog table whose options
+     * additionally contain the table path.
+     *
+     * @throws TableNotExistException if no schema has been committed for the table
+     */
+    @Override
+    public CatalogBaseTable getTable(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+        Path path = tablePath(tablePath);
+        Schema schema =
+                new SchemaManager(path)
+                        .latest()
+                        .orElseThrow(() -> new TableNotExistException(getName(), tablePath));
+
+        CatalogTable table = schema.toUpdateSchema().toCatalogTable();
+        // add path to source and sink
+        // NOTE(review): relies on toCatalogTable() exposing a mutable options map - confirm
+        table.getOptions().put(PATH.key(), path.toString());
+        return table;
+    }
+
+    @Override
+    public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+        return tableExists(tablePath(tablePath));
+    }
+
+    /** A table exists once at least one schema id is listed for its directory. */
+    private boolean tableExists(Path tablePath) {
+        return new SchemaManager(tablePath).listAllIds().size() > 0;
+    }
+
+    /**
+     * Recursively deletes the directory of the table.
+     *
+     * @throws TableNotExistException if the table does not exist and {@code ignoreIfNotExists} is
+     *     false
+     */
+    @Override
+    public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        Path path = tablePath(tablePath);
+        if (!tableExists(path)) {
+            if (ignoreIfNotExists) {
+                return;
+            }
+
+            throw new TableNotExistException(getName(), tablePath);
+        }
+
+        uncheck(() -> fs.delete(path, true));
+    }
+
+    /**
+     * Commits the first schema version of the table.
+     *
+     * @throws DatabaseNotExistException if the database of the table does not exist
+     * @throws TableAlreadyExistException if the table exists and {@code ignoreIfExists} is false
+     */
+    @Override
+    public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
+            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+        if (!databaseExists(tablePath.getDatabaseName())) {
+            throw new DatabaseNotExistException(getName(), tablePath.getDatabaseName());
+        }
+
+        Path path = tablePath(tablePath);
+        if (tableExists(path)) {
+            if (ignoreIfExists) {
+                return;
+            }
+
+            throw new TableAlreadyExistException(getName(), tablePath);
+        }
+
+        commitTableChange(path, table);
+    }
+
+    /**
+     * Commits {@code newTable} as a new schema version of an existing table.
+     *
+     * @throws TableNotExistException if the table does not exist and {@code ignoreIfNotExists} is
+     *     false
+     */
+    @Override
+    public void alterTable(
+            ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        Path path = tablePath(tablePath);
+        if (!tableExists(path)) {
+            if (ignoreIfNotExists) {
+                return;
+            }
+
+            throw new TableNotExistException(getName(), tablePath);
+        }
+
+        commitTableChange(path, newTable);
+    }
+
+    /**
+     * Runs {@code callable}; runtime exceptions pass through unchanged, every other exception is
+     * wrapped into a {@link CatalogException}.
+     */
+    private static <T> T uncheck(Callable<T> callable) {
+        try {
+            return callable.call();
+        } catch (Exception e) {
+            if (e instanceof RuntimeException) {
+                throw (RuntimeException) e;
+            }
+            throw new CatalogException(e);
+        }
+    }
+
+    /** Same as {@link #uncheck(Callable)} for actions without a result. */
+    private static void uncheck(RunnableWithException runnable) {
+        FileSystemCatalog.uncheck(
+                () -> {
+                    runnable.run();
+                    return null;
+                });
+    }
+
+    private static ClassLoader classLoader() {
+        return FileSystemCatalog.class.getClassLoader();
+    }
+
+    private static boolean isDatabase(Path path) {
+        return path.getName().endsWith(DB_SUFFIX);
+    }
+
+    /** Strips {@link #DB_SUFFIX} from the directory name to get the database name. */
+    private static String database(Path path) {
+        String name = path.getName();
+        return name.substring(0, name.length() - DB_SUFFIX.length());
+    }
+
+    private Path databasePath(String database) {
+        return new Path(warehouse, database + DB_SUFFIX);
+    }
+
+    private Path tablePath(ObjectPath objectPath) {
+        return new Path(databasePath(objectPath.getDatabaseName()), objectPath.getObjectName());
+    }
+
+    /**
+     * Validates {@code baseTable} and commits it as a new schema version under {@code tablePath}.
+     *
+     * @throws UnsupportedOperationException if the table is not a {@link ResolvedCatalogTable}
+     * @throws CatalogException if the options name a Flink connector, or the commit raises a
+     *     checked exception
+     * @throws IllegalArgumentException if the options contain a path other than {@code tablePath}
+     */
+    private void commitTableChange(Path tablePath, CatalogBaseTable baseTable) {
+        if (!(baseTable instanceof ResolvedCatalogTable)) {
+            throw new UnsupportedOperationException(
+                    "Only support ResolvedCatalogTable, but is: " + baseTable.getClass());
+        }
+        ResolvedCatalogTable table = (ResolvedCatalogTable) baseTable;
+        // Work on a private copy: the map returned by getOptions() belongs to the caller's table
+        // and may be unmodifiable, so it must not be altered in place.
+        Map<String, String> options = new HashMap<>(table.getOptions());
+        if (options.containsKey(CONNECTOR.key())) {
+            throw new CatalogException(
+                    "Table Store Catalog only supports table store tables, not Flink connector: "
+                            + options.get(CONNECTOR.key()));
+        }
+
+        // remove table path, it is derived from the catalog layout and not stored in the schema
+        String specific = options.remove(PATH.key());
+        if (specific != null) {
+            if (!tablePath.equals(new Path(specific))) {
+                throw new IllegalArgumentException(
+                        "Illegal table path in table options: " + specific);
+            }
+            table = table.copy(options);
+        }
+
+        UpdateSchema updateSchema = UpdateSchema.fromCatalogTable(table);
+        uncheck(() -> new SchemaManager(tablePath).commitNewVersion(updateSchema));
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/TableStoreCatalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/TableStoreCatalog.java
new file mode 100644
index 00000000..847c1b99
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/TableStoreCatalog.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.catalog;
+
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Catalog for table store.
+ *
+ * <p>This base class gives final implementations for the catalog operations declared below:
+ * mutating operations throw {@link UnsupportedOperationException}, list operations return an
+ * empty list, existence checks return false, lookups throw the matching "not exist" exception and
+ * statistics are reported as unknown. The remaining catalog operations are left to subclasses.
+ */
+public abstract class TableStoreCatalog extends AbstractCatalog {
+
+ public TableStoreCatalog(String name, String defaultDatabase) {
+ super(name, defaultDatabase);
+ }
+
+ // --------------------- unsupported methods ----------------------------
+
+ // open() and close() are no-ops.
+
+ @Override
+ public final void open() throws CatalogException {}
+
+ @Override
+ public final void close() throws CatalogException {}
+
+ // Altering a database and renaming a table are rejected.
+
+ @Override
+ public final void alterDatabase(
+ String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public final void renameTable(
+ ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ // Views: the list is always empty.
+
+ @Override
+ public final List<String> listViews(String databaseName) throws CatalogException {
+ return Collections.emptyList();
+ }
+
+ // Partitions: never listed, never found, and cannot be created, dropped or altered.
+
+ @Override
+ public final List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
+ throws CatalogException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public final List<CatalogPartitionSpec> listPartitions(
+ ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public final List<CatalogPartitionSpec> listPartitionsByFilter(
+ ObjectPath tablePath, List<Expression> filters) throws CatalogException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public final CatalogPartition getPartition(
+ ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws PartitionNotExistException, CatalogException {
+ throw new PartitionNotExistException(getName(), tablePath, partitionSpec);
+ }
+
+ @Override
+ public final boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws CatalogException {
+ return false;
+ }
+
+ @Override
+ public final void createPartition(
+ ObjectPath tablePath,
+ CatalogPartitionSpec partitionSpec,
+ CatalogPartition partition,
+ boolean ignoreIfExists)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public final void dropPartition(
+ ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public final void alterPartition(
+ ObjectPath tablePath,
+ CatalogPartitionSpec partitionSpec,
+ CatalogPartition newPartition,
+ boolean ignoreIfNotExists)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ // Functions: never listed, never found, and cannot be created, altered or dropped.
+
+ @Override
+ public final List<String> listFunctions(String dbName) throws CatalogException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public final CatalogFunction getFunction(ObjectPath functionPath)
+ throws FunctionNotExistException, CatalogException {
+ throw new FunctionNotExistException(getName(), functionPath);
+ }
+
+ @Override
+ public final boolean functionExists(ObjectPath functionPath) throws CatalogException {
+ return false;
+ }
+
+ @Override
+ public final void createFunction(
+ ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public final void alterFunction(
+ ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public final void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ // Statistics: reads return the UNKNOWN constants, updates are rejected.
+
+ @Override
+ public final CatalogTableStatistics getTableStatistics(ObjectPath tablePath)
+ throws CatalogException {
+ return CatalogTableStatistics.UNKNOWN;
+ }
+
+ @Override
+ public final CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath)
+ throws CatalogException {
+ return CatalogColumnStatistics.UNKNOWN;
+ }
+
+ @Override
+ public final CatalogTableStatistics getPartitionStatistics(
+ ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
+ return CatalogTableStatistics.UNKNOWN;
+ }
+
+ @Override
+ public final CatalogColumnStatistics getPartitionColumnStatistics(
+ ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
+ return CatalogColumnStatistics.UNKNOWN;
+ }
+
+ @Override
+ public final void alterTableStatistics(
+ ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public final void alterTableColumnStatistics(
+ ObjectPath tablePath,
+ CatalogColumnStatistics columnStatistics,
+ boolean ignoreIfNotExists)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public final void alterPartitionStatistics(
+ ObjectPath tablePath,
+ CatalogPartitionSpec partitionSpec,
+ CatalogTableStatistics partitionStatistics,
+ boolean ignoreIfNotExists)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public final void alterPartitionColumnStatistics(
+ ObjectPath tablePath,
+ CatalogPartitionSpec partitionSpec,
+ CatalogColumnStatistics columnStatistics,
+ boolean ignoreIfNotExists)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/TableStoreCatalogFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/TableStoreCatalogFactory.java
new file mode 100644
index 00000000..c14d0d0e
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/TableStoreCatalogFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.catalog;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
+
+/**
+ * {@link CatalogFactory} producing a {@link TableStoreCatalog}; it is selected through the
+ * identifier {@link #IDENTIFIER}.
+ */
+public class TableStoreCatalogFactory implements CatalogFactory {
+
+    /** Identifier returned by {@link #factoryIdentifier()}. */
+    public static final String IDENTIFIER = "table-store";
+
+    /** Name of the default database; {@code "default"} when not configured. */
+    public static final ConfigOption<String> DEFAULT_DATABASE =
+            ConfigOptions.key("default-database").stringType().defaultValue("default");
+
+    /** Root path of the catalog; has no default value and is a required option. */
+    public static final ConfigOption<String> WAREHOUSE =
+            ConfigOptions.key("warehouse")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The warehouse root path of catalog.");
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        Set<ConfigOption<?>> required = new HashSet<>();
+        required.add(WAREHOUSE);
+        return required;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> optional = new HashSet<>();
+        optional.add(DEFAULT_DATABASE);
+        optional.add(PROPERTY_VERSION);
+        return optional;
+    }
+
+    /**
+     * Validates the catalog options and creates a {@link FileSystemCatalog} from the catalog
+     * name, the configured warehouse path and the configured default database.
+     */
+    @Override
+    public TableStoreCatalog createCatalog(Context context) {
+        FactoryUtil.CatalogFactoryHelper factoryHelper =
+                FactoryUtil.createCatalogFactoryHelper(this, context);
+        factoryHelper.validate();
+
+        ReadableConfig config = factoryHelper.getOptions();
+        String catalogName = context.getName();
+        Path warehouse = new Path(config.get(WAREHOUSE));
+        String defaultDatabase = config.get(DEFAULT_DATABASE);
+        return new FileSystemCatalog(catalogName, warehouse, defaultDatabase);
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/Schema.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/Schema.java
index 2afc641f..253227b4 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/Schema.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/Schema.java
@@ -18,14 +18,12 @@
package org.apache.flink.table.store.file.schema;
-import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.MultisetType;
import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.util.Preconditions;
import java.io.Serializable;
@@ -193,16 +191,8 @@ public class Schema implements Serializable {
return Objects.hash(fields, partitionKeys, primaryKeys, options, comment);
}
- public TableSchema getTableSchema() {
- TableSchema.Builder builder = TableSchema.builder();
- for (DataField field : fields) {
- builder.field(
- field.name(), TypeConversions.fromLogicalToDataType(field.type().logicalType));
- }
- if (primaryKeys.size() > 0) {
- builder.primaryKey(primaryKeys.toArray(new String[0]));
- }
- return builder.build();
+ public UpdateSchema toUpdateSchema() {
+ return new UpdateSchema(logicalRowType(), partitionKeys, primaryKeys, options, comment);
}
public static List<DataField> newFields(RowType rowType) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/UpdateSchema.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/UpdateSchema.java
index ed7ea727..29aa4840 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/UpdateSchema.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/UpdateSchema.java
@@ -18,14 +18,21 @@
package org.apache.flink.table.store.file.schema;
-import org.apache.flink.table.catalog.ResolvedCatalogTable;
-import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.types.logical.RowType;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
/** A update schema. */
public class UpdateSchema {
@@ -40,22 +47,6 @@ public class UpdateSchema {
private final String comment;
- public static UpdateSchema fromCatalogTable(ResolvedCatalogTable catalogTable) {
- ResolvedSchema schema = catalogTable.getResolvedSchema();
- RowType rowType = (RowType) schema.toPhysicalRowDataType().getLogicalType();
- List<String> primaryKeys = new ArrayList<>();
- if (schema.getPrimaryKey().isPresent()) {
- primaryKeys = schema.getPrimaryKey().get().getColumns();
- }
-
- return new UpdateSchema(
- rowType,
- catalogTable.getPartitionKeys(),
- primaryKeys,
- catalogTable.getOptions(),
- catalogTable.getComment());
- }
-
public UpdateSchema(
RowType rowType,
List<String> partitionKeys,
@@ -104,4 +95,63 @@ public class UpdateSchema {
+ comment
+ '}';
}
+
+ /**
+ * Converts this update schema into a {@link CatalogTableImpl}.
+ *
+ * <p>If a table schema has been serialized into the options (the case of virtual columns and
+ * watermark), that schema is used and its keys are removed from the returned table's options.
+ * Otherwise the schema is built from the row type fields and the primary keys.
+ *
+ * @return a catalog table carrying the schema, partition keys, remaining options and comment
+ */
+ public CatalogTableImpl toCatalogTable() {
+ Map<String, String> remainingOptions = new HashMap<>(options);
+
+ // try to read schema from options
+ // in the case of virtual columns and watermark
+ DescriptorProperties optionReader = new DescriptorProperties(true);
+ optionReader.putProperties(remainingOptions);
+ Optional<TableSchema> serializedSchema = optionReader.getOptionalTableSchema(SCHEMA);
+
+ TableSchema tableSchema;
+ if (!serializedSchema.isPresent()) {
+ // nothing was serialized: derive the schema from the physical row type
+ TableSchema.Builder schemaBuilder = TableSchema.builder();
+ rowType.getFields()
+ .forEach(
+ field ->
+ schemaBuilder.field(
+ field.getName(), fromLogicalToDataType(field.getType())));
+ if (!primaryKeys.isEmpty()) {
+ schemaBuilder.primaryKey(primaryKeys.toArray(new String[0]));
+ }
+ tableSchema = schemaBuilder.build();
+ } else {
+ tableSchema = serializedSchema.get();
+
+ // remove schema from options: re-serialize it to learn exactly which keys it occupies
+ DescriptorProperties schemaKeys = new DescriptorProperties(false);
+ schemaKeys.putTableSchema(SCHEMA, tableSchema);
+ for (String key : schemaKeys.asMap().keySet()) {
+ remainingOptions.remove(key);
+ }
+ }
+
+ return new CatalogTableImpl(tableSchema, partitionKeys, remainingOptions, comment);
+ }
+
+ /**
+ * Creates an {@link UpdateSchema} from a Flink {@link CatalogTable}.
+ *
+ * <p>The row type is the physical row type of the table schema. When the schema contains a
+ * non-physical column or a watermark spec, the whole table schema is additionally serialized
+ * into the options.
+ *
+ * @param catalogTable the table to convert
+ * @return an update schema with the table's row type, partition keys, primary keys, options
+ * and comment
+ */
+ public static UpdateSchema fromCatalogTable(CatalogTable catalogTable) {
+ TableSchema tableSchema = catalogTable.getSchema();
+ RowType physicalRowType = (RowType) tableSchema.toPhysicalRowDataType().getLogicalType();
+ List<String> primaryKeyColumns =
+ tableSchema.getPrimaryKey().map(pk -> pk.getColumns()).orElseGet(ArrayList::new);
+
+ Map<String, String> mergedOptions = new HashMap<>(catalogTable.getOptions());
+
+ // Serialize virtual columns and watermark to the options
+ // This is what Flink SQL needs, the storage itself does not need them
+ boolean hasNonPhysicalColumn =
+ tableSchema.getTableColumns().stream().anyMatch(column -> !column.isPhysical());
+ if (hasNonPhysicalColumn || !tableSchema.getWatermarkSpecs().isEmpty()) {
+ DescriptorProperties schemaWriter = new DescriptorProperties(true);
+ schemaWriter.putTableSchema(SCHEMA, tableSchema);
+ mergedOptions.putAll(schemaWriter.asMap());
+ }
+
+ return new UpdateSchema(
+ physicalRowType,
+ catalogTable.getPartitionKeys(),
+ primaryKeyColumns,
+ mergedOptions,
+ catalogTable.getComment());
+ }
}
diff --git a/flink-table-store-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table-store-core/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
similarity index 92%
copy from flink-table-store-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
copy to flink-table-store-core/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 726415ab..8e32399b 100644
--- a/flink-table-store-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/flink-table-store-core/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.flink.table.store.connector.TableStoreManagedFactory
+org.apache.flink.table.store.file.catalog.TableStoreCatalogFactory
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/catalog/FileSystemCatalogTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/catalog/FileSystemCatalogTest.java
new file mode 100644
index 00000000..93b8439c
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/catalog/FileSystemCatalogTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.catalog;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.catalog.CatalogTest;
+
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+
+/** Runs the {@link TableStoreCatalogTest} cases against a {@link FileSystemCatalog}. */
+public class FileSystemCatalogTest extends TableStoreCatalogTest {
+
+ @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+ /**
+ * Creates the catalog under test inside a new temporary folder.
+ *
+ * <p>Despite its name, this method is annotated with {@code BeforeClass}, so it runs once
+ * for the whole class rather than before each test.
+ *
+ * @throws IOException if the temporary folder cannot be created
+ */
+ @BeforeClass
+ public static void beforeEach() throws IOException {
+ Path warehouse = new Path(TEMPORARY_FOLDER.newFolder().toURI().toString());
+ catalog = new FileSystemCatalog(CatalogTest.TEST_CATALOG_NAME, warehouse);
+ }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/catalog/TableStoreCatalogTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/catalog/TableStoreCatalogTest.java
new file mode 100644
index 00000000..60c52b05
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/catalog/TableStoreCatalogTest.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.catalog;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTestBase;
+import org.apache.flink.table.catalog.CatalogTestUtil;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link TableStoreCatalog}. */
+public abstract class TableStoreCatalogTest extends CatalogTestBase {
+
+ /**
+ * Always returns {@code false}.
+ *
+ * <p>NOTE(review): presumably tells the inherited test base that the tables under test are
+ * not generic Flink tables - confirm against {@code CatalogTestBase}.
+ */
+ @Override
+ protected boolean isGeneric() {
+ return false;
+ }
+
+ /** Creates a database with no properties and an empty comment. */
+ @Override
+ public CatalogDatabase createDb() {
+ return new CatalogDatabaseImpl(Collections.emptyMap(), "");
+ }
+
+ /**
+ * Creates a non-partitioned table with the comment "test comment" and the batch table
+ * properties. The schema is the one returned by {@code createSchema()} (see the TODO below).
+ */
+ @Override
+ public CatalogTable createAnotherTable() {
+ // TODO support change schema, modify it to createAnotherSchema
+ ResolvedSchema schema = createSchema();
+ Schema unresolvedSchema = Schema.newBuilder().fromResolvedSchema(schema).build();
+ CatalogTable unresolvedTable =
+ CatalogTable.of(
+ unresolvedSchema,
+ "test comment",
+ Collections.emptyList(),
+ getBatchTableProperties());
+ return new ResolvedCatalogTable(unresolvedTable, schema);
+ }
+
+ /**
+ * Creates a table partitioned by {@code createPartitionKeys()}, with the comment
+ * "test comment" and the batch table properties. The schema is the one returned by
+ * {@code createSchema()} (see the TODO below).
+ */
+ @Override
+ public CatalogTable createAnotherPartitionedTable() {
+ // TODO support change schema, modify it to createAnotherSchema
+ ResolvedSchema schema = createSchema();
+ Schema unresolvedSchema = Schema.newBuilder().fromResolvedSchema(schema).build();
+ CatalogTable unresolvedTable =
+ CatalogTable.of(
+ unresolvedSchema,
+ "test comment",
+ createPartitionKeys(),
+ getBatchTableProperties());
+ return new ResolvedCatalogTable(unresolvedTable, schema);
+ }
+
+ /** Creates a database with an empty, mutable property map and a {@code null} comment. */
+ @Override
+ public CatalogDatabase createAnotherDb() {
+ // Not support database with properties or comment
+ // A plain HashMap is used: an anonymous subclass ("new HashMap<..>() {}") would define an
+ // extra class and capture a reference to this test instance for no benefit.
+ return new CatalogDatabaseImpl(new HashMap<>(), null);
+ }
+
+ /**
+ * Creates a table, alters it to the table from {@code createAnotherTable()}, checks that the
+ * catalog returns the altered table, and finally drops it.
+ */
+ @Override
+ public void testAlterTable() throws Exception {
+ catalog.createDatabase("db1", createDb(), false);
+
+ // the freshly created table must be readable as-is
+ CatalogTable original = createTable();
+ catalog.createTable(path1, original, false);
+ CatalogTestUtil.checkEquals(original, (CatalogTable) catalog.getTable(path1));
+
+ // after the alter, the catalog must return the new definition instead of the old one
+ CatalogTable altered = createAnotherTable();
+ catalog.alterTable(path1, altered, false);
+ Assert.assertNotEquals(original, catalog.getTable(path1));
+ CatalogTestUtil.checkEquals(altered, (CatalogTable) catalog.getTable(path1));
+
+ catalog.dropTable(path1, false);
+
+ // Not support views
+ }
+
+ /** Creates two tables in "db1" and checks that listing the database returns two entries. */
+ @Test
+ public void testListTables() throws Exception {
+ catalog.createDatabase("db1", createDb(), false);
+ catalog.createTable(path1, createTable(), false);
+ catalog.createTable(path3, createTable(), false);
+
+ int tableCount = catalog.listTables("db1").size();
+ Assert.assertEquals(2L, tableCount);
+
+ // Not support views
+ }
+
+ // Overridden with an empty body: this case is not supported yet (see TODO).
+ @Override
+ public void testAlterTable_differentTypedTable() {
+ // TODO support this
+ }
+
+ /**
+ * Checks that creating a table whose options name a Flink connector ("filesystem") is
+ * rejected with a {@link CatalogException}.
+ */
+ @Test
+ public void testCreateFlinkTable() throws DatabaseAlreadyExistException {
+ // create a flink table
+ CatalogTable baseTable = createTable();
+ HashMap<String, String> flinkOptions = new HashMap<>(baseTable.getOptions());
+ flinkOptions.put("connector", "filesystem");
+ CatalogTable flinkTable = baseTable.copy(flinkOptions);
+
+ catalog.createDatabase("db1", createDb(), false);
+
+ String expectedMessage =
+ "Table Store Catalog only supports table store tables, not Flink connector: filesystem";
+ assertThatThrownBy(() -> catalog.createTable(path1, flinkTable, false))
+ .isInstanceOf(CatalogException.class)
+ .hasMessageContaining(expectedMessage);
+ }
+
+ // --------------------- unsupported methods ----------------------------
+
+ // Function fixtures are not provided: each of the three factories below throws
+ // UnsupportedOperationException.
+ @Override
+ protected CatalogFunction createFunction() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected CatalogFunction createPythonFunction() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected CatalogFunction createAnotherFunction() {
+ throw new UnsupportedOperationException();
+ }
+
+ // Database-alter, table-rename and view cases: every method in this group is overridden
+ // with an empty body, so it performs no checks. Views are not supported (see the
+ // "Not support views" notes in the table tests above).
+ @Override
+ public void testCreateView() {}
+
+ @Override
+ public void testAlterDb() {}
+
+ @Override
+ public void testAlterDb_DatabaseNotExistException() {}
+
+ @Override
+ public void testAlterDb_DatabaseNotExist_ignored() {}
+
+ @Override
+ public void testRenameTable_nonPartitionedTable() {}
+
+ @Override
+ public void testRenameTable_TableNotExistException_ignored() {}
+
+ @Override
+ public void testRenameTable_TableNotExistException() {}
+
+ @Override
+ public void testRenameTable_TableAlreadyExistException() {}
+
+ @Override
+ public void testCreateView_DatabaseNotExistException() {}
+
+ @Override
+ public void testCreateView_TableAlreadyExistException() {}
+
+ @Override
+ public void testCreateView_TableAlreadyExist_ignored() {}
+
+ @Override
+ public void testDropView() {}
+
+ @Override
+ public void testAlterView() {}
+
+ @Override
+ public void testAlterView_TableNotExistException() {}
+
+ @Override
+ public void testAlterView_TableNotExist_ignored() {}
+
+ @Override
+ public void testListView() {}
+
+ @Override
+ public void testRenameView() {}
+
+ // Function cases: every method in this group is overridden with an empty body, so it
+ // performs no checks (the function fixture factories above throw
+ // UnsupportedOperationException).
+ @Override
+ public void testCreateFunction() {}
+
+ @Override
+ public void testCreatePythonFunction() {}
+
+ @Override
+ public void testCreateFunction_DatabaseNotExistException() {}
+
+ @Override
+ public void testCreateFunction_FunctionAlreadyExistException() {}
+
+ @Override
+ public void testAlterFunction() {}
+
+ @Override
+ public void testAlterPythonFunction() {}
+
+ @Override
+ public void testAlterFunction_FunctionNotExistException() {}
+
+ @Override
+ public void testAlterFunction_FunctionNotExist_ignored() {}
+
+ @Override
+ public void testListFunctions() {}
+
+ @Override
+ public void testListFunctions_DatabaseNotExistException() {}
+
+ @Override
+ public void testGetFunction_FunctionNotExistException() {}
+
+ @Override
+ public void testGetFunction_FunctionNotExistException_NoDb() {}
+
+ @Override
+ public void testDropFunction() {}
+
+ @Override
+ public void testDropFunction_FunctionNotExistException() {}
+
+ @Override
+ public void testDropFunction_FunctionNotExist_ignored() {}
+
+ // Partition cases: every method in this group is overridden with an empty body, so it
+ // performs no checks.
+ // NOTE(review): presumably because the catalog does not implement partition operations -
+ // confirm against TableStoreCatalog.
+ @Override
+ public void testCreatePartition() {}
+
+ @Override
+ public void testCreatePartition_TableNotExistException() {}
+
+ @Override
+ public void testCreatePartition_TableNotPartitionedException() {}
+
+ @Override
+ public void testCreatePartition_PartitionSpecInvalidException() {}
+
+ @Override
+ public void testCreatePartition_PartitionAlreadyExistsException() {}
+
+ @Override
+ public void testCreatePartition_PartitionAlreadyExists_ignored() {}
+
+ @Override
+ public void testDropPartition() {}
+
+ @Override
+ public void testDropPartition_TableNotExist() {}
+
+ @Override
+ public void testDropPartition_TableNotPartitioned() {}
+
+ @Override
+ public void testDropPartition_PartitionSpecInvalid() {}
+
+ @Override
+ public void testDropPartition_PartitionNotExist() {}
+
+ @Override
+ public void testDropPartition_PartitionNotExist_ignored() {}
+
+ @Override
+ public void testAlterPartition() {}
+
+ @Override
+ public void testAlterPartition_TableNotExist() {}
+
+ @Override
+ public void testAlterPartition_TableNotPartitioned() {}
+
+ @Override
+ public void testAlterPartition_PartitionSpecInvalid() {}
+
+ @Override
+ public void testAlterPartition_PartitionNotExist() {}
+
+ @Override
+ public void testAlterPartition_PartitionNotExist_ignored() {}
+
+ @Override
+ public void testGetPartition_TableNotExist() {}
+
+ @Override
+ public void testGetPartition_TableNotPartitioned() {}
+
+ @Override
+ public void testGetPartition_PartitionSpecInvalid_invalidPartitionSpec() {}
+
+ @Override
+ public void testGetPartition_PartitionSpecInvalid_sizeNotEqual() {}
+
+ @Override
+ public void testGetPartition_PartitionNotExist() {}
+
+ @Override
+ public void testPartitionExists() {}
+
+ @Override
+ public void testListPartitionPartialSpec() {}
+
+ // Statistics cases: every method in this group is overridden with an empty body, so it
+ // performs no checks.
+ // NOTE(review): presumably because the catalog does not implement table/partition
+ // statistics - confirm against TableStoreCatalog.
+ @Override
+ public void testGetTableStats_TableNotExistException() {}
+
+ @Override
+ public void testGetPartitionStats() {}
+
+ @Override
+ public void testAlterTableStats() {}
+
+ @Override
+ public void testAlterTableStats_partitionedTable() {}
+
+ @Override
+ public void testAlterPartitionTableStats() {}
+
+ @Override
+ public void testAlterTableStats_TableNotExistException() {}
+
+ @Override
+ public void testAlterTableStats_TableNotExistException_ignore() {}
+}