You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2021/12/17 08:03:06 UTC

[flink] 04/04: [FLINK-25174][table] Implement callback for managed table

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f01e911a0d56ecd2e40aa81d1808067515c9610c
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Fri Dec 17 15:56:01 2021 +0800

    [FLINK-25174][table] Implement callback for managed table
    
    This closes #18088
---
 .../apache/flink/table/catalog/CatalogManager.java |  54 ++++++--
 .../flink/table/catalog/ManagedTableListener.java  | 137 ++++++++++++++++++
 .../flink/table/api/TableEnvironmentTest.java      | 153 +++++++++++++++++----
 3 files changed, 312 insertions(+), 32 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index f0648b8..496c936 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -84,8 +84,13 @@ public final class CatalogManager {
 
     private final DataTypeFactory typeFactory;
 
+    private final ManagedTableListener managedTableListener;
+
     private CatalogManager(
-            String defaultCatalogName, Catalog defaultCatalog, DataTypeFactory typeFactory) {
+            String defaultCatalogName,
+            Catalog defaultCatalog,
+            DataTypeFactory typeFactory,
+            ManagedTableListener managedTableListener) {
         checkArgument(
                 !StringUtils.isNullOrWhitespaceOnly(defaultCatalogName),
                 "Default catalog name cannot be null or empty");
@@ -101,6 +106,7 @@ public final class CatalogManager {
         builtInCatalogName = defaultCatalogName;
 
         this.typeFactory = typeFactory;
+        this.managedTableListener = managedTableListener;
     }
 
     public static Builder newBuilder() {
@@ -147,7 +153,8 @@ public final class CatalogManager {
             return new CatalogManager(
                     defaultCatalogName,
                     defaultCatalog,
-                    new DataTypeFactoryImpl(classLoader, config, executionConfig));
+                    new DataTypeFactoryImpl(classLoader, config, executionConfig),
+                    new ManagedTableListener(classLoader, config));
         }
     }
 
@@ -656,8 +663,18 @@ public final class CatalogManager {
     public void createTable(
             CatalogBaseTable table, ObjectIdentifier objectIdentifier, boolean ignoreIfExists) {
         execute(
-                (catalog, path) ->
-                        catalog.createTable(path, resolveCatalogBaseTable(table), ignoreIfExists),
+                (catalog, path) -> {
+                    ResolvedCatalogBaseTable<?> resolvedTable = resolveCatalogBaseTable(table);
+                    ResolvedCatalogBaseTable<?> resolvedListenedTable =
+                            managedTableListener.notifyTableCreation(
+                                    catalog,
+                                    objectIdentifier,
+                                    resolvedTable,
+                                    false,
+                                    ignoreIfExists);
+
+                    catalog.createTable(path, resolvedListenedTable, ignoreIfExists);
+                },
                 objectIdentifier,
                 false,
                 "CreateTable");
@@ -686,13 +703,21 @@ public final class CatalogManager {
                         }
                         return v;
                     } else {
-                        final CatalogBaseTable resolvedTable = resolveCatalogBaseTable(table);
+                        ResolvedCatalogBaseTable<?> resolvedTable = resolveCatalogBaseTable(table);
+                        ResolvedCatalogBaseTable<?> resolvedListenedTable =
+                                managedTableListener.notifyTableCreation(
+                                        getCatalog(objectIdentifier.getCatalogName()).orElse(null),
+                                        objectIdentifier,
+                                        resolvedTable,
+                                        true,
+                                        ignoreIfExists);
+
                         if (listener.isPresent()) {
                             return listener.get()
                                     .onCreateTemporaryTable(
-                                            objectIdentifier.toObjectPath(), resolvedTable);
+                                            objectIdentifier.toObjectPath(), resolvedListenedTable);
                         }
-                        return resolvedTable;
+                        return resolvedListenedTable;
                     }
                 });
     }
