You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2021/02/12 14:27:09 UTC
[flink] branch master updated: [FLINK-21295][table-api] Support
'useModules' and 'listFullModules' in TableEnvironment and ModuleManager
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 75327e0 [FLINK-21295][table-api] Support 'useModules' and 'listFullModules' in TableEnvironment and ModuleManager
75327e0 is described below
commit 75327e0a14c69c739665a34d8d3a7705b0c11d35
Author: Jane <55...@users.noreply.github.com>
AuthorDate: Fri Feb 12 22:26:43 2021 +0800
[FLINK-21295][table-api] Support 'useModules' and 'listFullModules' in TableEnvironment and ModuleManager
This closes #14895
---
.../table/tests/test_environment_completeness.py | 2 +
.../apache/flink/table/api/TableEnvironment.java | 21 +-
.../table/api/internal/TableEnvironmentImpl.java | 11 ++
.../org/apache/flink/table/module/ModuleEntry.java | 69 +++++++
.../apache/flink/table/module/ModuleManager.java | 138 +++++++++----
.../flink/table/module/ModuleManagerTest.java | 217 +++++++++++++++++++++
.../org/apache/flink/table/utils/ModuleMock.java | 60 ++++++
.../src/test/resources/log4j2-test.properties | 28 +++
.../flink/table/api/internal/TableEnvImpl.scala | 12 +-
.../flink/table/utils/MockTableEnvironment.scala | 7 +-
10 files changed, 518 insertions(+), 47 deletions(-)
diff --git a/flink-python/pyflink/table/tests/test_environment_completeness.py b/flink-python/pyflink/table/tests/test_environment_completeness.py
index 7cfd27e..a9fec44 100644
--- a/flink-python/pyflink/table/tests/test_environment_completeness.py
+++ b/flink-python/pyflink/table/tests/test_environment_completeness.py
@@ -43,6 +43,8 @@ class EnvironmentAPICompletenessTests(PythonAPICompletenessTestCase, PyFlinkTest
'create',
'loadModule',
'unloadModule',
+ 'useModules',
+ 'listFullModules',
'createTemporarySystemFunction',
'dropTemporarySystemFunction',
'createFunction',
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index c736e69..fd9c1fa 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.module.Module;
+import org.apache.flink.table.module.ModuleEntry;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.types.AbstractDataType;
@@ -384,6 +385,14 @@ public interface TableEnvironment {
void loadModule(String moduleName, Module module);
/**
+ * Enables the given modules in the declared name order. Modules that have been loaded but are
+ * not listed in the names varargs will become unused.
+ *
+ * @param moduleNames module names to be used
+ */
+ void useModules(String... moduleNames);
+
+ /**
* Unloads a {@link Module} with given name. ValidationException is thrown when there is no
* module with the given name.
*
@@ -718,13 +727,21 @@ public interface TableEnvironment {
String[] listCatalogs();
/**
- * Gets an array of names of all modules in this environment in the loaded order.
+ * Gets an array of names of all used modules in this environment in resolution order.
*
- * @return A list of the names of all modules in the loaded order.
+ * @return A list of the names of used modules in resolution order.
*/
String[] listModules();
/**
+ * Gets an array of all loaded modules with use status in this environment. Used modules are
+ * kept in resolution order.
+ *
+ * @return A list of name and use status entries of all loaded modules.
+ */
+ ModuleEntry[] listFullModules();
+
+ /**
* Gets the names of all databases registered in the current catalog.
*
* @return A list of the names of all registered databases in the current catalog.
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 806d271..c4d254a 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -79,6 +79,7 @@ import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.functions.UserDefinedFunctionHelper;
import org.apache.flink.table.module.Module;
+import org.apache.flink.table.module.ModuleEntry;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.operations.CatalogQueryOperation;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
@@ -371,6 +372,11 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
}
@Override
+ public void useModules(String... moduleNames) {
+ moduleManager.useModules(moduleNames);
+ }
+
+ @Override
public void unloadModule(String moduleName) {
moduleManager.unloadModule(moduleName);
}
@@ -540,6 +546,11 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
}
@Override
+ public ModuleEntry[] listFullModules() {
+ return moduleManager.listFullModules().toArray(new ModuleEntry[0]);
+ }
+
+ @Override
public String[] listDatabases() {
return catalogManager
.getCatalog(catalogManager.getCurrentCatalog())
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/module/ModuleEntry.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/module/ModuleEntry.java
new file mode 100644
index 0000000..d68cb97
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/module/ModuleEntry.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.module;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+/** A POJO to represent a module's name and use status. */
+@PublicEvolving
+public class ModuleEntry {
+ private final String name;
+ private final boolean used;
+
+ public ModuleEntry(String name, boolean used) {
+ this.name = name;
+ this.used = used;
+ }
+
+ public String name() {
+ return name;
+ }
+
+ public boolean used() {
+ return used;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ModuleEntry entry = (ModuleEntry) o;
+
+ return new EqualsBuilder().append(used, entry.used).append(name, entry.name).isEquals();
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder(17, 37).append(name).append(used).toHashCode();
+ }
+
+ @Override
+ public String toString() {
+ return "ModuleEntry{" + "name='" + name + '\'' + ", used=" + used + '}';
+ }
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/module/ModuleManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/module/ModuleManager.java
index b4bdcd6..b0f5b5f 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/module/ModuleManager.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/module/ModuleManager.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.module;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.util.StringUtils;
@@ -26,6 +27,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -45,99 +49,153 @@ public class ModuleManager {
private static final Logger LOG = LoggerFactory.getLogger(ModuleManager.class);
- private LinkedHashMap<String, Module> modules;
+ /** Loaded modules by name; insertion-ordered to keep {@link #listFullModules()} deterministic. */
+ private LinkedHashMap<String, Module> loadedModules;
- public ModuleManager() {
- this.modules = new LinkedHashMap<>();
+ /** Keeps track of the used modules in resolution order. */
+ private List<String> usedModules;
- modules.put(MODULE_TYPE_CORE, CoreModule.INSTANCE);
+ public ModuleManager() {
+ this.loadedModules = new LinkedHashMap<>();
+ this.usedModules = new ArrayList<>();
+ loadedModules.put(MODULE_TYPE_CORE, CoreModule.INSTANCE);
+ usedModules.add(MODULE_TYPE_CORE);
}
/**
* Load a module under a unique name. Modules will be kept in the loaded order, and new module
- * will be added to the end. ValidationException is thrown when there is already a module with
- * the same name.
+ * will be appended after the used modules, ahead of any unused ones, and is used by default.
*
* @param name name of the module
* @param module the module instance
+ * @throws ValidationException when there already exists a module with the same name
*/
public void loadModule(String name, Module module) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(name), "name cannot be null or empty string");
checkNotNull(module, "module cannot be null");
- if (!modules.containsKey(name)) {
- modules.put(name, module);
-
- LOG.info("Loaded module {} from class {}", name, module.getClass().getName());
- } else {
+ if (loadedModules.containsKey(name)) {
throw new ValidationException(
- String.format("A module with name %s already exists", name));
+ String.format("A module with name '%s' already exists", name));
+ } else {
+ usedModules.add(name);
+ loadedModules.put(name, module);
+ LOG.info("Loaded module '{}' from class {}", name, module.getClass().getName());
}
}
/**
- * Unload a module with given name. ValidationException is thrown when there is no module with
- * the given name.
+ * Unload a module with given name.
*
* @param name name of the module
+ * @throws ValidationException when there is no module with the given name
*/
public void unloadModule(String name) {
- if (modules.containsKey(name)) {
- modules.remove(name);
-
- LOG.info("Unloaded module {}", name);
+ if (loadedModules.containsKey(name)) {
+ loadedModules.remove(name);
+ boolean used = usedModules.remove(name);
+ LOG.info("Unloaded an {} module '{}'", used ? "used" : "unused", name);
} else {
- throw new ValidationException(String.format("No module with name %s exists", name));
+ throw new ValidationException(String.format("No module with name '%s' exists", name));
+ }
+ }
+
+ /**
+ * Enables the given modules in the declared name order. Modules that have been loaded but are
+ * not listed in the names varargs will become unused.
+ *
+ * @param names module names to be used
+ * @throws ValidationException when module names contain an unloaded name
+ */
+ public void useModules(String... names) {
+ checkNotNull(names, "names cannot be null");
+ Set<String> deduplicateNames = new HashSet<>();
+ for (String name : names) {
+ if (!loadedModules.containsKey(name)) {
+ throw new ValidationException(
+ String.format("No module with name '%s' exists", name));
+ }
+ if (!deduplicateNames.add(name)) {
+ throw new ValidationException(
+ String.format("Module '%s' appears more than once", name));
+ }
}
+ usedModules.clear();
+ usedModules.addAll(Arrays.asList(names));
}
/**
- * Get names of all modules loaded.
+ * Get names of all used modules in resolution order.
*
- * @return a list of names of modules loaded
+ * @return a list of names of used modules
*/
public List<String> listModules() {
- return new ArrayList<>(modules.keySet());
+ return new ArrayList<>(usedModules);
+ }
+
+ /**
+ * Get all loaded modules with use status. Used modules are returned first in resolution
+ * order, followed by the unused modules in loaded order.
+ *
+ * @return a list of module entries with module name and use status
+ */
+ public List<ModuleEntry> listFullModules() {
+ // keep the order for used modules
+ List<ModuleEntry> moduleEntries =
+ usedModules.stream()
+ .map(name -> new ModuleEntry(name, true))
+ .collect(Collectors.toList());
+ loadedModules.keySet().stream()
+ .filter(name -> !usedModules.contains(name))
+ .forEach(name -> moduleEntries.add(new ModuleEntry(name, false)));
+ return moduleEntries;
}
/**
- * Get names of all functions from all modules.
+ * Get names of all functions from used modules.
*
- * @return a set of names of registered modules.
+ * @return a set of function names of used modules
*/
public Set<String> listFunctions() {
- return modules.values().stream()
- .map(m -> m.listFunctions())
- .flatMap(n -> n.stream())
+ return usedModules.stream()
+ .map(name -> loadedModules.get(name).listFunctions())
+ .flatMap(Collection::stream)
.collect(Collectors.toSet());
}
/**
* Get an optional of {@link FunctionDefinition} by a given name. Function will be resolved to
- * modules in the loaded order, and the first match will be returned. If no match is found in
- * all modules, return an optional.
+ * modules in the used order, and the first match will be returned. If no match is found in any
+ * used module, an empty optional is returned.
*
* @param name name of the function
* @return an optional of {@link FunctionDefinition}
*/
public Optional<FunctionDefinition> getFunctionDefinition(String name) {
- Optional<Map.Entry<String, Module>> result =
- modules.entrySet().stream()
+ Optional<String> module =
+ usedModules.stream()
.filter(
- p ->
- p.getValue().listFunctions().stream()
+ n ->
+ loadedModules.get(n).listFunctions().stream()
.anyMatch(e -> e.equalsIgnoreCase(name)))
.findFirst();
+ if (module.isPresent()) {
+ LOG.debug("Got FunctionDefinition '{}' from '{}' module.", name, module.get());
+ return loadedModules.get(module.get()).getFunctionDefinition(name);
+ }
+ LOG.debug("Cannot find FunctionDefinition '{}' from any loaded modules.", name);
- if (result.isPresent()) {
- LOG.debug("Got FunctionDefinition '{}' from '{}' module.", name, result.get().getKey());
+ return Optional.empty();
+ }
- return result.get().getValue().getFunctionDefinition(name);
- } else {
- LOG.debug("Cannot find FunctionDefinition '{}' from any loaded modules.", name);
+ @VisibleForTesting
+ List<String> getUsedModules() {
+ return usedModules;
+ }
- return Optional.empty();
- }
+ @VisibleForTesting
+ Map<String, Module> getLoadedModules() {
+ return loadedModules;
}
}
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/module/ModuleManagerTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/module/ModuleManagerTest.java
new file mode 100644
index 0000000..189dc03
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/module/ModuleManagerTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.module;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.utils.ModuleMock;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.descriptors.CoreModuleDescriptorValidator.MODULE_TYPE_CORE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link ModuleManager}. */
+public class ModuleManagerTest extends TestLogger {
+ private ModuleManager manager;
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ @Before
+ public void before() {
+ manager = new ModuleManager();
+ }
+
+ @Test
+ public void testLoadModuleTwice() {
+ // CoreModule is loaded by default
+ assertEquals(Collections.singletonList(MODULE_TYPE_CORE), manager.getUsedModules());
+ assertEquals(CoreModule.INSTANCE, manager.getLoadedModules().get(MODULE_TYPE_CORE));
+
+ thrown.expect(ValidationException.class);
+ thrown.expectMessage("A module with name 'core' already exists");
+ manager.loadModule(MODULE_TYPE_CORE, CoreModule.INSTANCE);
+ }
+
+ @Test
+ public void testLoadModuleWithoutUnusedModulesExist() {
+ ModuleMock x = new ModuleMock("x");
+ ModuleMock y = new ModuleMock("y");
+ ModuleMock z = new ModuleMock("z");
+ manager.loadModule(x.getType(), x);
+ manager.loadModule(y.getType(), y);
+ manager.loadModule(z.getType(), z);
+
+ Map<String, Module> expectedLoadedModules = new HashMap<>();
+ expectedLoadedModules.put(MODULE_TYPE_CORE, CoreModule.INSTANCE);
+ expectedLoadedModules.put("x", x);
+ expectedLoadedModules.put("y", y);
+ expectedLoadedModules.put("z", z);
+
+ assertEquals(Arrays.asList(MODULE_TYPE_CORE, "x", "y", "z"), manager.getUsedModules());
+ assertEquals(expectedLoadedModules, manager.getLoadedModules());
+ }
+
+ @Test
+ public void testLoadModuleWithUnusedModulesExist() {
+ ModuleMock y = new ModuleMock("y");
+ ModuleMock z = new ModuleMock("z");
+ manager.loadModule(y.getType(), y);
+ manager.loadModule(z.getType(), z);
+
+ Map<String, Module> expectedLoadedModules = new HashMap<>();
+ expectedLoadedModules.put(MODULE_TYPE_CORE, CoreModule.INSTANCE);
+ expectedLoadedModules.put("y", y);
+ expectedLoadedModules.put("z", z);
+
+ assertEquals(Arrays.asList(MODULE_TYPE_CORE, "y", "z"), manager.getUsedModules());
+ assertEquals(expectedLoadedModules, manager.getLoadedModules());
+
+ // disable module y and z
+ manager.useModules(MODULE_TYPE_CORE);
+
+ // load module x to test the order
+ ModuleMock x = new ModuleMock("x");
+ manager.loadModule(x.getType(), x);
+ expectedLoadedModules.put("x", x);
+
+ assertEquals(Arrays.asList(MODULE_TYPE_CORE, "x"), manager.getUsedModules());
+ assertEquals(expectedLoadedModules, manager.getLoadedModules());
+ }
+
+ @Test
+ public void testUnloadModuleTwice() {
+ assertEquals(Collections.singletonList(MODULE_TYPE_CORE), manager.getUsedModules());
+
+ manager.unloadModule(MODULE_TYPE_CORE);
+ assertEquals(Collections.emptyList(), manager.getUsedModules());
+ assertEquals(Collections.emptyMap(), manager.getLoadedModules());
+
+ thrown.expect(ValidationException.class);
+ thrown.expectMessage("No module with name 'core' exists");
+ manager.unloadModule(MODULE_TYPE_CORE);
+ }
+
+ @Test
+ public void testUseUnloadedModules() {
+ thrown.expect(ValidationException.class);
+ thrown.expectMessage("No module with name 'x' exists");
+ manager.useModules(MODULE_TYPE_CORE, "x");
+ }
+
+ @Test
+ public void testUseModulesWithDuplicateModuleName() {
+ thrown.expect(ValidationException.class);
+ thrown.expectMessage("Module 'core' appears more than once");
+ manager.useModules(MODULE_TYPE_CORE, MODULE_TYPE_CORE);
+ }
+
+ @Test
+ public void testUseModules() {
+ ModuleMock x = new ModuleMock("x");
+ ModuleMock y = new ModuleMock("y");
+ ModuleMock z = new ModuleMock("z");
+ manager.loadModule(x.getType(), x);
+ manager.loadModule(y.getType(), y);
+ manager.loadModule(z.getType(), z);
+
+ assertEquals(Arrays.asList(MODULE_TYPE_CORE, "x", "y", "z"), manager.getUsedModules());
+
+ // test order for used modules
+ manager.useModules("z", MODULE_TYPE_CORE);
+ assertEquals(Arrays.asList("z", MODULE_TYPE_CORE), manager.getUsedModules());
+
+ // test unmentioned modules are still loaded
+ Map<String, Module> expectedLoadedModules = new HashMap<>();
+ expectedLoadedModules.put(MODULE_TYPE_CORE, CoreModule.INSTANCE);
+ expectedLoadedModules.put("x", x);
+ expectedLoadedModules.put("y", y);
+ expectedLoadedModules.put("z", z);
+ assertEquals(expectedLoadedModules, manager.getLoadedModules());
+ }
+
+ @Test
+ public void testListModules() {
+ ModuleMock y = new ModuleMock("y");
+ ModuleMock z = new ModuleMock("z");
+ manager.loadModule("y", y);
+ manager.loadModule("z", z);
+ manager.useModules("z", "y");
+
+ assertEquals(Arrays.asList("z", "y"), manager.listModules());
+ }
+
+ @Test
+ public void testListFullModules() {
+ ModuleMock x = new ModuleMock("x");
+ ModuleMock y = new ModuleMock("y");
+ ModuleMock z = new ModuleMock("z");
+
+ manager.loadModule("y", y);
+ manager.loadModule("x", x);
+ manager.loadModule("z", z);
+ manager.useModules("z", "y");
+
+ assertEquals(
+ getExpectedModuleEntries(2, "z", "y", MODULE_TYPE_CORE, "x"),
+ manager.listFullModules());
+ }
+
+ @Test
+ public void testListFunctions() {
+ ModuleMock x = new ModuleMock("x");
+ manager.loadModule(x.getType(), x);
+
+ assertTrue(manager.listFunctions().contains("dummy"));
+
+ // should not return function name of an unused module
+ manager.useModules(MODULE_TYPE_CORE);
+ assertFalse(manager.listFunctions().contains("dummy"));
+ }
+
+ @Test
+ public void testGetFunctionDefinition() {
+ ModuleMock x = new ModuleMock("x");
+ manager.loadModule(x.getType(), x);
+
+ assertTrue(manager.getFunctionDefinition("dummy").isPresent());
+
+ // should not return function definition of an unused module
+ manager.useModules(MODULE_TYPE_CORE);
+ assertFalse(manager.getFunctionDefinition("dummy").isPresent());
+ }
+
+ private static List<ModuleEntry> getExpectedModuleEntries(int index, String... names) {
+ return IntStream.range(0, names.length)
+ .mapToObj(i -> new ModuleEntry(names[i], i < index))
+ .collect(Collectors.toList());
+ }
+}
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ModuleMock.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ModuleMock.java
new file mode 100644
index 0000000..86f1626
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ModuleMock.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils;
+
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionKind;
+import org.apache.flink.table.module.Module;
+import org.apache.flink.table.types.inference.utils.FunctionDefinitionMock;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+
+/** Mocking {@link Module} for tests. */
+public class ModuleMock implements Module {
+ private static final Set<String> BUILT_IN_FUNCTIONS =
+ Collections.unmodifiableSet(new HashSet<>(Collections.singletonList("dummy")));
+ private final String type;
+ private final FunctionDefinitionMock functionDef;
+
+ public ModuleMock(String type) {
+ this.type = type;
+ functionDef = new FunctionDefinitionMock();
+ functionDef.functionKind = FunctionKind.OTHER;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ @Override
+ public Set<String> listFunctions() {
+ return BUILT_IN_FUNCTIONS;
+ }
+
+ @Override
+ public Optional<FunctionDefinition> getFunctionDefinition(String name) {
+ if (BUILT_IN_FUNCTIONS.contains(name)) {
+ return Optional.of(functionDef);
+ }
+ return Optional.empty();
+ }
+}
diff --git a/flink-table/flink-table-api-java/src/test/resources/log4j2-test.properties b/flink-table/flink-table-api-java/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..8bb9fe6
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level=OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index e304e3c..1fa57d2 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -34,7 +34,7 @@ import org.apache.flink.table.expressions.resolver.SqlExpressionResolver
import org.apache.flink.table.expressions.resolver.lookups.TableReferenceLookup
import org.apache.flink.table.factories.{TableFactoryUtil, TableSinkFactoryContextImpl}
import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction, _}
-import org.apache.flink.table.module.{Module, ModuleManager}
+import org.apache.flink.table.module.{Module, ModuleEntry, ModuleManager}
import org.apache.flink.table.operations.ddl._
import org.apache.flink.table.operations.utils.OperationTreeBuilder
import org.apache.flink.table.operations.{CatalogQueryOperation, TableSourceQueryOperation, _}
@@ -45,7 +45,6 @@ import org.apache.flink.table.types.{AbstractDataType, DataType}
import org.apache.flink.table.util.JavaScalaConversionUtil
import org.apache.flink.table.utils.PrintUtils
import org.apache.flink.types.Row
-
import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
import org.apache.calcite.sql.parser.SqlParser
import org.apache.calcite.tools.FrameworkConfig
@@ -53,7 +52,6 @@ import org.apache.calcite.tools.FrameworkConfig
import _root_.java.lang.{Iterable => JIterable, Long => JLong}
import _root_.java.util.function.{Function => JFunction, Supplier => JSupplier}
import _root_.java.util.{Optional, Collections => JCollections, HashMap => JHashMap, List => JList, Map => JMap}
-
import _root_.scala.collection.JavaConversions._
import _root_.scala.collection.JavaConverters._
import _root_.scala.util.Try
@@ -283,6 +281,10 @@ abstract class TableEnvImpl(
moduleManager.loadModule(moduleName, module)
}
+ override def useModules(moduleNames: String*): Unit = {
+ moduleManager.useModules(moduleNames: _*)
+ }
+
override def unloadModule(moduleName: String): Unit = {
moduleManager.unloadModule(moduleName)
}
@@ -461,6 +463,10 @@ abstract class TableEnvImpl(
moduleManager.listModules().asScala.toArray
}
+ override def listFullModules(): Array[ModuleEntry] = {
+ moduleManager.listFullModules().asScala.toArray
+ }
+
override def listCatalogs(): Array[String] = {
catalogManager.listCatalogs
.asScala
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index 0f69ff3..7d6bb4c 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -20,14 +20,13 @@ package org.apache.flink.table.utils
import java.lang.{Iterable => JIterable}
import java.util.Optional
-
import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.table.api.{ExplainDetail, StatementSet, Table, TableConfig, TableEnvironment, TableResult}
import org.apache.flink.table.catalog.Catalog
import org.apache.flink.table.descriptors.{ConnectTableDescriptor, ConnectorDescriptor}
import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.functions.{ScalarFunction, UserDefinedFunction}
-import org.apache.flink.table.module.Module
+import org.apache.flink.table.module.{Module, ModuleEntry}
import org.apache.flink.table.sources.TableSource
import org.apache.flink.table.types.AbstractDataType
@@ -47,6 +46,8 @@ class MockTableEnvironment extends TableEnvironment {
override def listModules(): Array[String] = ???
+ override def listFullModules(): Array[ModuleEntry] = ???
+
override def listDatabases(): Array[String] = ???
override def listTables(): Array[String] = ???
@@ -102,6 +103,8 @@ class MockTableEnvironment extends TableEnvironment {
override def loadModule(moduleName: String, module: Module): Unit = ???
+ override def useModules(moduleNames: String*): Unit = ???
+
override def unloadModule(moduleName: String): Unit = ???
override def createTemporaryView(