You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2021/02/12 14:27:09 UTC

[flink] branch master updated: [FLINK-21295][table-api] Support 'useModules' and 'listFullModules' in TableEnvironment and ModuleManager

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 75327e0  [FLINK-21295][table-api] Support 'useModules' and 'listFullModules' in TableEnvironment and ModuleManager
75327e0 is described below

commit 75327e0a14c69c739665a34d8d3a7705b0c11d35
Author: Jane <55...@users.noreply.github.com>
AuthorDate: Fri Feb 12 22:26:43 2021 +0800

    [FLINK-21295][table-api] Support 'useModules' and 'listFullModules' in TableEnvironment and ModuleManager
    
    This closes #14895
---
 .../table/tests/test_environment_completeness.py   |   2 +
 .../apache/flink/table/api/TableEnvironment.java   |  21 +-
 .../table/api/internal/TableEnvironmentImpl.java   |  11 ++
 .../org/apache/flink/table/module/ModuleEntry.java |  69 +++++++
 .../apache/flink/table/module/ModuleManager.java   | 138 +++++++++----
 .../flink/table/module/ModuleManagerTest.java      | 217 +++++++++++++++++++++
 .../org/apache/flink/table/utils/ModuleMock.java   |  60 ++++++
 .../src/test/resources/log4j2-test.properties      |  28 +++
 .../flink/table/api/internal/TableEnvImpl.scala    |  12 +-
 .../flink/table/utils/MockTableEnvironment.scala   |   7 +-
 10 files changed, 518 insertions(+), 47 deletions(-)