@@ -729,6 +754,12 @@ public final class CatalogManager {
         if (filter.test(catalogBaseTable)) {
             getTemporaryOperationListener(objectIdentifier)
                     .ifPresent(l -> l.onDropTemporaryTable(objectIdentifier.toObjectPath()));
+
+            Catalog catalog = catalogs.get(objectIdentifier.getCatalogName());
+            ResolvedCatalogBaseTable<?> resolvedTable = resolveCatalogBaseTable(catalogBaseTable);
+            managedTableListener.notifyTableDrop(
+                    catalog, objectIdentifier, resolvedTable, true, ignoreIfNotExists);
+
             temporaryTables.remove(objectIdentifier);
         } else if (!ignoreIfNotExists) {
             throw new ValidationException(
@@ -808,7 +839,14 @@ public final class CatalogManager {
         final Optional<CatalogBaseTable> resultOpt = getUnresolvedTable(objectIdentifier);
         if (resultOpt.isPresent() && filter.test(resultOpt.get())) {
             execute(
-                    (catalog, path) -> catalog.dropTable(path, ignoreIfNotExists),
+                    (catalog, path) -> {
+                        ResolvedCatalogBaseTable<?> resolvedTable =
+                                resolveCatalogBaseTable(resultOpt.get());
+                        managedTableListener.notifyTableDrop(
+                                catalog, objectIdentifier, resolvedTable, false, ignoreIfNotExists);
+
+                        catalog.dropTable(path, ignoreIfNotExists);
+                    },
                     objectIdentifier,
                     ignoreIfNotExists,
                     "DropTable");
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ManagedTableListener.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ManagedTableListener.java
new file mode 100644
index 0000000..211dce7
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ManagedTableListener.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.util.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+import static org.apache.flink.table.factories.ManagedTableFactory.discoverManagedTableFactory;
+
+/** Forwards create/drop events of managed tables to the discovered managed table factory. */
+@Internal
+public class ManagedTableListener {
+
+    private final ClassLoader classLoader;
+
+    private final ReadableConfig config;
+
+    public ManagedTableListener(ClassLoader classLoader, ReadableConfig config) {
+        this.classLoader = classLoader;
+        this.config = config;
+    }
+
+    /** For managed tables, enriches options and fires onCreateTable; other tables pass through. */
+    public ResolvedCatalogBaseTable<?> notifyTableCreation(
+            @Nullable Catalog catalog,
+            ObjectIdentifier identifier,
+            ResolvedCatalogBaseTable<?> table,
+            boolean isTemporary,
+            boolean ignoreIfExists) {
+        if (isManagedTable(catalog, table)) {
+            ResolvedCatalogTable managedTable = enrichOptions(identifier, table, isTemporary);
+            discoverManagedTableFactory(classLoader)
+                    .onCreateTable(
+                            createTableFactoryContext(identifier, managedTable, isTemporary),
+                            ignoreIfExists);
+            return managedTable;
+        }
+        return table;
+    }
+
+    /** For managed tables, fires onDropTable on the factory; other tables are ignored. */
+    public void notifyTableDrop(
+            @Nullable Catalog catalog,
+            ObjectIdentifier identifier,
+            ResolvedCatalogBaseTable<?> table,
+            boolean isTemporary,
+            boolean ignoreIfNotExists) {
+        if (isManagedTable(catalog, table)) {
+            discoverManagedTableFactory(classLoader)
+                    .onDropTable(
+                            createTableFactoryContext(
+                                    identifier, (ResolvedCatalogTable) table, isTemporary),
+                            ignoreIfNotExists);
+        }
+    }
+
+    private boolean isManagedTable(@Nullable Catalog catalog, ResolvedCatalogBaseTable<?> table) {
+        if (catalog == null || !catalog.supportsManagedTable()) {
+            // no catalog, or the catalog does not support managed tables
+            return false;
+        }
+
+        if (table.getTableKind() == CatalogBaseTable.TableKind.VIEW) {
+            // a view is never a managed table
+            return false;
+        }
+
+        Map<String, String> options;
+        try {
+            options = table.getOptions();
+        } catch (TableException ignore) {
+            // tables that cannot expose options (e.g. InlineCatalogTable) are not managed
+            return false;
+        }
+
+        if (!StringUtils.isNullOrWhitespaceOnly(
+                options.get(ConnectorDescriptorValidator.CONNECTOR_TYPE))) {
+            // a table declaring a legacy connector type is not managed
+            return false;
+        }
+
+        if (!StringUtils.isNullOrWhitespaceOnly(options.get(FactoryUtil.CONNECTOR.key()))) {
+            // a table declaring a connector is not managed
+            return false;
+        }
+
+        // finally, a table whose origin is a ConnectorCatalogTable is not managed either
+        return !(table.getOrigin() instanceof ConnectorCatalogTable);
+    }
+
+    /** Returns a copy of the table carrying the options enriched by the managed table factory. */
+    private ResolvedCatalogTable enrichOptions(
+            ObjectIdentifier identifier, ResolvedCatalogBaseTable<?> table, boolean isTemporary) {
+        if (!(table instanceof ResolvedCatalogTable)) {
+            throw new UnsupportedOperationException(
+                    "Managed table only supports catalog table, unsupported table type: "
+                            + table.getClass());
+        }
+        ResolvedCatalogTable resolvedTable = (ResolvedCatalogTable) table;
+        Map<String, String> newOptions =
+                discoverManagedTableFactory(classLoader)
+                        .enrichOptions(
+                                createTableFactoryContext(identifier, resolvedTable, isTemporary));
+        return resolvedTable.copy(newOptions);
+    }
+
+    private DynamicTableFactory.Context createTableFactoryContext(
+            ObjectIdentifier identifier, ResolvedCatalogTable table, boolean isTemporary) {
+        return new FactoryUtil.DefaultDynamicTableContext(
+                identifier, table, config, classLoader, isTemporary);
+    }
+}
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java
index 18542bc..c4c12bb 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java
@@ -24,15 +24,22 @@ import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.operations.CatalogQueryOperation;
+import org.apache.flink.table.operations.ddl.CreateTableOperation;
+import org.apache.flink.table.operations.ddl.DropTableOperation;
 import org.apache.flink.table.utils.TableEnvironmentMock;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
+import java.util.Map;
 import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.apache.flink.table.factories.TestManagedTableFactory.ENRICHED_KEY;
+import static org.apache.flink.table.factories.TestManagedTableFactory.ENRICHED_VALUE;
+import static org.apache.flink.table.factories.TestManagedTableFactory.MANAGED_TABLES;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link TableEnvironment}. */
 public class TableEnvironmentTest {
@@ -48,20 +55,21 @@ public class TableEnvironmentTest {
                 "T",
                 TableDescriptor.forConnector("fake").schema(schema).option("a", "Test").build());
 
-        assertFalse(
-                tEnv.getCatalog(catalog)
-                        .orElseThrow(AssertionError::new)
-                        .tableExists(new ObjectPath(database, "T")));
+        assertThat(
+                        tEnv.getCatalog(catalog)
+                                .orElseThrow(AssertionError::new)
+                                .tableExists(new ObjectPath(database, "T")))
+                .isFalse();
 
         final Optional<CatalogManager.TableLookupResult> lookupResult =
                 tEnv.getCatalogManager().getTable(ObjectIdentifier.of(catalog, database, "T"));
-        assertTrue(lookupResult.isPresent());
+        assertThat(lookupResult.isPresent()).isTrue();
 
         final CatalogBaseTable catalogTable = lookupResult.get().getTable();
-        assertTrue(catalogTable instanceof CatalogTable);
-        assertEquals(schema, catalogTable.getUnresolvedSchema());
-        assertEquals("fake", catalogTable.getOptions().get("connector"));
-        assertEquals("Test", catalogTable.getOptions().get("a"));
+        assertThat(catalogTable instanceof CatalogTable).isTrue();
+        assertThat(catalogTable.getUnresolvedSchema()).isEqualTo(schema);
+        assertThat(catalogTable.getOptions().get("connector")).isEqualTo("fake");
+        assertThat(catalogTable.getOptions().get("a")).isEqualTo("Test");
     }
 
     @Test
@@ -76,15 +84,18 @@ public class TableEnvironmentTest {
                 TableDescriptor.forConnector("fake").schema(schema).option("a", "Test").build());
 
         final ObjectPath objectPath = new ObjectPath(database, "T");
-        assertTrue(
-                tEnv.getCatalog(catalog).orElseThrow(AssertionError::new).tableExists(objectPath));
+        assertThat(
+                        tEnv.getCatalog(catalog)
+                                .orElseThrow(AssertionError::new)
+                                .tableExists(objectPath))
+                .isTrue();
 
         final CatalogBaseTable catalogTable =
                 tEnv.getCatalog(catalog).orElseThrow(AssertionError::new).getTable(objectPath);
-        assertTrue(catalogTable instanceof CatalogTable);
-        assertEquals(schema, catalogTable.getUnresolvedSchema());
-        assertEquals("fake", catalogTable.getOptions().get("connector"));
-        assertEquals("Test", catalogTable.getOptions().get("a"));
+        assertThat(catalogTable instanceof CatalogTable).isTrue();
+        assertThat(catalogTable.getUnresolvedSchema()).isEqualTo(schema);
+        assertThat(catalogTable.getOptions().get("connector")).isEqualTo("fake");
+        assertThat(catalogTable.getOptions().get("a")).isEqualTo("Test");
     }
 
     @Test
@@ -97,17 +108,111 @@ public class TableEnvironmentTest {
 
         final Table table = tEnv.from(descriptor);
 
-        assertEquals(
-                schema, Schema.newBuilder().fromResolvedSchema(table.getResolvedSchema()).build());
+        assertThat(Schema.newBuilder().fromResolvedSchema(table.getResolvedSchema()).build())
+                .isEqualTo(schema);
 
-        assertTrue(table.getQueryOperation() instanceof CatalogQueryOperation);
+        assertThat(table.getQueryOperation() instanceof CatalogQueryOperation).isTrue();
         final ObjectIdentifier tableIdentifier =
                 ((CatalogQueryOperation) table.getQueryOperation()).getTableIdentifier();
 
         final Optional<CatalogManager.TableLookupResult> lookupResult =
                 tEnv.getCatalogManager().getTable(tableIdentifier);
-        assertTrue(lookupResult.isPresent());
+        assertThat(lookupResult.isPresent()).isTrue();
+
+        assertThat(lookupResult.get().getTable().getOptions().get("connector")).isEqualTo("fake");
+    }
+
+    @Test
+    public void testManagedTable() {
+        innerTestManagedTableFromDescriptor(false, false); // permanent table, ignoreIfExists=false
+    }
 
-        assertEquals("fake", lookupResult.get().getTable().getOptions().get("connector"));
+    @Test
+    public void testManagedTableWithIgnoreExists() {
+        innerTestManagedTableFromDescriptor(true, false); // permanent table, ignoreIfExists=true
+    }
+
+    @Test
+    public void testTemporaryManagedTableWithIgnoreExists() {
+        innerTestManagedTableFromDescriptor(true, true); // temporary table, ignoreIfExists=true
+    }
+
+    @Test
+    public void testTemporaryManagedTable() {
+        innerTestManagedTableFromDescriptor(false, true); // temporary table, ignoreIfExists=false
+    }
+
+    private void innerTestManagedTableFromDescriptor(boolean ignoreIfExists, boolean isTemporary) {
+        final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance();
+        final String catalog = tEnv.getCurrentCatalog();
+        final String database = tEnv.getCurrentDatabase();
+
+        final Schema schema = Schema.newBuilder().column("f0", DataTypes.INT()).build();
+        final String tableName = UUID.randomUUID().toString();
+        ObjectIdentifier identifier = ObjectIdentifier.of(catalog, database, tableName);
+
+        // pre-register a holder (presumably filled by the test factory), then create the table
+        MANAGED_TABLES.put(identifier, new AtomicReference<>());
+        CreateTableOperation createOperation =
+                new CreateTableOperation(
+                        identifier,
+                        TableDescriptor.forManaged()
+                                .schema(schema)
+                                .option("a", "Test")
+                                .build()
+                                .toCatalogTable(),
+                        ignoreIfExists,
+                        isTemporary);
+
+        tEnv.executeInternal(createOperation);
+
+        // create again (NOTE(review): the String below is a description, not a message check)
+        if (ignoreIfExists) {
+            tEnv.executeInternal(createOperation);
+        } else {
+            assertThatThrownBy(
+                    () -> tEnv.executeInternal(createOperation),
+                    isTemporary ? "already exists" : "Could not execute CreateTable");
+        }
+
+        // a temporary table must not reach the catalog; a permanent one must
+
+        boolean isInCatalog =
+                tEnv.getCatalog(catalog)
+                        .orElseThrow(AssertionError::new)
+                        .tableExists(new ObjectPath(database, tableName));
+        if (isTemporary) {
+            assertThat(isInCatalog).isFalse();
+        } else {
+            assertThat(isInCatalog).isTrue();
+        }
+
+        final Optional<CatalogManager.TableLookupResult> lookupResult =
+                tEnv.getCatalogManager().getTable(identifier);
+        assertThat(lookupResult.isPresent()).isTrue();
+
+        final CatalogBaseTable catalogTable = lookupResult.get().getTable();
+        assertThat(catalogTable instanceof CatalogTable).isTrue();
+        assertThat(catalogTable.getUnresolvedSchema()).isEqualTo(schema);
+        assertThat(catalogTable.getOptions().get("a")).isEqualTo("Test");
+        assertThat(catalogTable.getOptions().get(ENRICHED_KEY)).isEqualTo(ENRICHED_VALUE);
+
+        AtomicReference<Map<String, String>> reference = MANAGED_TABLES.get(identifier);
+        assertThat(reference.get()).isNotNull();
+        assertThat(reference.get().get("a")).isEqualTo("Test");
+        assertThat(reference.get().get(ENRICHED_KEY)).isEqualTo(ENRICHED_VALUE);
+
+        DropTableOperation dropOperation =
+                new DropTableOperation(identifier, ignoreIfExists, isTemporary);
+        tEnv.executeInternal(dropOperation);
+        assertThat(MANAGED_TABLES.get(identifier).get()).isNull();
+
+        // drop again: must succeed when the ignore flag is set, otherwise must throw
+        if (ignoreIfExists) {
+            tEnv.executeInternal(dropOperation);
+        } else {
+            assertThatThrownBy(() -> tEnv.executeInternal(dropOperation), "does not exist");
+        }
+        MANAGED_TABLES.remove(identifier);
     }
 }