You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/06/24 09:00:48 UTC
[flink-table-store] branch master updated: [FLINK-28219] Refactor TableStoreCatalog: Introduce a dedicated Catalog for table store
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new bdcf482 [FLINK-28219] Refactor TableStoreCatalog: Introduce a dedicated Catalog for table store
bdcf482 is described below
commit bdcf482531ac33b77476b7af651ef216e53525de
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Fri Jun 24 17:00:43 2022 +0800
[FLINK-28219] Refactor TableStoreCatalog: Introduce a dedicated Catalog for table store
This closes #173
---
.../flink/table/store/connector/FlinkCatalog.java | 190 ++++++++-
.../table/store/connector/FlinkCatalogFactory.java | 22 +-
.../connector/TableStoreConnectorFactory.java | 2 +-
.../org.apache.flink.table.factories.Factory | 1 +
.../store/connector/FileSystemCatalogITCase.java | 3 +-
.../table/store/connector/FlinkCatalogTest.java | 411 ++++++++++++++++++++
flink-table-store-core/pom.xml | 48 ---
.../flink/table/store/file/catalog/Catalog.java | 249 ++++++++++++
.../store/file/catalog/FileSystemCatalog.java | 177 ++-------
.../org.apache.flink.table.factories.Factory | 16 -
.../store/file/catalog/FileSystemCatalogTest.java | 40 --
.../store/file/catalog/TableStoreCatalogTest.java | 425 ---------------------
12 files changed, 908 insertions(+), 676 deletions(-)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/TableStoreCatalog.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
similarity index 50%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/TableStoreCatalog.java
rename to flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
index 847c1b9..232015e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/TableStoreCatalog.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
@@ -16,29 +16,208 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.file.catalog;
+package org.apache.flink.table.store.connector;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.store.file.catalog.Catalog;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static org.apache.flink.table.store.connector.FlinkCatalogFactory.IDENTIFIER;
+import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
/** Catalog for table store. */
-public abstract class TableStoreCatalog extends AbstractCatalog {
+public class FlinkCatalog extends AbstractCatalog {
+
+ private final Catalog catalog;
- public TableStoreCatalog(String name, String defaultDatabase) {
+ public FlinkCatalog(Catalog catalog, String name, String defaultDatabase) {
super(name, defaultDatabase);
+ this.catalog = catalog;
+ try {
+ this.catalog.createDatabase(defaultDatabase, true);
+ } catch (Catalog.DatabaseAlreadyExistException ignore) {
+ }
+ }
+
+ @VisibleForTesting
+ Catalog catalog() {
+ return catalog;
+ }
+
+ @Override
+ public Optional<Factory> getFactory() {
+ return Optional.of(
+ FactoryUtil.discoverFactory(classLoader(), DynamicTableFactory.class, IDENTIFIER));
+ }
+
+ @Override
+ public List<String> listDatabases() throws CatalogException {
+ return catalog.listDatabases();
+ }
+
+ @Override
+ public boolean databaseExists(String databaseName) throws CatalogException {
+ return catalog.databaseExists(databaseName);
+ }
+
+ @Override
+ public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
+ throws DatabaseAlreadyExistException, CatalogException {
+ if (database != null) {
+ if (database.getProperties().size() > 0) {
+ throw new UnsupportedOperationException(
+ "Create database with properties is unsupported.");
+ }
+
+ if (database.getDescription().isPresent()
+ && !database.getDescription().get().equals("")) {
+ throw new UnsupportedOperationException(
+ "Create database with description is unsupported.");
+ }
+ }
+
+ try {
+ catalog.createDatabase(name, ignoreIfExists);
+ } catch (Catalog.DatabaseAlreadyExistException e) {
+ throw new DatabaseAlreadyExistException(getName(), e.database());
+ }
+ }
+
+ @Override
+ public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
+ throws DatabaseNotEmptyException, DatabaseNotExistException, CatalogException {
+ try {
+ catalog.dropDatabase(name, ignoreIfNotExists, cascade);
+ } catch (Catalog.DatabaseNotEmptyException e) {
+ throw new DatabaseNotEmptyException(getName(), e.database());
+ } catch (Catalog.DatabaseNotExistException e) {
+ throw new DatabaseNotExistException(getName(), e.database());
+ }
+ }
+
+ @Override
+ public List<String> listTables(String databaseName)
+ throws DatabaseNotExistException, CatalogException {
+ try {
+ return catalog.listTables(databaseName);
+ } catch (Catalog.DatabaseNotExistException e) {
+ throw new DatabaseNotExistException(getName(), e.database());
+ }
+ }
+
+ @Override
+ public CatalogBaseTable getTable(ObjectPath tablePath)
+ throws TableNotExistException, CatalogException {
+ TableSchema schema;
+ try {
+ schema = catalog.getTable(tablePath);
+ } catch (Catalog.TableNotExistException e) {
+ throw new TableNotExistException(getName(), e.tablePath());
+ }
+
+ CatalogTable table = schema.toUpdateSchema().toCatalogTable();
+ // add path to source and sink
+ table.getOptions().put(PATH.key(), catalog.getTableLocation(tablePath).toString());
+ return table;
+ }
+
+ @Override
+ public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+ return catalog.tableExists(tablePath);
+ }
+
+ @Override
+ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ try {
+ catalog.dropTable(tablePath, ignoreIfNotExists);
+ } catch (Catalog.TableNotExistException e) {
+ throw new TableNotExistException(getName(), e.tablePath());
+ }
+ }
+
+ @Override
+ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
+ throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+ try {
+ catalog.createTable(tablePath, convertTableToSchema(tablePath, table), ignoreIfExists);
+ } catch (Catalog.TableAlreadyExistException e) {
+ throw new TableAlreadyExistException(getName(), e.tablePath());
+ } catch (Catalog.DatabaseNotExistException e) {
+ throw new DatabaseNotExistException(getName(), e.database());
+ }
+ }
+
+ @Override
+ public void alterTable(
+ ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ try {
+ catalog.alterTable(
+ tablePath, convertTableToSchema(tablePath, newTable), ignoreIfNotExists);
+ } catch (Catalog.TableNotExistException e) {
+ throw new TableNotExistException(getName(), e.tablePath());
+ }
+ }
+
+ private static ClassLoader classLoader() {
+ return FlinkCatalog.class.getClassLoader();
+ }
+
+ private UpdateSchema convertTableToSchema(ObjectPath tablePath, CatalogBaseTable baseTable) {
+ if (!(baseTable instanceof CatalogTable)) {
+ throw new UnsupportedOperationException(
+ "Only support CatalogTable, but is: " + baseTable.getClass());
+ }
+ CatalogTable table = (CatalogTable) baseTable;
+ Map<String, String> options = table.getOptions();
+ if (options.containsKey(CONNECTOR.key())) {
+ throw new CatalogException(
+ "Table Store Catalog only supports table store tables, not Flink connector: "
+ + options.get(CONNECTOR.key()));
+ }
+
+ // remove table path
+ String specific = options.remove(PATH.key());
+ if (specific != null) {
+ if (!catalog.getTableLocation(tablePath).equals(new Path(specific))) {
+ throw new IllegalArgumentException(
+ "Illegal table path in table options: " + specific);
+ }
+ table = table.copy(options);
+ }
+
+ return UpdateSchema.fromCatalogTable(table);
}
// --------------------- unsupported methods ----------------------------
@@ -49,6 +228,11 @@ public abstract class TableStoreCatalog extends AbstractCatalog {
@Override
public final void close() throws CatalogException {}
+ @Override
+ public CatalogDatabase getDatabase(String databaseName) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public final void alterDatabase(
String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/TableStoreCatalogFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java
similarity index 75%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/TableStoreCatalogFactory.java
rename to flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java
index c14d0d0..6937e88 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/TableStoreCatalogFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.file.catalog;
+package org.apache.flink.table.store.connector;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
@@ -24,14 +24,15 @@ import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.store.file.catalog.Catalog;
import java.util.HashSet;
import java.util.Set;
import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
-/** Factory for {@link TableStoreCatalog}. */
-public class TableStoreCatalogFactory implements CatalogFactory {
+/** Factory for {@link FlinkCatalog}. */
+public class FlinkCatalogFactory implements CatalogFactory {
public static final String IDENTIFIER = "table-store";
@@ -65,12 +66,21 @@ public class TableStoreCatalogFactory implements CatalogFactory {
}
@Override
- public TableStoreCatalog createCatalog(Context context) {
+ public FlinkCatalog createCatalog(Context context) {
FactoryUtil.CatalogFactoryHelper helper =
FactoryUtil.createCatalogFactoryHelper(this, context);
helper.validate();
ReadableConfig options = helper.getOptions();
- return new FileSystemCatalog(
- context.getName(), new Path(options.get(WAREHOUSE)), options.get(DEFAULT_DATABASE));
+ return createCatalog(
+ new Path(options.get(WAREHOUSE)), context.getName(), options.get(DEFAULT_DATABASE));
+ }
+
+ public static FlinkCatalog createCatalog(Path warehouse, String catalogName) {
+ return createCatalog(warehouse, catalogName, DEFAULT_DATABASE.defaultValue());
+ }
+
+ public static FlinkCatalog createCatalog(
+ Path warehouse, String catalogName, String defaultDatabase) {
+ return new FlinkCatalog(Catalog.create(warehouse), catalogName, defaultDatabase);
}
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreConnectorFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreConnectorFactory.java
index 9c23f21..e5a5a4e 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreConnectorFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreConnectorFactory.java
@@ -20,7 +20,7 @@ package org.apache.flink.table.store.connector;
import org.apache.flink.table.factories.DynamicTableFactory;
-import static org.apache.flink.table.store.file.catalog.TableStoreCatalogFactory.IDENTIFIER;
+import static org.apache.flink.table.store.connector.FlinkCatalogFactory.IDENTIFIER;
/** A table store {@link DynamicTableFactory} to create source and sink. */
public class TableStoreConnectorFactory extends AbstractTableStoreFactory {
diff --git a/flink-table-store-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table-store-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 2de717c..daec216 100644
--- a/flink-table-store-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/flink-table-store-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -15,3 +15,4 @@
org.apache.flink.table.store.connector.TableStoreManagedFactory
org.apache.flink.table.store.connector.TableStoreConnectorFactory
+org.apache.flink.table.store.connector.FlinkCatalogFactory
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileSystemCatalogITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileSystemCatalogITCase.java
index 642d0dd..01e8421 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileSystemCatalogITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileSystemCatalogITCase.java
@@ -18,7 +18,6 @@
package org.apache.flink.table.store.connector;
-import org.apache.flink.table.store.file.catalog.FileSystemCatalog;
import org.apache.flink.table.store.file.utils.BlockingIterator;
import org.apache.flink.table.store.kafka.KafkaTableTestBase;
import org.apache.flink.types.Row;
@@ -32,7 +31,7 @@ import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
-/** ITCase for {@link FileSystemCatalog}. */
+/** ITCase for {@link FlinkCatalog}. */
public class FileSystemCatalogITCase extends KafkaTableTestBase {
@Before
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FlinkCatalogTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FlinkCatalogTest.java
new file mode 100644
index 0000000..79b8153
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FlinkCatalogTest.java
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link FlinkCatalog}. */
+public class FlinkCatalogTest {
+
+ @Rule public ExpectedException exception = ExpectedException.none();
+
+ private final ObjectPath path1 = new ObjectPath("db1", "t1");
+ private final ObjectPath path3 = new ObjectPath("db1", "t2");
+ private final ObjectPath nonExistDbPath = ObjectPath.fromString("non.exist");
+ private final ObjectPath nonExistObjectPath = ObjectPath.fromString("db1.nonexist");
+ private Catalog catalog;
+
+ @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+ @Before
+ public void beforeEach() throws IOException {
+ String path = TEMPORARY_FOLDER.newFolder().toURI().toString();
+ catalog = FlinkCatalogFactory.createCatalog(new Path(path), "test-catalog");
+ }
+
+ private ResolvedSchema createSchema() {
+ return new ResolvedSchema(
+ Arrays.asList(
+ Column.physical("first", DataTypes.STRING()),
+ Column.physical("second", DataTypes.INT()),
+ Column.physical("third", DataTypes.STRING())),
+ Collections.emptyList(),
+ null);
+ }
+
+ private CatalogTable createStreamingTable() {
+ ResolvedSchema resolvedSchema = this.createSchema();
+ CatalogTable origin =
+ CatalogTable.of(
+ Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
+ "test comment",
+ Collections.emptyList(),
+ this.getStreamingTableProperties());
+ return new ResolvedCatalogTable(origin, resolvedSchema);
+ }
+
+ private List<String> createPartitionKeys() {
+ return Arrays.asList("second", "third");
+ }
+
+ private Map<String, String> getBatchTableProperties() {
+ return new HashMap<String, String>() {
+ {
+ this.put("is_streaming", "false");
+ }
+ };
+ }
+
+ private Map<String, String> getStreamingTableProperties() {
+ return new HashMap<String, String>() {
+ {
+ this.put("is_streaming", "true");
+ }
+ };
+ }
+
+ private CatalogTable createAnotherTable() {
+ // TODO support change schema, modify it to createAnotherSchema
+ ResolvedSchema resolvedSchema = this.createSchema();
+ CatalogTable origin =
+ CatalogTable.of(
+ Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
+ "test comment",
+ Collections.emptyList(),
+ this.getBatchTableProperties());
+ return new ResolvedCatalogTable(origin, resolvedSchema);
+ }
+
+ private CatalogTable createAnotherPartitionedTable() {
+ // TODO support change schema, modify it to createAnotherSchema
+ ResolvedSchema resolvedSchema = this.createSchema();
+ CatalogTable origin =
+ CatalogTable.of(
+ Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
+ "test comment",
+ this.createPartitionKeys(),
+ this.getBatchTableProperties());
+ return new ResolvedCatalogTable(origin, resolvedSchema);
+ }
+
+ private CatalogTable createTable() {
+ ResolvedSchema resolvedSchema = this.createSchema();
+ CatalogTable origin =
+ CatalogTable.of(
+ Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
+ "test comment",
+ Collections.emptyList(),
+ this.getBatchTableProperties());
+ return new ResolvedCatalogTable(origin, resolvedSchema);
+ }
+
+ private CatalogTable createPartitionedTable() {
+ ResolvedSchema resolvedSchema = this.createSchema();
+ CatalogTable origin =
+ CatalogTable.of(
+ Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
+ "test comment",
+ this.createPartitionKeys(),
+ this.getBatchTableProperties());
+ return new ResolvedCatalogTable(origin, resolvedSchema);
+ }
+
+ @Test
+ public void testAlterTable() throws Exception {
+ catalog.createDatabase(path1.getDatabaseName(), null, false);
+ CatalogTable table = this.createTable();
+ catalog.createTable(this.path1, table, false);
+ checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
+ CatalogTable newTable = this.createAnotherTable();
+ catalog.alterTable(this.path1, newTable, false);
+ Assert.assertNotEquals(table, catalog.getTable(this.path1));
+ checkEquals(path1, newTable, (CatalogTable) catalog.getTable(this.path1));
+ catalog.dropTable(this.path1, false);
+
+ // Not support views
+ }
+
+ @Test
+ public void testListTables() throws Exception {
+ catalog.createDatabase(path1.getDatabaseName(), null, false);
+ catalog.createTable(this.path1, this.createTable(), false);
+ catalog.createTable(this.path3, this.createTable(), false);
+ Assert.assertEquals(2L, catalog.listTables("db1").size());
+
+ // Not support views
+ }
+
+ @Test
+ public void testAlterTable_differentTypedTable() {
+ // TODO support this
+ }
+
+ @Test
+ public void testCreateFlinkTable() {
+ // create a flink table
+ CatalogTable table = createTable();
+ HashMap<String, String> newOptions = new HashMap<>(table.getOptions());
+ newOptions.put("connector", "filesystem");
+ CatalogTable newTable = table.copy(newOptions);
+
+ assertThatThrownBy(() -> catalog.createTable(this.path1, newTable, false))
+ .isInstanceOf(CatalogException.class)
+ .hasMessageContaining(
+ "Table Store Catalog only supports table store tables, not Flink connector: filesystem");
+ }
+
+ @Test
+ public void testCreateTable_Streaming() throws Exception {
+ catalog.createDatabase(path1.getDatabaseName(), null, false);
+ CatalogTable table = createStreamingTable();
+ catalog.createTable(path1, table, false);
+ checkEquals(path1, table, (CatalogTable) catalog.getTable(path1));
+ }
+
+ @Test
+ public void testAlterPartitionedTable() throws Exception {
+ catalog.createDatabase(path1.getDatabaseName(), null, false);
+ CatalogTable table = this.createPartitionedTable();
+ catalog.createTable(this.path1, table, false);
+ checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
+ CatalogTable newTable = this.createAnotherPartitionedTable();
+ catalog.alterTable(this.path1, newTable, false);
+ checkEquals(path1, newTable, (CatalogTable) catalog.getTable(this.path1));
+ }
+
+ @Test
+ public void testCreateTable_Batch() throws Exception {
+ catalog.createDatabase(path1.getDatabaseName(), null, false);
+ CatalogTable table = this.createTable();
+ catalog.createTable(this.path1, table, false);
+ CatalogBaseTable tableCreated = catalog.getTable(this.path1);
+ checkEquals(path1, table, (CatalogTable) tableCreated);
+ Assert.assertEquals("test comment", tableCreated.getDescription().get());
+ List<String> tables = catalog.listTables("db1");
+ Assert.assertEquals(1L, tables.size());
+ Assert.assertEquals(this.path1.getObjectName(), tables.get(0));
+ catalog.dropTable(this.path1, false);
+ }
+
+ @Test
+ public void testCreateTable_TableAlreadyExist_ignored() throws Exception {
+ catalog.createDatabase(path1.getDatabaseName(), null, false);
+ CatalogTable table = this.createTable();
+ catalog.createTable(this.path1, table, false);
+ checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
+ catalog.createTable(this.path1, this.createAnotherTable(), true);
+ checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
+ }
+
+ @Test
+ public void testCreatePartitionedTable_Batch() throws Exception {
+ catalog.createDatabase(path1.getDatabaseName(), null, false);
+ CatalogTable table = this.createPartitionedTable();
+ catalog.createTable(this.path1, table, false);
+ checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
+ List<String> tables = catalog.listTables("db1");
+ Assert.assertEquals(1L, tables.size());
+ Assert.assertEquals(this.path1.getObjectName(), tables.get(0));
+ }
+
+ @Test
+ public void testDropDb_DatabaseNotEmptyException() throws Exception {
+ catalog.createDatabase(path1.getDatabaseName(), null, false);
+ catalog.createTable(this.path1, this.createTable(), false);
+ this.exception.expect(DatabaseNotEmptyException.class);
+ this.exception.expectMessage("Database db1 in catalog test-catalog is not empty");
+ catalog.dropDatabase("db1", true, false);
+ }
+
+ @Test
+ public void testTableExists() throws Exception {
+ catalog.createDatabase(path1.getDatabaseName(), null, false);
+ Assert.assertFalse(catalog.tableExists(this.path1));
+ catalog.createTable(this.path1, this.createTable(), false);
+ Assert.assertTrue(catalog.tableExists(this.path1));
+ }
+
+ @Test
+ public void testAlterTable_TableNotExist_ignored() throws Exception {
+ catalog.alterTable(this.nonExistObjectPath, this.createTable(), true);
+ Assert.assertFalse(catalog.tableExists(this.nonExistObjectPath));
+ }
+
+ @Test
+ public void testDropTable_TableNotExist_ignored() throws Exception {
+ catalog.dropTable(this.nonExistObjectPath, true);
+ }
+
+ @Test
+ public void testCreateTable_TableAlreadyExistException() throws Exception {
+ catalog.createDatabase(path1.getDatabaseName(), null, false);
+ catalog.createTable(this.path1, this.createTable(), false);
+ this.exception.expect(TableAlreadyExistException.class);
+ this.exception.expectMessage("Table (or view) db1.t1 already exists in Catalog");
+ catalog.createTable(this.path1, this.createTable(), false);
+ }
+
+ @Test
+ public void testDropTable_nonPartitionedTable() throws Exception {
+ catalog.createDatabase(path1.getDatabaseName(), null, false);
+ catalog.createTable(this.path1, this.createTable(), false);
+ Assert.assertTrue(catalog.tableExists(this.path1));
+ catalog.dropTable(this.path1, false);
+ Assert.assertFalse(catalog.tableExists(this.path1));
+ }
+
+ @Test
+ public void testGetTable_TableNotExistException() throws Exception {
+ this.exception.expect(TableNotExistException.class);
+ this.exception.expectMessage("Table (or view) db1.nonexist does not exist in Catalog");
+ catalog.getTable(this.nonExistObjectPath);
+ }
+
+ @Test
+ public void testDbExists() throws Exception {
+ catalog.createDatabase(path1.getDatabaseName(), null, false);
+ catalog.createTable(this.path1, this.createTable(), false);
+ Assert.assertTrue(catalog.databaseExists("db1"));
+ }
+
+ @Test
+ public void testDropDb_DatabaseNotExist_Ignore() throws Exception {
+ catalog.dropDatabase("db1", true, false);
+ }
+
+ @Test
+ public void testAlterTable_TableNotExistException() throws Exception {
+ this.exception.expect(TableNotExistException.class);
+ this.exception.expectMessage("Table (or view) non.exist does not exist in Catalog");
+ catalog.alterTable(this.nonExistDbPath, this.createTable(), false);
+ }
+
+ @Test
+ public void testDropTable_TableNotExistException() throws Exception {
+ this.exception.expect(TableNotExistException.class);
+ this.exception.expectMessage("Table (or view) non.exist does not exist in Catalog");
+ catalog.dropTable(this.nonExistDbPath, false);
+ }
+
+ @Test
+ public void testCreateDb_Database() throws Exception {
+ catalog.createDatabase(path1.getDatabaseName(), null, false);
+ List<String> dbs = catalog.listDatabases();
+ assertThat(dbs).hasSize(2);
+ assertThat(new HashSet<>(dbs))
+ .isEqualTo(
+ new HashSet<>(
+ Arrays.asList(
+ path1.getDatabaseName(), catalog.getDefaultDatabase())));
+ }
+
+ @Test
+ public void testCreateDb_DatabaseAlreadyExistException() throws Exception {
+ catalog.createDatabase(path1.getDatabaseName(), null, false);
+
+ assertThatThrownBy(() -> catalog.createDatabase(path1.getDatabaseName(), null, false))
+ .isInstanceOf(DatabaseAlreadyExistException.class)
+ .hasMessage("Database db1 already exists in Catalog test-catalog.");
+ }
+
+ @Test
+ public void testCreateDb_DatabaseWithPropertiesException() {
+ CatalogDatabaseImpl database =
+ new CatalogDatabaseImpl(Collections.singletonMap("haa", "ccc"), null);
+ assertThatThrownBy(() -> catalog.createDatabase(path1.getDatabaseName(), database, false))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage("Create database with properties is unsupported.");
+ }
+
+ @Test
+ public void testCreateDb_DatabaseWithCommentException() {
+ CatalogDatabaseImpl database = new CatalogDatabaseImpl(Collections.emptyMap(), "haha");
+ assertThatThrownBy(() -> catalog.createDatabase(path1.getDatabaseName(), database, false))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage("Create database with description is unsupported.");
+ }
+
+ @Test
+ public void testCreateTable_DatabaseNotExistException() {
+ assertThat(catalog.databaseExists(path1.getDatabaseName())).isFalse();
+
+ assertThatThrownBy(() -> catalog.createTable(nonExistObjectPath, createTable(), false))
+ .isInstanceOf(DatabaseNotExistException.class)
+ .hasMessage("Database db1 does not exist in Catalog test-catalog.");
+ }
+
+ @Test
+ public void testDropDb_DatabaseNotExistException() {
+ assertThatThrownBy(() -> catalog.dropDatabase(path1.getDatabaseName(), false, false))
+ .isInstanceOf(DatabaseNotExistException.class)
+ .hasMessage("Database db1 does not exist in Catalog test-catalog.");
+ }
+
+ private void checkEquals(ObjectPath path, CatalogTable t1, CatalogTable t2) {
+ Path tablePath = ((FlinkCatalog) catalog).catalog().getTableLocation(path);
+ Map<String, String> options = new HashMap<>(t1.getOptions());
+ options.put("path", tablePath.toString());
+ t1 = ((ResolvedCatalogTable) t1).copy(options);
+ checkEquals(t1, t2);
+ }
+
+ private static void checkEquals(CatalogTable t1, CatalogTable t2) {
+ assertThat(t2.getTableKind()).isEqualTo(t1.getTableKind());
+ assertThat(t2.getSchema()).isEqualTo(t1.getSchema());
+ assertThat(t2.getComment()).isEqualTo(t1.getComment());
+ assertThat(t2.getPartitionKeys()).isEqualTo(t1.getPartitionKeys());
+ assertThat(t2.isPartitioned()).isEqualTo(t1.isPartitioned());
+ assertThat(t2.getOptions()).isEqualTo(t1.getOptions());
+ }
+}
diff --git a/flink-table-store-core/pom.xml b/flink-table-store-core/pom.xml
index e371d9e..b880507 100644
--- a/flink-table-store-core/pom.xml
+++ b/flink-table-store-core/pom.xml
@@ -89,54 +89,6 @@ under the License.
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-common</artifactId>
- <version>${flink.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-api-java</artifactId>
- <version>${flink.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>${junit4.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.junit.vintage</groupId>
- <artifactId>junit-vintage-engine</artifactId>
- <version>${junit5.version}</version>
- <exclusions>
- <exclusion>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- </exclusion>
- </exclusions>
- <scope>test</scope>
- </dependency>
</dependencies>
<build>
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
new file mode 100644
index 0000000..01b2366
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.catalog;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+
+import java.util.List;
+
+/**
+ * This interface is responsible for reading and writing metadata such as database/table from a
+ * table store catalog.
+ */
+public interface Catalog extends AutoCloseable {
+
+ /**
+ * Get the names of all databases in this catalog.
+ *
+ * @return a list of the names of all databases
+ */
+ List<String> listDatabases();
+
+ /**
+ * Check if a database exists in this catalog.
+ *
+ * @param databaseName Name of the database
+ * @return true if the given database exists in the catalog false otherwise
+ */
+ boolean databaseExists(String databaseName);
+
+ /**
+ * Create a database.
+ *
+ * @param name Name of the database to be created
+ * @param ignoreIfExists Flag to specify behavior when a database with the given name already
+ * exists: if set to false, throw a DatabaseAlreadyExistException, if set to true, do
+ * nothing.
+ * @throws DatabaseAlreadyExistException if the given database already exists and ignoreIfExists
+ * is false
+ */
+ void createDatabase(String name, boolean ignoreIfExists) throws DatabaseAlreadyExistException;
+
+ /**
+ * Drop a database.
+ *
+ * @param name Name of the database to be dropped.
+ * @param ignoreIfNotExists Flag to specify behavior when the database does not exist: if set to
+ * false, throw an exception, if set to true, do nothing.
+ * @param cascade Flag to specify behavior when the database contains table or function: if set
+ * to true, delete all tables and functions in the database and then delete the database, if
+ * set to false, throw an exception.
+ * @throws DatabaseNotEmptyException if the given database is not empty and isRestrict is true
+ */
+ void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
+ throws DatabaseNotExistException, DatabaseNotEmptyException;
+
+ /**
+ * Get names of all tables under this database. An empty list is returned if none exists.
+ *
+ * @return a list of the names of all tables in this database
+ * @throws DatabaseNotExistException if the database does not exist
+ */
+ List<String> listTables(String databaseName) throws DatabaseNotExistException;
+
+ /**
+ * Return the table location path identified by the given {@link ObjectPath}.
+ *
+ * @param tablePath Path of the table
+ * @return The requested table location
+ */
+ Path getTableLocation(ObjectPath tablePath);
+
+ /**
+ * Return a {@link TableSchema} identified by the given {@link ObjectPath}.
+ *
+ * @param tablePath Path of the table
+ * @return The requested table
+ * @throws TableNotExistException if the target does not exist
+ */
+ TableSchema getTable(ObjectPath tablePath) throws TableNotExistException;
+
+ /**
+ * Check if a table exists in this catalog.
+ *
+ * @param tablePath Path of the table
+ * @return true if the given table exists in the catalog false otherwise
+ */
+ boolean tableExists(ObjectPath tablePath);
+
+ /**
+ * Drop a table.
+ *
+ * @param tablePath Path of the table to be dropped
+ * @param ignoreIfNotExists Flag to specify behavior when the table does not exist: if set to
+ * false, throw an exception, if set to true, do nothing.
+ * @throws TableNotExistException if the table does not exist
+ */
+ void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws TableNotExistException;
+
+ /**
+ * Create a new table.
+ *
+ * @param tablePath path of the table to be created
+ * @param tableSchema the table definition
+ * @param ignoreIfExists flag to specify behavior when a table already exists at the given path:
+ * if set to false, it throws a TableAlreadyExistException, if set to true, do nothing.
+ * @throws TableAlreadyExistException if table already exists and ignoreIfExists is false
+ * @throws DatabaseNotExistException if the database in tablePath doesn't exist
+ */
+ void createTable(ObjectPath tablePath, UpdateSchema tableSchema, boolean ignoreIfExists)
+ throws TableAlreadyExistException, DatabaseNotExistException;
+
+ /**
+ * Modify an existing table.
+ *
+ * @param tablePath path of the table to be modified
+ * @param newTableSchema the new table definition
+ * @param ignoreIfNotExists flag to specify behavior when the table does not exist: if set to
+ * false, throw an exception, if set to true, do nothing.
+ * @throws TableNotExistException if the table does not exist
+ */
+ void alterTable(ObjectPath tablePath, UpdateSchema newTableSchema, boolean ignoreIfNotExists)
+ throws TableNotExistException;
+
+ /** Exception for trying to drop on a database that is not empty. */
+ class DatabaseNotEmptyException extends Exception {
+ private static final String MSG = "Database %s is not empty.";
+
+ private final String database;
+
+ public DatabaseNotEmptyException(String database, Throwable cause) {
+ super(String.format(MSG, database), cause);
+ this.database = database;
+ }
+
+ public DatabaseNotEmptyException(String database) {
+ this(database, null);
+ }
+
+ public String database() {
+ return database;
+ }
+ }
+
+ /** Exception for trying to create a database that already exists. */
+ class DatabaseAlreadyExistException extends Exception {
+ private static final String MSG = "Database %s already exists.";
+
+ private final String database;
+
+ public DatabaseAlreadyExistException(String database, Throwable cause) {
+ super(String.format(MSG, database), cause);
+ this.database = database;
+ }
+
+ public DatabaseAlreadyExistException(String database) {
+ this(database, null);
+ }
+
+ public String database() {
+ return database;
+ }
+ }
+
+ /** Exception for trying to operate on a database that doesn't exist. */
+ class DatabaseNotExistException extends Exception {
+ private static final String MSG = "Database %s does not exist.";
+
+ private final String database;
+
+ public DatabaseNotExistException(String database, Throwable cause) {
+ super(String.format(MSG, database), cause);
+ this.database = database;
+ }
+
+ public DatabaseNotExistException(String database) {
+ this(database, null);
+ }
+
+ public String database() {
+ return database;
+ }
+ }
+
+ /** Exception for trying to create a table that already exists. */
+ class TableAlreadyExistException extends Exception {
+
+ private static final String MSG = "Table %s already exists.";
+
+ private final ObjectPath tablePath;
+
+ public TableAlreadyExistException(ObjectPath tablePath) {
+ this(tablePath, null);
+ }
+
+ public TableAlreadyExistException(ObjectPath tablePath, Throwable cause) {
+ super(String.format(MSG, tablePath.getFullName()), cause);
+ this.tablePath = tablePath;
+ }
+
+ public ObjectPath tablePath() {
+ return tablePath;
+ }
+ }
+
+ /** Exception for trying to operate on a table that doesn't exist. */
+ class TableNotExistException extends Exception {
+
+ private static final String MSG = "Table %s does not exist.";
+
+ private final ObjectPath tablePath;
+
+ public TableNotExistException(ObjectPath tablePath) {
+ this(tablePath, null);
+ }
+
+ public TableNotExistException(ObjectPath tablePath, Throwable cause) {
+ super(String.format(MSG, tablePath.getFullName()), cause);
+ this.tablePath = tablePath;
+ }
+
+ public ObjectPath tablePath() {
+ return tablePath;
+ }
+ }
+
+ /** Create a {@link Catalog} from warehouse path. */
+ static Catalog create(Path warehouse) {
+ return new FileSystemCatalog(warehouse);
+ }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
index f31a00d..d4a451e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
@@ -22,69 +22,32 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.CatalogDatabase;
-import org.apache.flink.table.catalog.CatalogDatabaseImpl;
-import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.ResolvedCatalogTable;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
-import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.flink.table.factories.DynamicTableFactory;
-import org.apache.flink.table.factories.Factory;
-import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.schema.UpdateSchema;
-import org.apache.flink.util.function.RunnableWithException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
-import java.util.Map;
-import java.util.Optional;
import java.util.concurrent.Callable;
-import static org.apache.flink.table.catalog.GenericInMemoryCatalogFactoryOptions.DEFAULT_DATABASE;
-import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
-import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
-import static org.apache.flink.table.store.file.catalog.TableStoreCatalogFactory.IDENTIFIER;
import static org.apache.flink.table.store.file.utils.FileUtils.safelyListFileStatus;
/** A catalog implementation for {@link FileSystem}. */
-public class FileSystemCatalog extends TableStoreCatalog {
+public class FileSystemCatalog implements Catalog {
public static final String DB_SUFFIX = ".db";
- public static final CatalogDatabaseImpl DUMMY_DATABASE =
- new CatalogDatabaseImpl(Collections.emptyMap(), null);
-
private final FileSystem fs;
private final Path warehouse;
- public FileSystemCatalog(String name, Path warehouse) {
- this(name, warehouse, DEFAULT_DATABASE.defaultValue());
- }
-
- public FileSystemCatalog(String name, Path warehouse, String defaultDatabase) {
- super(name, defaultDatabase);
+ public FileSystemCatalog(Path warehouse) {
this.warehouse = warehouse;
this.fs = uncheck(warehouse::getFileSystem);
- uncheck(() -> createDatabase(defaultDatabase, DUMMY_DATABASE, true));
}
@Override
- public Optional<Factory> getFactory() {
- return Optional.of(
- FactoryUtil.discoverFactory(classLoader(), DynamicTableFactory.class, IDENTIFIER));
- }
-
- @Override
- public List<String> listDatabases() throws CatalogException {
+ public List<String> listDatabases() {
List<String> databases = new ArrayList<>();
for (FileStatus status : uncheck(() -> safelyListFileStatus(warehouse))) {
Path path = status.getPath();
@@ -96,62 +59,44 @@ public class FileSystemCatalog extends TableStoreCatalog {
}
@Override
- public CatalogDatabase getDatabase(String databaseName)
- throws DatabaseNotExistException, CatalogException {
- if (!databaseExists(databaseName)) {
- throw new DatabaseNotExistException(getName(), databaseName);
- }
- return new CatalogDatabaseImpl(Collections.emptyMap(), "");
- }
-
- @Override
- public boolean databaseExists(String databaseName) throws CatalogException {
+ public boolean databaseExists(String databaseName) {
return uncheck(() -> fs.exists(databasePath(databaseName)));
}
@Override
- public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
- throws DatabaseAlreadyExistException, CatalogException {
- if (database.getProperties().size() > 0) {
- throw new UnsupportedOperationException(
- "Create database with properties is unsupported.");
- }
-
- if (database.getDescription().isPresent() && !database.getDescription().get().equals("")) {
- throw new UnsupportedOperationException(
- "Create database with description is unsupported.");
- }
-
- if (!ignoreIfExists && databaseExists(name)) {
- throw new DatabaseAlreadyExistException(getName(), name);
+ public void createDatabase(String name, boolean ignoreIfExists)
+ throws DatabaseAlreadyExistException {
+ if (databaseExists(name)) {
+ if (ignoreIfExists) {
+ return;
+ }
+ throw new DatabaseAlreadyExistException(name);
}
-
uncheck(() -> fs.mkdirs(databasePath(name)));
}
@Override
public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
- throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
+ throws DatabaseNotExistException, DatabaseNotEmptyException {
if (!databaseExists(name)) {
if (ignoreIfNotExists) {
return;
}
- throw new DatabaseNotExistException(getName(), name);
+ throw new DatabaseNotExistException(name);
}
- if (listTables(name).size() > 0) {
- throw new DatabaseNotEmptyException(getName(), name);
+ if (!cascade && listTables(name).size() > 0) {
+ throw new DatabaseNotEmptyException(name);
}
uncheck(() -> fs.delete(databasePath(name), true));
}
@Override
- public List<String> listTables(String databaseName)
- throws DatabaseNotExistException, CatalogException {
+ public List<String> listTables(String databaseName) throws DatabaseNotExistException {
if (!databaseExists(databaseName)) {
- throw new DatabaseNotExistException(getName(), databaseName);
+ throw new DatabaseNotExistException(databaseName);
}
List<String> tables = new ArrayList<>();
@@ -164,22 +109,20 @@ public class FileSystemCatalog extends TableStoreCatalog {
}
@Override
- public CatalogBaseTable getTable(ObjectPath tablePath)
- throws TableNotExistException, CatalogException {
+ public Path getTableLocation(ObjectPath tablePath) {
+ return tablePath(tablePath);
+ }
+
+ @Override
+ public TableSchema getTable(ObjectPath tablePath) throws TableNotExistException {
Path path = tablePath(tablePath);
- TableSchema tableSchema =
- new SchemaManager(path)
- .latest()
- .orElseThrow(() -> new TableNotExistException(getName(), tablePath));
-
- CatalogTable table = tableSchema.toUpdateSchema().toCatalogTable();
- // add path to source and sink
- table.getOptions().put(PATH.key(), path.toString());
- return table;
+ return new SchemaManager(path)
+ .latest()
+ .orElseThrow(() -> new TableNotExistException(tablePath));
}
@Override
- public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+ public boolean tableExists(ObjectPath tablePath) {
return tableExists(tablePath(tablePath));
}
@@ -189,24 +132,24 @@ public class FileSystemCatalog extends TableStoreCatalog {
@Override
public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
- throws TableNotExistException, CatalogException {
+ throws TableNotExistException {
Path path = tablePath(tablePath);
if (!tableExists(path)) {
if (ignoreIfNotExists) {
return;
}
- throw new TableNotExistException(getName(), tablePath);
+ throw new TableNotExistException(tablePath);
}
uncheck(() -> fs.delete(path, true));
}
@Override
- public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
- throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+ public void createTable(ObjectPath tablePath, UpdateSchema table, boolean ignoreIfExists)
+ throws TableAlreadyExistException, DatabaseNotExistException {
if (!databaseExists(tablePath.getDatabaseName())) {
- throw new DatabaseNotExistException(getName(), tablePath.getDatabaseName());
+ throw new DatabaseNotExistException(tablePath.getDatabaseName());
}
Path path = tablePath(tablePath);
@@ -215,23 +158,22 @@ public class FileSystemCatalog extends TableStoreCatalog {
return;
}
- throw new TableAlreadyExistException(getName(), tablePath);
+ throw new TableAlreadyExistException(tablePath);
}
commitTableChange(path, table);
}
@Override
- public void alterTable(
- ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
- throws TableNotExistException, CatalogException {
+ public void alterTable(ObjectPath tablePath, UpdateSchema newTable, boolean ignoreIfNotExists)
+ throws TableNotExistException {
Path path = tablePath(tablePath);
if (!tableExists(path)) {
if (ignoreIfNotExists) {
return;
}
- throw new TableNotExistException(getName(), tablePath);
+ throw new TableNotExistException(tablePath);
}
commitTableChange(path, newTable);
@@ -241,25 +183,10 @@ public class FileSystemCatalog extends TableStoreCatalog {
try {
return callable.call();
} catch (Exception e) {
- if (e instanceof RuntimeException) {
- throw (RuntimeException) e;
- }
- throw new CatalogException(e);
+ throw new RuntimeException(e);
}
}
- private static void uncheck(RunnableWithException runnable) {
- FileSystemCatalog.uncheck(
- () -> {
- runnable.run();
- return null;
- });
- }
-
- private static ClassLoader classLoader() {
- return FileSystemCatalog.class.getClassLoader();
- }
-
private static boolean isDatabase(Path path) {
return path.getName().endsWith(DB_SUFFIX);
}
@@ -278,30 +205,10 @@ public class FileSystemCatalog extends TableStoreCatalog {
return new Path(databasePath(objectPath.getDatabaseName()), objectPath.getObjectName());
}
- private void commitTableChange(Path tablePath, CatalogBaseTable baseTable) {
- if (!(baseTable instanceof ResolvedCatalogTable)) {
- throw new UnsupportedOperationException(
- "Only support ResolvedCatalogTable, but is: " + baseTable.getClass());
- }
- ResolvedCatalogTable table = (ResolvedCatalogTable) baseTable;
- Map<String, String> options = table.getOptions();
- if (options.containsKey(CONNECTOR.key())) {
- throw new CatalogException(
- "Table Store Catalog only supports table store tables, not Flink connector: "
- + options.get(CONNECTOR.key()));
- }
-
- // remove table path
- String specific = options.remove(PATH.key());
- if (specific != null) {
- if (!tablePath.equals(new Path(specific))) {
- throw new IllegalArgumentException(
- "Illegal table path in table options: " + specific);
- }
- table = table.copy(options);
- }
-
- UpdateSchema updateSchema = UpdateSchema.fromCatalogTable(table);
- uncheck(() -> new SchemaManager(tablePath).commitNewVersion(updateSchema));
+ private void commitTableChange(Path tablePath, UpdateSchema table) {
+ uncheck(() -> new SchemaManager(tablePath).commitNewVersion(table));
}
+
+ @Override
+ public void close() throws Exception {}
}
diff --git a/flink-table-store-core/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table-store-core/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
deleted file mode 100644
index 8e32399..0000000
--- a/flink-table-store-core/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.flink.table.store.file.catalog.TableStoreCatalogFactory
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/catalog/FileSystemCatalogTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/catalog/FileSystemCatalogTest.java
deleted file mode 100644
index 93b8439..0000000
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/catalog/FileSystemCatalogTest.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.file.catalog;
-
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.catalog.CatalogTest;
-
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.IOException;
-
-/** Test for {@link FileSystemCatalog}. */
-public class FileSystemCatalogTest extends TableStoreCatalogTest {
-
- @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
-
- @BeforeClass
- public static void beforeEach() throws IOException {
- String path = TEMPORARY_FOLDER.newFolder().toURI().toString();
- catalog = new FileSystemCatalog(CatalogTest.TEST_CATALOG_NAME, new Path(path));
- }
-}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/catalog/TableStoreCatalogTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/catalog/TableStoreCatalogTest.java
deleted file mode 100644
index 83f13f1..0000000
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/catalog/TableStoreCatalogTest.java
+++ /dev/null
@@ -1,425 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.file.catalog;
-
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.CatalogDatabase;
-import org.apache.flink.table.catalog.CatalogDatabaseImpl;
-import org.apache.flink.table.catalog.CatalogFunction;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogTestBase;
-import org.apache.flink.table.catalog.CatalogTestUtil;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.ResolvedCatalogTable;
-import org.apache.flink.table.catalog.ResolvedSchema;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-/** Test for {@link TableStoreCatalog}. */
-public abstract class TableStoreCatalogTest extends CatalogTestBase {
-
- @Override
- protected boolean isGeneric() {
- return false;
- }
-
- @Override
- protected Map<String, String> getBatchTableProperties() {
- return new HashMap<String, String>() {
- {
- this.put("is_streaming", "false");
- }
- };
- }
-
- @Override
- protected Map<String, String> getStreamingTableProperties() {
- return new HashMap<String, String>() {
- {
- this.put("is_streaming", "true");
- }
- };
- }
-
- @Override
- public CatalogDatabase createDb() {
- return new CatalogDatabaseImpl(Collections.emptyMap(), "");
- }
-
- @Override
- public CatalogTable createAnotherTable() {
- // TODO support change schema, modify it to createAnotherSchema
- ResolvedSchema resolvedSchema = this.createSchema();
- CatalogTable origin =
- CatalogTable.of(
- Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
- "test comment",
- Collections.emptyList(),
- this.getBatchTableProperties());
- return new ResolvedCatalogTable(origin, resolvedSchema);
- }
-
- @Override
- public CatalogTable createAnotherPartitionedTable() {
- // TODO support change schema, modify it to createAnotherSchema
- ResolvedSchema resolvedSchema = this.createSchema();
- CatalogTable origin =
- CatalogTable.of(
- Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
- "test comment",
- this.createPartitionKeys(),
- this.getBatchTableProperties());
- return new ResolvedCatalogTable(origin, resolvedSchema);
- }
-
- @Override
- public CatalogDatabase createAnotherDb() {
- // Not support database with properties or comment
- return new CatalogDatabaseImpl(new HashMap<String, String>() {}, null);
- }
-
- @Override
- public void testAlterTable() throws Exception {
- catalog.createDatabase("db1", this.createDb(), false);
- CatalogTable table = this.createTable();
- catalog.createTable(this.path1, table, false);
- checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
- CatalogTable newTable = this.createAnotherTable();
- catalog.alterTable(this.path1, newTable, false);
- Assert.assertNotEquals(table, catalog.getTable(this.path1));
- checkEquals(path1, newTable, (CatalogTable) catalog.getTable(this.path1));
- catalog.dropTable(this.path1, false);
-
- // Not support views
- }
-
- @Test
- public void testListTables() throws Exception {
- catalog.createDatabase("db1", this.createDb(), false);
- catalog.createTable(this.path1, this.createTable(), false);
- catalog.createTable(this.path3, this.createTable(), false);
- Assert.assertEquals(2L, catalog.listTables("db1").size());
-
- // Not support views
- }
-
- @Override
- public void testAlterTable_differentTypedTable() {
- // TODO support this
- }
-
- @Test
- public void testCreateFlinkTable() throws DatabaseAlreadyExistException {
- // create a flink table
- CatalogTable table = createTable();
- HashMap<String, String> newOptions = new HashMap<>(table.getOptions());
- newOptions.put("connector", "filesystem");
- CatalogTable newTable = table.copy(newOptions);
-
- catalog.createDatabase("db1", this.createDb(), false);
-
- assertThatThrownBy(() -> catalog.createTable(this.path1, newTable, false))
- .isInstanceOf(CatalogException.class)
- .hasMessageContaining(
- "Table Store Catalog only supports table store tables, not Flink connector: filesystem");
- }
-
- @Test
- public void testCreateTable_Streaming() throws Exception {
- catalog.createDatabase("db1", createDb(), false);
- CatalogTable table = createStreamingTable();
- catalog.createTable(path1, table, false);
- checkEquals(path1, table, (CatalogTable) catalog.getTable(path1));
- }
-
- @Test
- public void testAlterPartitionedTable() throws Exception {
- catalog.createDatabase("db1", this.createDb(), false);
- CatalogTable table = this.createPartitionedTable();
- catalog.createTable(this.path1, table, false);
- checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
- CatalogTable newTable = this.createAnotherPartitionedTable();
- catalog.alterTable(this.path1, newTable, false);
- checkEquals(path1, newTable, (CatalogTable) catalog.getTable(this.path1));
- }
-
- @Test
- public void testCreateTable_Batch() throws Exception {
- catalog.createDatabase("db1", this.createDb(), false);
- CatalogTable table = this.createTable();
- catalog.createTable(this.path1, table, false);
- CatalogBaseTable tableCreated = catalog.getTable(this.path1);
- checkEquals(path1, table, (CatalogTable) tableCreated);
- Assert.assertEquals("test comment", tableCreated.getDescription().get());
- List<String> tables = catalog.listTables("db1");
- Assert.assertEquals(1L, tables.size());
- Assert.assertEquals(this.path1.getObjectName(), tables.get(0));
- catalog.dropTable(this.path1, false);
- }
-
- @Test
- public void testCreateTable_TableAlreadyExist_ignored() throws Exception {
- catalog.createDatabase("db1", this.createDb(), false);
- CatalogTable table = this.createTable();
- catalog.createTable(this.path1, table, false);
- checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
- catalog.createTable(this.path1, this.createAnotherTable(), true);
- checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
- }
-
- @Test
- public void testCreatePartitionedTable_Batch() throws Exception {
- catalog.createDatabase("db1", this.createDb(), false);
- CatalogTable table = this.createPartitionedTable();
- catalog.createTable(this.path1, table, false);
- checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
- List<String> tables = catalog.listTables("db1");
- Assert.assertEquals(1L, tables.size());
- Assert.assertEquals(this.path1.getObjectName(), tables.get(0));
- }
-
- private void checkEquals(ObjectPath path, CatalogTable t1, CatalogTable t2) {
- Path tablePath = ((FileSystemCatalog) catalog).tablePath(path);
- Map<String, String> options = new HashMap<>(t1.getOptions());
- options.put("path", tablePath.toString());
- t1 = ((ResolvedCatalogTable) t1).copy(options);
- CatalogTestUtil.checkEquals(t1, t2);
- }
-
- // --------------------- unsupported methods ----------------------------
-
- @Override
- protected CatalogFunction createFunction() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected CatalogFunction createPythonFunction() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected CatalogFunction createAnotherFunction() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void testCreateView() {}
-
- @Override
- public void testAlterDb() {}
-
- @Override
- public void testAlterDb_DatabaseNotExistException() {}
-
- @Override
- public void testAlterDb_DatabaseNotExist_ignored() {}
-
- @Override
- public void testRenameTable_nonPartitionedTable() {}
-
- @Override
- public void testRenameTable_TableNotExistException_ignored() {}
-
- @Override
- public void testRenameTable_TableNotExistException() {}
-
- @Override
- public void testRenameTable_TableAlreadyExistException() {}
-
- @Override
- public void testCreateView_DatabaseNotExistException() {}
-
- @Override
- public void testCreateView_TableAlreadyExistException() {}
-
- @Override
- public void testCreateView_TableAlreadyExist_ignored() {}
-
- @Override
- public void testDropView() {}
-
- @Override
- public void testAlterView() {}
-
- @Override
- public void testAlterView_TableNotExistException() {}
-
- @Override
- public void testAlterView_TableNotExist_ignored() {}
-
- @Override
- public void testListView() {}
-
- @Override
- public void testRenameView() {}
-
- @Override
- public void testCreateFunction() {}
-
- @Override
- public void testCreatePythonFunction() {}
-
- @Override
- public void testCreateFunction_DatabaseNotExistException() {}
-
- @Override
- public void testCreateFunction_FunctionAlreadyExistException() {}
-
- @Override
- public void testAlterFunction() {}
-
- @Override
- public void testAlterPythonFunction() {}
-
- @Override
- public void testAlterFunction_FunctionNotExistException() {}
-
- @Override
- public void testAlterFunction_FunctionNotExist_ignored() {}
-
- @Override
- public void testListFunctions() {}
-
- @Override
- public void testListFunctions_DatabaseNotExistException() {}
-
- @Override
- public void testGetFunction_FunctionNotExistException() {}
-
- @Override
- public void testGetFunction_FunctionNotExistException_NoDb() {}
-
- @Override
- public void testDropFunction() {}
-
- @Override
- public void testDropFunction_FunctionNotExistException() {}
-
- @Override
- public void testDropFunction_FunctionNotExist_ignored() {}
-
- @Override
- public void testCreatePartition() {}
-
- @Override
- public void testCreatePartition_TableNotExistException() {}
-
- @Override
- public void testCreatePartition_TableNotPartitionedException() {}
-
- @Override
- public void testCreatePartition_PartitionSpecInvalidException() {}
-
- @Override
- public void testCreatePartition_PartitionAlreadyExistsException() {}
-
- @Override
- public void testCreatePartition_PartitionAlreadyExists_ignored() {}
-
- @Override
- public void testDropPartition() {}
-
- @Override
- public void testDropPartition_TableNotExist() {}
-
- @Override
- public void testDropPartition_TableNotPartitioned() {}
-
- @Override
- public void testDropPartition_PartitionSpecInvalid() {}
-
- @Override
- public void testDropPartition_PartitionNotExist() {}
-
- @Override
- public void testDropPartition_PartitionNotExist_ignored() {}
-
- @Override
- public void testAlterPartition() {}
-
- @Override
- public void testAlterPartition_TableNotExist() {}
-
- @Override
- public void testAlterPartition_TableNotPartitioned() {}
-
- @Override
- public void testAlterPartition_PartitionSpecInvalid() {}
-
- @Override
- public void testAlterPartition_PartitionNotExist() {}
-
- @Override
- public void testAlterPartition_PartitionNotExist_ignored() {}
-
- @Override
- public void testGetPartition_TableNotExist() {}
-
- @Override
- public void testGetPartition_TableNotPartitioned() {}
-
- @Override
- public void testGetPartition_PartitionSpecInvalid_invalidPartitionSpec() {}
-
- @Override
- public void testGetPartition_PartitionSpecInvalid_sizeNotEqual() {}
-
- @Override
- public void testGetPartition_PartitionNotExist() {}
-
- @Override
- public void testPartitionExists() {}
-
- @Override
- public void testListPartitionPartialSpec() {}
-
- @Override
- public void testGetTableStats_TableNotExistException() {}
-
- @Override
- public void testGetPartitionStats() {}
-
- @Override
- public void testAlterTableStats() {}
-
- @Override
- public void testAlterTableStats_partitionedTable() {}
-
- @Override
- public void testAlterPartitionTableStats() {}
-
- @Override
- public void testAlterTableStats_TableNotExistException() {}
-
- @Override
- public void testAlterTableStats_TableNotExistException_ignore() {}
-}