diff --git a/flink-python/pyflink/table/tests/test_environment_completeness.py b/flink-python/pyflink/table/tests/test_environment_completeness.py
index 7cfd27e..a9fec44 100644
--- a/flink-python/pyflink/table/tests/test_environment_completeness.py
+++ b/flink-python/pyflink/table/tests/test_environment_completeness.py
@@ -43,6 +43,8 @@ class EnvironmentAPICompletenessTests(PythonAPICompletenessTestCase, PyFlinkTest
             'create',
             'loadModule',
             'unloadModule',
+            'useModules',
+            'listFullModules',
             'createTemporarySystemFunction',
             'dropTemporarySystemFunction',
             'createFunction',
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index c736e69..fd9c1fa 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.functions.UserDefinedFunction;
 import org.apache.flink.table.module.Module;
+import org.apache.flink.table.module.ModuleEntry;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.table.types.AbstractDataType;
@@ -384,6 +385,14 @@ public interface TableEnvironment {
     void loadModule(String moduleName, Module module);
 
     /**
+     * Enable the given modules in the declared order. Modules that have been loaded but do not
+     * appear in the names varargs will become unused.
+     *
+     * @param moduleNames module names to be used
+     */
+    void useModules(String... moduleNames);
+
+    /**
      * Unloads a {@link Module} with given name. ValidationException is thrown when there is no
      * module with the given name.
      *
@@ -718,13 +727,21 @@ public interface TableEnvironment {
     String[] listCatalogs();
 
     /**
-     * Gets an array of names of all modules in this environment in the loaded order.
+     * Gets an array of names of all used modules in this environment in resolution order.
      *
-     * @return A list of the names of all modules in the loaded order.
+     * @return A list of the names of used modules in resolution order.
      */
     String[] listModules();
 
     /**
+     * Gets an array of all loaded modules with use status in this environment. Used modules are
+     * kept in resolution order.
+     *
+     * @return A list of name and use status entries of all loaded modules.
+     */
+    ModuleEntry[] listFullModules();
+
+    /**
      * Gets the names of all databases registered in the current catalog.
      *
      * @return A list of the names of all registered databases in the current catalog.
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 806d271..c4d254a 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -79,6 +79,7 @@ import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.functions.UserDefinedFunction;
 import org.apache.flink.table.functions.UserDefinedFunctionHelper;
 import org.apache.flink.table.module.Module;
+import org.apache.flink.table.module.ModuleEntry;
 import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.operations.CatalogQueryOperation;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
@@ -371,6 +372,11 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
     }
 
     @Override
+    public void useModules(String... moduleNames) {
+        moduleManager.useModules(moduleNames);
+    }
+
+    @Override
     public void unloadModule(String moduleName) {
         moduleManager.unloadModule(moduleName);
     }
@@ -540,6 +546,11 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
     }
 
     @Override
+    public ModuleEntry[] listFullModules() {
+        return moduleManager.listFullModules().toArray(new ModuleEntry[0]);
+    }
+
+    @Override
     public String[] listDatabases() {
         return catalogManager
                 .getCatalog(catalogManager.getCurrentCatalog())
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/module/ModuleEntry.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/module/ModuleEntry.java
new file mode 100644
index 0000000..d68cb97
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/module/ModuleEntry.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.module;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+/** A POJO to represent a module's name and use status. */
+@PublicEvolving
+public class ModuleEntry {
+    private final String name;
+    private final boolean used;
+
+    public ModuleEntry(String name, boolean used) {
+        this.name = name;
+        this.used = used;
+    }
+
+    public String name() {
+        return name;
+    }
+
+    public boolean used() {
+        return used;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        ModuleEntry entry = (ModuleEntry) o;
+
+        return new EqualsBuilder().append(used, entry.used).append(name, entry.name).isEquals();
+    }
+
+    @Override
+    public int hashCode() {
+        return new HashCodeBuilder(17, 37).append(name).append(used).toHashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "ModuleEntry{" + "name='" + name + '\'' + ", used=" + used + '}';
+    }
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/module/ModuleManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/module/ModuleManager.java
index b4bdcd6..b0f5b5f 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/module/ModuleManager.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/module/ModuleManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.module;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.util.StringUtils;
@@ -26,6 +27,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -45,99 +49,153 @@ public class ModuleManager {
 
     private static final Logger LOG = LoggerFactory.getLogger(ModuleManager.class);
 
-    private LinkedHashMap<String, Module> modules;
+    /** To keep {@link #listFullModules()} deterministic. */
+    private LinkedHashMap<String, Module> loadedModules;
 
-    public ModuleManager() {
-        this.modules = new LinkedHashMap<>();
+    /** Keeps track of the used modules in resolution order. */
+    private List<String> usedModules;
 
-        modules.put(MODULE_TYPE_CORE, CoreModule.INSTANCE);
+    public ModuleManager() {
+        this.loadedModules = new LinkedHashMap<>();
+        this.usedModules = new ArrayList<>();
+        loadedModules.put(MODULE_TYPE_CORE, CoreModule.INSTANCE);
+        usedModules.add(MODULE_TYPE_CORE);
     }
 
     /**
      * Load a module under a unique name. Modules will be kept in the loaded order, and new module
-     * will be added to the end. ValidationException is thrown when there is already a module with
-     * the same name.
+     * will be added after the used modules and before the unused ones, and is used by default.
      *
      * @param name name of the module
      * @param module the module instance
+     * @throws ValidationException when there already exists a module with the same name
      */
     public void loadModule(String name, Module module) {
         checkArgument(
                 !StringUtils.isNullOrWhitespaceOnly(name), "name cannot be null or empty string");
         checkNotNull(module, "module cannot be null");
 
-        if (!modules.containsKey(name)) {
-            modules.put(name, module);
-
-            LOG.info("Loaded module {} from class {}", name, module.getClass().getName());
-        } else {
+        if (loadedModules.containsKey(name)) {
             throw new ValidationException(
-                    String.format("A module with name %s already exists", name));
+                    String.format("A module with name '%s' already exists", name));
+        } else {
+            usedModules.add(name);
+            loadedModules.put(name, module);
+            LOG.info("Loaded module '{}' from class {}", name, module.getClass().getName());
         }
     }
 
     /**
-     * Unload a module with given name. ValidationException is thrown when there is no module with
-     * the given name.
+     * Unload a module with given name.
      *
      * @param name name of the module
+     * @throws ValidationException when there is no module with the given name
      */
     public void unloadModule(String name) {
-        if (modules.containsKey(name)) {
-            modules.remove(name);
-
-            LOG.info("Unloaded module {}", name);
+        if (loadedModules.containsKey(name)) {
+            loadedModules.remove(name);
+            boolean used = usedModules.remove(name);
+            LOG.info("Unloaded an {} module '{}'", used ? "used" : "unused", name);
         } else {
-            throw new ValidationException(String.format("No module with name %s exists", name));
+            throw new ValidationException(String.format("No module with name '%s' exists", name));
+        }
+    }
+
+    /**
+     * Enable the given modules in the declared order. Modules that have been loaded but do not
+     * appear in the names varargs will become unused.
+     *
+     * @param names module names to be used
+     * @throws ValidationException when module names contain an unloaded name
+     */
+    public void useModules(String... names) {
+        checkNotNull(names, "names cannot be null");
+        Set<String> deduplicateNames = new HashSet<>();
+        for (String name : names) {
+            if (!loadedModules.containsKey(name)) {
+                throw new ValidationException(
+                        String.format("No module with name '%s' exists", name));
+            }
+            if (!deduplicateNames.add(name)) {
+                throw new ValidationException(
+                        String.format("Module '%s' appears more than once", name));
+            }
         }
+        usedModules.clear();
+        usedModules.addAll(Arrays.asList(names));
     }
 
     /**
-     * Get names of all modules loaded.
+     * Get names of all used modules in resolution order.
      *
-     * @return a list of names of modules loaded
+     * @return a list of names of used modules
      */
     public List<String> listModules() {
-        return new ArrayList<>(modules.keySet());
+        return new ArrayList<>(usedModules);
+    }
+
+    /**
+     * Get all loaded modules with use status. Used modules are returned in resolution order,
+     * followed by unused modules in loaded order.
+     *
+     * @return a list of module entries with module name and use status
+     */
+    public List<ModuleEntry> listFullModules() {
+        // keep the order for used modules
+        List<ModuleEntry> moduleEntries =
+                usedModules.stream()
+                        .map(name -> new ModuleEntry(name, true))
+                        .collect(Collectors.toList());
+        loadedModules.keySet().stream()
+                .filter(name -> !usedModules.contains(name))
+                .forEach(name -> moduleEntries.add(new ModuleEntry(name, false)));
+        return moduleEntries;
     }
 
     /**
-     * Get names of all functions from all modules.
+     * Get names of all functions from used modules.
      *
-     * @return a set of names of registered modules.
+     * @return a set of function names of used modules
      */
     public Set<String> listFunctions() {
-        return modules.values().stream()
-                .map(m -> m.listFunctions())
-                .flatMap(n -> n.stream())
+        return usedModules.stream()
+                .map(name -> loadedModules.get(name).listFunctions())
+                .flatMap(Collection::stream)
                 .collect(Collectors.toSet());
     }
 
     /**
      * Get an optional of {@link FunctionDefinition} by a given name. Function will be resolved to
-     * modules in the loaded order, and the first match will be returned. If no match is found in
-     * all modules, return an optional.
+     * modules in the used order, and the first match will be returned. If no match is found in any
+     * used module, return an empty optional.
      *
      * @param name name of the function
      * @return an optional of {@link FunctionDefinition}
      */
     public Optional<FunctionDefinition> getFunctionDefinition(String name) {
-        Optional<Map.Entry<String, Module>> result =
-                modules.entrySet().stream()
+        Optional<String> module =
+                usedModules.stream()
                         .filter(
-                                p ->
-                                        p.getValue().listFunctions().stream()
+                                n ->
+                                        loadedModules.get(n).listFunctions().stream()
                                                 .anyMatch(e -> e.equalsIgnoreCase(name)))
                         .findFirst();
+        if (module.isPresent()) {
+            LOG.debug("Got FunctionDefinition '{}' from '{}' module.", name, module.get());
+            return loadedModules.get(module.get()).getFunctionDefinition(name);
+        }
+        LOG.debug("Cannot find FunctionDefinition '{}' from any loaded modules.", name);
 
-        if (result.isPresent()) {
-            LOG.debug("Got FunctionDefinition '{}' from '{}' module.", name, result.get().getKey());
+        return Optional.empty();
+    }
 
-            return result.get().getValue().getFunctionDefinition(name);
-        } else {
-            LOG.debug("Cannot find FunctionDefinition '{}' from any loaded modules.", name);
+    @VisibleForTesting
+    List<String> getUsedModules() {
+        return usedModules;
+    }
 
-            return Optional.empty();
-        }
+    @VisibleForTesting
+    Map<String, Module> getLoadedModules() {
+        return loadedModules;
     }
 }
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/module/ModuleManagerTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/module/ModuleManagerTest.java
new file mode 100644
index 0000000..189dc03
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/module/ModuleManagerTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.module;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.utils.ModuleMock;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.descriptors.CoreModuleDescriptorValidator.MODULE_TYPE_CORE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link ModuleManager}. */
+public class ModuleManagerTest extends TestLogger {
+    private ModuleManager manager;
+    @Rule public ExpectedException thrown = ExpectedException.none();
+
+    @Before
+    public void before() {
+        manager = new ModuleManager();
+    }
+
+    @Test
+    public void testLoadModuleTwice() {
+        // CoreModule is loaded by default
+        assertEquals(Collections.singletonList(MODULE_TYPE_CORE), manager.getUsedModules());
+        assertEquals(CoreModule.INSTANCE, manager.getLoadedModules().get(MODULE_TYPE_CORE));
+
+        thrown.expect(ValidationException.class);
+        thrown.expectMessage("A module with name 'core' already exists");
+        manager.loadModule(MODULE_TYPE_CORE, CoreModule.INSTANCE);
+    }
+
+    @Test
+    public void testLoadModuleWithoutUnusedModulesExist() {
+        ModuleMock x = new ModuleMock("x");
+        ModuleMock y = new ModuleMock("y");
+        ModuleMock z = new ModuleMock("z");
+        manager.loadModule(x.getType(), x);
+        manager.loadModule(y.getType(), y);
+        manager.loadModule(z.getType(), z);
+
+        Map<String, Module> expectedLoadedModules = new HashMap<>();
+        expectedLoadedModules.put(MODULE_TYPE_CORE, CoreModule.INSTANCE);
+        expectedLoadedModules.put("x", x);
+        expectedLoadedModules.put("y", y);
+        expectedLoadedModules.put("z", z);
+
+        assertEquals(Arrays.asList(MODULE_TYPE_CORE, "x", "y", "z"), manager.getUsedModules());
+        assertEquals(expectedLoadedModules, manager.getLoadedModules());
+    }
+
+    @Test
+    public void testLoadModuleWithUnusedModulesExist() {
+        ModuleMock y = new ModuleMock("y");
+        ModuleMock z = new ModuleMock("z");
+        manager.loadModule(y.getType(), y);
+        manager.loadModule(z.getType(), z);
+
+        Map<String, Module> expectedLoadedModules = new HashMap<>();
+        expectedLoadedModules.put(MODULE_TYPE_CORE, CoreModule.INSTANCE);
+        expectedLoadedModules.put("y", y);
+        expectedLoadedModules.put("z", z);
+
+        assertEquals(Arrays.asList(MODULE_TYPE_CORE, "y", "z"), manager.getUsedModules());
+        assertEquals(expectedLoadedModules, manager.getLoadedModules());
+
+        // disable module y and z
+        manager.useModules(MODULE_TYPE_CORE);
+
+        // load module x to test the order
+        ModuleMock x = new ModuleMock("x");
+        manager.loadModule(x.getType(), x);
+        expectedLoadedModules.put("x", x);
+
+        assertEquals(Arrays.asList(MODULE_TYPE_CORE, "x"), manager.getUsedModules());
+        assertEquals(expectedLoadedModules, manager.getLoadedModules());
+    }
+
+    @Test
+    public void testUnloadModuleTwice() {
+        assertEquals(Collections.singletonList(MODULE_TYPE_CORE), manager.getUsedModules());
+
+        manager.unloadModule(MODULE_TYPE_CORE);
+        assertEquals(Collections.emptyList(), manager.getUsedModules());
+        assertEquals(Collections.emptyMap(), manager.getLoadedModules());
+
+        thrown.expect(ValidationException.class);
+        thrown.expectMessage("No module with name 'core' exists");
+        manager.unloadModule(MODULE_TYPE_CORE);
+    }
+
+    @Test
+    public void testUseUnloadedModules() {
+        thrown.expect(ValidationException.class);
+        thrown.expectMessage("No module with name 'x' exists");
+        manager.useModules(MODULE_TYPE_CORE, "x");
+    }
+
+    @Test
+    public void testUseModulesWithDuplicateModuleName() {
+        thrown.expect(ValidationException.class);
+        thrown.expectMessage("Module 'core' appears more than once");
+        manager.useModules(MODULE_TYPE_CORE, MODULE_TYPE_CORE);
+    }
+
+    @Test
+    public void testUseModules() {
+        ModuleMock x = new ModuleMock("x");
+        ModuleMock y = new ModuleMock("y");
+        ModuleMock z = new ModuleMock("z");
+        manager.loadModule(x.getType(), x);
+        manager.loadModule(y.getType(), y);
+        manager.loadModule(z.getType(), z);
+
+        assertEquals(Arrays.asList(MODULE_TYPE_CORE, "x", "y", "z"), manager.getUsedModules());
+
+        // test order for used modules
+        manager.useModules("z", MODULE_TYPE_CORE);
+        assertEquals(Arrays.asList("z", MODULE_TYPE_CORE), manager.getUsedModules());
+
+        // test unmentioned modules are still loaded
+        Map<String, Module> expectedLoadedModules = new HashMap<>();
+        expectedLoadedModules.put(MODULE_TYPE_CORE, CoreModule.INSTANCE);
+        expectedLoadedModules.put("x", x);
+        expectedLoadedModules.put("y", y);
+        expectedLoadedModules.put("z", z);
+        assertEquals(expectedLoadedModules, manager.getLoadedModules());
+    }
+
+    @Test
+    public void testListModules() {
+        ModuleMock y = new ModuleMock("y");
+        ModuleMock z = new ModuleMock("z");
+        manager.loadModule("y", y);
+        manager.loadModule("z", z);
+        manager.useModules("z", "y");
+
+        assertEquals(Arrays.asList("z", "y"), manager.listModules());
+    }
+
+    @Test
+    public void testListFullModules() {
+        ModuleMock x = new ModuleMock("x");
+        ModuleMock y = new ModuleMock("y");
+        ModuleMock z = new ModuleMock("z");
+
+        manager.loadModule("y", y);
+        manager.loadModule("x", x);
+        manager.loadModule("z", z);
+        manager.useModules("z", "y");
+
+        assertEquals(
+                getExpectedModuleEntries(2, "z", "y", MODULE_TYPE_CORE, "x"),
+                manager.listFullModules());
+    }
+
+    @Test
+    public void testListFunctions() {
+        ModuleMock x = new ModuleMock("x");
+        manager.loadModule(x.getType(), x);
+
+        assertTrue(manager.listFunctions().contains("dummy"));
+
+        // should not return function name of an unused module
+        manager.useModules(MODULE_TYPE_CORE);
+        assertFalse(manager.listFunctions().contains("dummy"));
+    }
+
+    @Test
+    public void testGetFunctionDefinition() {
+        ModuleMock x = new ModuleMock("x");
+        manager.loadModule(x.getType(), x);
+
+        assertTrue(manager.getFunctionDefinition("dummy").isPresent());
+
+        // should not return function definition of an unused module
+        manager.useModules(MODULE_TYPE_CORE);
+        assertFalse(manager.getFunctionDefinition("dummy").isPresent());
+    }
+
+    private static List<ModuleEntry> getExpectedModuleEntries(int index, String... names) {
+        return IntStream.range(0, names.length)
+                .mapToObj(i -> new ModuleEntry(names[i], i < index))
+                .collect(Collectors.toList());
+    }
+}
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ModuleMock.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ModuleMock.java
new file mode 100644
index 0000000..86f1626
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ModuleMock.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils;
+
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionKind;
+import org.apache.flink.table.module.Module;
+import org.apache.flink.table.types.inference.utils.FunctionDefinitionMock;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+
+/** Mocking {@link Module} for tests. */
+public class ModuleMock implements Module {
+    private static final Set<String> BUILT_IN_FUNCTIONS =
+            Collections.unmodifiableSet(new HashSet<>(Collections.singletonList("dummy")));
+    private final String type;
+    private final FunctionDefinitionMock functionDef;
+
+    public ModuleMock(String type) {
+        this.type = type;
+        functionDef = new FunctionDefinitionMock();
+        functionDef.functionKind = FunctionKind.OTHER;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    @Override
+    public Set<String> listFunctions() {
+        return BUILT_IN_FUNCTIONS;
+    }
+
+    @Override
+    public Optional<FunctionDefinition> getFunctionDefinition(String name) {
+        if (BUILT_IN_FUNCTIONS.contains(name)) {
+            return Optional.of(functionDef);
+        }
+        return Optional.empty();
+    }
+}
diff --git a/flink-table/flink-table-api-java/src/test/resources/log4j2-test.properties b/flink-table/flink-table-api-java/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..8bb9fe6
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level=OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index e304e3c..1fa57d2 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -34,7 +34,7 @@ import org.apache.flink.table.expressions.resolver.SqlExpressionResolver
 import org.apache.flink.table.expressions.resolver.lookups.TableReferenceLookup
 import org.apache.flink.table.factories.{TableFactoryUtil, TableSinkFactoryContextImpl}
 import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction, _}
-import org.apache.flink.table.module.{Module, ModuleManager}
+import org.apache.flink.table.module.{Module, ModuleEntry, ModuleManager}
 import org.apache.flink.table.operations.ddl._
 import org.apache.flink.table.operations.utils.OperationTreeBuilder
 import org.apache.flink.table.operations.{CatalogQueryOperation, TableSourceQueryOperation, _}
@@ -45,7 +45,6 @@ import org.apache.flink.table.types.{AbstractDataType, DataType}
 import org.apache.flink.table.util.JavaScalaConversionUtil
 import org.apache.flink.table.utils.PrintUtils
 import org.apache.flink.types.Row
-
 import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
 import org.apache.calcite.sql.parser.SqlParser
 import org.apache.calcite.tools.FrameworkConfig
@@ -53,7 +52,6 @@ import org.apache.calcite.tools.FrameworkConfig
 import _root_.java.lang.{Iterable => JIterable, Long => JLong}
 import _root_.java.util.function.{Function => JFunction, Supplier => JSupplier}
 import _root_.java.util.{Optional, Collections => JCollections, HashMap => JHashMap, List => JList, Map => JMap}
-
 import _root_.scala.collection.JavaConversions._
 import _root_.scala.collection.JavaConverters._
 import _root_.scala.util.Try
@@ -283,6 +281,10 @@ abstract class TableEnvImpl(
     moduleManager.loadModule(moduleName, module)
   }
 
+  override def useModules(moduleNames: String*): Unit = {
+    moduleManager.useModules(moduleNames: _*)
+  }
+
   override def unloadModule(moduleName: String): Unit = {
     moduleManager.unloadModule(moduleName)
   }
@@ -461,6 +463,10 @@ abstract class TableEnvImpl(
     moduleManager.listModules().asScala.toArray
   }
 
+  override def listFullModules(): Array[ModuleEntry] = {
+    moduleManager.listFullModules().asScala.toArray
+  }
+
   override def listCatalogs(): Array[String] = {
     catalogManager.listCatalogs
       .asScala
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index 0f69ff3..7d6bb4c 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -20,14 +20,13 @@ package org.apache.flink.table.utils
 
 import java.lang.{Iterable => JIterable}
 import java.util.Optional
-
 import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.table.api.{ExplainDetail, StatementSet, Table, TableConfig, TableEnvironment, TableResult}
 import org.apache.flink.table.catalog.Catalog
 import org.apache.flink.table.descriptors.{ConnectTableDescriptor, ConnectorDescriptor}
 import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.functions.{ScalarFunction, UserDefinedFunction}
-import org.apache.flink.table.module.Module
+import org.apache.flink.table.module.{Module, ModuleEntry}
 import org.apache.flink.table.sources.TableSource
 import org.apache.flink.table.types.AbstractDataType
 
@@ -47,6 +46,8 @@ class MockTableEnvironment extends TableEnvironment {
 
   override def listModules(): Array[String] = ???
 
+  override def listFullModules(): Array[ModuleEntry] = ???
+
   override def listDatabases(): Array[String] = ???
 
   override def listTables(): Array[String] = ???
@@ -102,6 +103,8 @@ class MockTableEnvironment extends TableEnvironment {
 
   override def loadModule(moduleName: String, module: Module): Unit = ???
 
+  override def useModules(moduleNames: String*): Unit = ???
+
   override def unloadModule(moduleName: String): Unit = ???
 
   override def createTemporaryView(