You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2021/12/17 08:03:06 UTC
[flink] 04/04: [FLINK-25174][table] Implement callback for managed table
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit f01e911a0d56ecd2e40aa81d1808067515c9610c
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Fri Dec 17 15:56:01 2021 +0800
[FLINK-25174][table] Implement callback for managed table
This closes #18088
---
.../apache/flink/table/catalog/CatalogManager.java | 54 ++++++--
.../flink/table/catalog/ManagedTableListener.java | 137 ++++++++++++++++++
.../flink/table/api/TableEnvironmentTest.java | 153 +++++++++++++++++----
3 files changed, 312 insertions(+), 32 deletions(-)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index f0648b8..496c936 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -84,8 +84,13 @@ public final class CatalogManager {
private final DataTypeFactory typeFactory;
+ private final ManagedTableListener managedTableListener;
+
private CatalogManager(
- String defaultCatalogName, Catalog defaultCatalog, DataTypeFactory typeFactory) {
+ String defaultCatalogName,
+ Catalog defaultCatalog,
+ DataTypeFactory typeFactory,
+ ManagedTableListener managedTableListener) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(defaultCatalogName),
"Default catalog name cannot be null or empty");
@@ -101,6 +106,7 @@ public final class CatalogManager {
builtInCatalogName = defaultCatalogName;
this.typeFactory = typeFactory;
+ this.managedTableListener = managedTableListener;
}
public static Builder newBuilder() {
@@ -147,7 +153,8 @@ public final class CatalogManager {
return new CatalogManager(
defaultCatalogName,
defaultCatalog,
- new DataTypeFactoryImpl(classLoader, config, executionConfig));
+ new DataTypeFactoryImpl(classLoader, config, executionConfig),
+ new ManagedTableListener(classLoader, config));
}
}
@@ -656,8 +663,18 @@ public final class CatalogManager {
public void createTable(
CatalogBaseTable table, ObjectIdentifier objectIdentifier, boolean ignoreIfExists) {
execute(
- (catalog, path) ->
- catalog.createTable(path, resolveCatalogBaseTable(table), ignoreIfExists),
+ (catalog, path) -> {
+ ResolvedCatalogBaseTable<?> resolvedTable = resolveCatalogBaseTable(table);
+ ResolvedCatalogBaseTable<?> resolvedListenedTable =
+ managedTableListener.notifyTableCreation(
+ catalog,
+ objectIdentifier,
+ resolvedTable,
+ false,
+ ignoreIfExists);
+
+ catalog.createTable(path, resolvedListenedTable, ignoreIfExists);
+ },
objectIdentifier,
false,
"CreateTable");
@@ -686,13 +703,21 @@ public final class CatalogManager {
}
return v;
} else {
- final CatalogBaseTable resolvedTable = resolveCatalogBaseTable(table);
+ ResolvedCatalogBaseTable<?> resolvedTable = resolveCatalogBaseTable(table);
+ ResolvedCatalogBaseTable<?> resolvedListenedTable =
+ managedTableListener.notifyTableCreation(
+ getCatalog(objectIdentifier.getCatalogName()).orElse(null),
+ objectIdentifier,
+ resolvedTable,
+ true,
+ ignoreIfExists);
+
if (listener.isPresent()) {
return listener.get()
.onCreateTemporaryTable(
- objectIdentifier.toObjectPath(), resolvedTable);
+ objectIdentifier.toObjectPath(), resolvedListenedTable);
}
- return resolvedTable;
+ return resolvedListenedTable;
}
});
}
@@ -729,6 +754,12 @@ public final class CatalogManager {
if (filter.test(catalogBaseTable)) {
getTemporaryOperationListener(objectIdentifier)
.ifPresent(l -> l.onDropTemporaryTable(objectIdentifier.toObjectPath()));
+
+ Catalog catalog = catalogs.get(objectIdentifier.getCatalogName());
+ ResolvedCatalogBaseTable<?> resolvedTable = resolveCatalogBaseTable(catalogBaseTable);
+ managedTableListener.notifyTableDrop(
+ catalog, objectIdentifier, resolvedTable, true, ignoreIfNotExists);
+
temporaryTables.remove(objectIdentifier);
} else if (!ignoreIfNotExists) {
throw new ValidationException(
@@ -808,7 +839,14 @@ public final class CatalogManager {
final Optional<CatalogBaseTable> resultOpt = getUnresolvedTable(objectIdentifier);
if (resultOpt.isPresent() && filter.test(resultOpt.get())) {
execute(
- (catalog, path) -> catalog.dropTable(path, ignoreIfNotExists),
+ (catalog, path) -> {
+ ResolvedCatalogBaseTable<?> resolvedTable =
+ resolveCatalogBaseTable(resultOpt.get());
+ managedTableListener.notifyTableDrop(
+ catalog, objectIdentifier, resolvedTable, false, ignoreIfNotExists);
+
+ catalog.dropTable(path, ignoreIfNotExists);
+ },
objectIdentifier,
ignoreIfNotExists,
"DropTable");
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ManagedTableListener.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ManagedTableListener.java
new file mode 100644
index 0000000..211dce7
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ManagedTableListener.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.util.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+import static org.apache.flink.table.factories.ManagedTableFactory.discoverManagedTableFactory;
+
+/**
+ * The listener for managed table operations.
+ *
+ * <p>For every table that {@link #isManagedTable} classifies as managed, the create and drop
+ * notifications are forwarded to the managed table factory discovered through the configured
+ * class loader. All other tables pass through untouched.
+ */
+@Internal
+public class ManagedTableListener {
+
+    private final ClassLoader classLoader;
+
+    private final ReadableConfig config;
+
+    public ManagedTableListener(ClassLoader classLoader, ReadableConfig config) {
+        this.classLoader = classLoader;
+        this.config = config;
+    }
+
+    /**
+     * Notify for creating managed table.
+     *
+     * @param catalog catalog that will own the table, may be {@code null}
+     * @param identifier identifier of the table being created
+     * @param table resolved table to create
+     * @param isTemporary whether the table is created as a temporary table
+     * @param ignoreIfExists flag forwarded to the factory's {@code onCreateTable} callback
+     * @return the table with options enriched by the managed table factory if it is a managed
+     *     table, otherwise the given {@code table} unchanged
+     */
+    public ResolvedCatalogBaseTable<?> notifyTableCreation(
+            @Nullable Catalog catalog,
+            ObjectIdentifier identifier,
+            ResolvedCatalogBaseTable<?> table,
+            boolean isTemporary,
+            boolean ignoreIfExists) {
+        if (isManagedTable(catalog, table)) {
+            // enrich first so that the factory callback already sees the final options
+            ResolvedCatalogTable managedTable = enrichOptions(identifier, table, isTemporary);
+            discoverManagedTableFactory(classLoader)
+                    .onCreateTable(
+                            createTableFactoryContext(identifier, managedTable, isTemporary),
+                            ignoreIfExists);
+            return managedTable;
+        }
+        return table;
+    }
+
+    /**
+     * Notify for dropping managed table.
+     *
+     * @param catalog catalog that owns the table, may be {@code null}
+     * @param identifier identifier of the table being dropped
+     * @param table resolved table to drop
+     * @param isTemporary whether the table is a temporary table
+     * @param ignoreIfNotExists flag forwarded to the factory's {@code onDropTable} callback
+     */
+    public void notifyTableDrop(
+            @Nullable Catalog catalog,
+            ObjectIdentifier identifier,
+            ResolvedCatalogBaseTable<?> table,
+            boolean isTemporary,
+            boolean ignoreIfNotExists) {
+        if (isManagedTable(catalog, table)) {
+            discoverManagedTableFactory(classLoader)
+                    .onDropTable(
+                            createTableFactoryContext(
+                                    identifier, asCatalogTable(table), isTemporary),
+                            ignoreIfNotExists);
+        }
+    }
+
+    /**
+     * Returns whether the given table is a managed table: the catalog must be present and
+     * support managed tables, the table must not be a view, its options must be readable and
+     * contain neither a legacy connector type nor a connector identifier, and its origin must
+     * not be a {@link ConnectorCatalogTable}.
+     */
+    private boolean isManagedTable(@Nullable Catalog catalog, ResolvedCatalogBaseTable<?> table) {
+        if (catalog == null || !catalog.supportsManagedTable()) {
+            // catalog not support managed table
+            return false;
+        }
+
+        if (table.getTableKind() == CatalogBaseTable.TableKind.VIEW) {
+            // view is not managed table
+            return false;
+        }
+
+        Map<String, String> options;
+        try {
+            options = table.getOptions();
+        } catch (TableException ignored) {
+            // exclude abnormal tables, such as InlineCatalogTable that does not have the options
+            return false;
+        }
+
+        if (!StringUtils.isNullOrWhitespaceOnly(
+                options.get(ConnectorDescriptorValidator.CONNECTOR_TYPE))) {
+            // legacy connector is not managed table
+            return false;
+        }
+
+        if (!StringUtils.isNullOrWhitespaceOnly(options.get(FactoryUtil.CONNECTOR.key()))) {
+            // with connector is not managed table
+            return false;
+        }
+
+        // ConnectorCatalogTable is not managed table
+        return !(table.getOrigin() instanceof ConnectorCatalogTable);
+    }
+
+    /** Enrich options for creating managed table. */
+    private ResolvedCatalogTable enrichOptions(
+            ObjectIdentifier identifier, ResolvedCatalogBaseTable<?> table, boolean isTemporary) {
+        ResolvedCatalogTable resolvedTable = asCatalogTable(table);
+        Map<String, String> newOptions =
+                discoverManagedTableFactory(classLoader)
+                        .enrichOptions(
+                                createTableFactoryContext(identifier, resolvedTable, isTemporary));
+        return resolvedTable.copy(newOptions);
+    }
+
+    /**
+     * Narrows the given table to a {@link ResolvedCatalogTable}.
+     *
+     * @throws UnsupportedOperationException if the table is of any other type
+     */
+    private static ResolvedCatalogTable asCatalogTable(ResolvedCatalogBaseTable<?> table) {
+        if (!(table instanceof ResolvedCatalogTable)) {
+            throw new UnsupportedOperationException(
+                    "Managed table only supports catalog table, unsupported table type: "
+                            + table.getClass());
+        }
+        return (ResolvedCatalogTable) table;
+    }
+
+    /** Creates the context that is handed to the managed table factory callbacks. */
+    private DynamicTableFactory.Context createTableFactoryContext(
+            ObjectIdentifier identifier, ResolvedCatalogTable table, boolean isTemporary) {
+        return new FactoryUtil.DefaultDynamicTableContext(
+                identifier, table, config, classLoader, isTemporary);
+    }
+}
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java
index 18542bc..c4c12bb 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java
@@ -24,15 +24,22 @@ import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.operations.CatalogQueryOperation;
+import org.apache.flink.table.operations.ddl.CreateTableOperation;
+import org.apache.flink.table.operations.ddl.DropTableOperation;
import org.apache.flink.table.utils.TableEnvironmentMock;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import java.util.Map;
import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.apache.flink.table.factories.TestManagedTableFactory.ENRICHED_KEY;
+import static org.apache.flink.table.factories.TestManagedTableFactory.ENRICHED_VALUE;
+import static org.apache.flink.table.factories.TestManagedTableFactory.MANAGED_TABLES;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for {@link TableEnvironment}. */
public class TableEnvironmentTest {
@@ -48,20 +55,21 @@ public class TableEnvironmentTest {
"T",
TableDescriptor.forConnector("fake").schema(schema).option("a", "Test").build());
- assertFalse(
- tEnv.getCatalog(catalog)
- .orElseThrow(AssertionError::new)
- .tableExists(new ObjectPath(database, "T")));
+ assertThat(
+ tEnv.getCatalog(catalog)
+ .orElseThrow(AssertionError::new)
+ .tableExists(new ObjectPath(database, "T")))
+ .isFalse();
final Optional<CatalogManager.TableLookupResult> lookupResult =
tEnv.getCatalogManager().getTable(ObjectIdentifier.of(catalog, database, "T"));
- assertTrue(lookupResult.isPresent());
+ assertThat(lookupResult.isPresent()).isTrue();
final CatalogBaseTable catalogTable = lookupResult.get().getTable();
- assertTrue(catalogTable instanceof CatalogTable);
- assertEquals(schema, catalogTable.getUnresolvedSchema());
- assertEquals("fake", catalogTable.getOptions().get("connector"));
- assertEquals("Test", catalogTable.getOptions().get("a"));
+ assertThat(catalogTable instanceof CatalogTable).isTrue();
+ assertThat(catalogTable.getUnresolvedSchema()).isEqualTo(schema);
+ assertThat(catalogTable.getOptions().get("connector")).isEqualTo("fake");
+ assertThat(catalogTable.getOptions().get("a")).isEqualTo("Test");
}
@Test
@@ -76,15 +84,18 @@ public class TableEnvironmentTest {
TableDescriptor.forConnector("fake").schema(schema).option("a", "Test").build());
final ObjectPath objectPath = new ObjectPath(database, "T");
- assertTrue(
- tEnv.getCatalog(catalog).orElseThrow(AssertionError::new).tableExists(objectPath));
+ assertThat(
+ tEnv.getCatalog(catalog)
+ .orElseThrow(AssertionError::new)
+ .tableExists(objectPath))
+ .isTrue();
final CatalogBaseTable catalogTable =
tEnv.getCatalog(catalog).orElseThrow(AssertionError::new).getTable(objectPath);
- assertTrue(catalogTable instanceof CatalogTable);
- assertEquals(schema, catalogTable.getUnresolvedSchema());
- assertEquals("fake", catalogTable.getOptions().get("connector"));
- assertEquals("Test", catalogTable.getOptions().get("a"));
+ assertThat(catalogTable instanceof CatalogTable).isTrue();
+ assertThat(catalogTable.getUnresolvedSchema()).isEqualTo(schema);
+ assertThat(catalogTable.getOptions().get("connector")).isEqualTo("fake");
+ assertThat(catalogTable.getOptions().get("a")).isEqualTo("Test");
}
@Test
@@ -97,17 +108,111 @@ public class TableEnvironmentTest {
final Table table = tEnv.from(descriptor);
- assertEquals(
- schema, Schema.newBuilder().fromResolvedSchema(table.getResolvedSchema()).build());
+ assertThat(Schema.newBuilder().fromResolvedSchema(table.getResolvedSchema()).build())
+ .isEqualTo(schema);
- assertTrue(table.getQueryOperation() instanceof CatalogQueryOperation);
+ assertThat(table.getQueryOperation() instanceof CatalogQueryOperation).isTrue();
final ObjectIdentifier tableIdentifier =
((CatalogQueryOperation) table.getQueryOperation()).getTableIdentifier();
final Optional<CatalogManager.TableLookupResult> lookupResult =
tEnv.getCatalogManager().getTable(tableIdentifier);
- assertTrue(lookupResult.isPresent());
+ assertThat(lookupResult.isPresent()).isTrue();
+
+ assertThat(lookupResult.get().getTable().getOptions().get("connector")).isEqualTo("fake");
+ }
+
+    /** Permanent managed table; the repeated create and repeated drop are expected to throw. */
+    @Test
+    public void testManagedTable() {
+        final boolean ignoreIfExists = false;
+        final boolean isTemporary = false;
+        innerTestManagedTableFromDescriptor(ignoreIfExists, isTemporary);
+    }
- assertEquals("fake", lookupResult.get().getTable().getOptions().get("connector"));
+    /** Permanent managed table; the repeated create and repeated drop are silently ignored. */
+    @Test
+    public void testManagedTableWithIgnoreExists() {
+        final boolean ignoreIfExists = true;
+        final boolean isTemporary = false;
+        innerTestManagedTableFromDescriptor(ignoreIfExists, isTemporary);
+    }
+
+    /** Temporary managed table; the repeated create and repeated drop are silently ignored. */
+    @Test
+    public void testTemporaryManagedTableWithIgnoreExists() {
+        final boolean ignoreIfExists = true;
+        final boolean isTemporary = true;
+        innerTestManagedTableFromDescriptor(ignoreIfExists, isTemporary);
+    }
+
+    /**
+     * Temporary managed table without the ignore flag; the repeated create and repeated drop are
+     * expected to throw.
+     */
+    @Test
+    public void testTemporaryManagedTable() {
+        // ignoreIfExists must be false here; passing (true, true) would only repeat
+        // testTemporaryManagedTableWithIgnoreExists and leave this path untested
+        innerTestManagedTableFromDescriptor(false, true);
+    }
+
+    /**
+     * Runs the create / create-again / lookup / drop / drop-again lifecycle of a managed table
+     * and checks that the managed table factory callbacks were applied.
+     *
+     * @param ignoreIfExists ignore flag of the create operation; it is also used as the ignore
+     *     flag of the drop operation. When {@code false}, the repeated create and the repeated
+     *     drop must throw.
+     * @param isTemporary whether the table is created and dropped as a temporary table
+     */
+    private void innerTestManagedTableFromDescriptor(boolean ignoreIfExists, boolean isTemporary) {
+        final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance();
+        final String catalog = tEnv.getCurrentCatalog();
+        final String database = tEnv.getCurrentDatabase();
+
+        final Schema schema = Schema.newBuilder().column("f0", DataTypes.INT()).build();
+        final String tableName = UUID.randomUUID().toString();
+        final ObjectIdentifier identifier = ObjectIdentifier.of(catalog, database, tableName);
+
+        // create table
+        MANAGED_TABLES.put(identifier, new AtomicReference<>());
+        try {
+            final CreateTableOperation createOperation =
+                    new CreateTableOperation(
+                            identifier,
+                            TableDescriptor.forManaged()
+                                    .schema(schema)
+                                    .option("a", "Test")
+                                    .build()
+                                    .toCatalogTable(),
+                            ignoreIfExists,
+                            isTemporary);
+
+            tEnv.executeInternal(createOperation);
+
+            // test ignore: create again
+            if (ignoreIfExists) {
+                tEnv.executeInternal(createOperation);
+            } else {
+                // assertThatThrownBy(callable, "text") only sets a description; assert on the
+                // stack trace so the expected text is verified even on a wrapped cause
+                assertThatThrownBy(() -> tEnv.executeInternal(createOperation))
+                        .hasStackTraceContaining(
+                                isTemporary ? "already exists" : "Could not execute CreateTable");
+            }
+
+            // lookup table: a temporary table must not be stored in the catalog itself
+            final boolean isInCatalog =
+                    tEnv.getCatalog(catalog)
+                            .orElseThrow(AssertionError::new)
+                            .tableExists(new ObjectPath(database, tableName));
+            assertThat(isInCatalog).isEqualTo(!isTemporary);
+
+            final Optional<CatalogManager.TableLookupResult> lookupResult =
+                    tEnv.getCatalogManager().getTable(identifier);
+            assertThat(lookupResult).isPresent();
+
+            final CatalogBaseTable catalogTable = lookupResult.get().getTable();
+            assertThat(catalogTable).isInstanceOf(CatalogTable.class);
+            assertThat(catalogTable.getUnresolvedSchema()).isEqualTo(schema);
+            assertThat(catalogTable.getOptions())
+                    .containsEntry("a", "Test")
+                    .containsEntry(ENRICHED_KEY, ENRICHED_VALUE);
+
+            // options recorded by the test factory during creation
+            final AtomicReference<Map<String, String>> reference = MANAGED_TABLES.get(identifier);
+            assertThat(reference.get()).isNotNull();
+            assertThat(reference.get())
+                    .containsEntry("a", "Test")
+                    .containsEntry(ENRICHED_KEY, ENRICHED_VALUE);
+
+            final DropTableOperation dropOperation =
+                    new DropTableOperation(identifier, ignoreIfExists, isTemporary);
+            tEnv.executeInternal(dropOperation);
+            assertThat(MANAGED_TABLES.get(identifier).get()).isNull();
+
+            // test ignore: drop again
+            if (ignoreIfExists) {
+                tEnv.executeInternal(dropOperation);
+            } else {
+                assertThatThrownBy(() -> tEnv.executeInternal(dropOperation))
+                        .hasStackTraceContaining("does not exist");
+            }
+        } finally {
+            // MANAGED_TABLES is static shared state; clean up even when an assertion fails
+            MANAGED_TABLES.remove(identifier);
+        }
+    }
}