You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2021/02/22 06:13:44 UTC
[flink] branch master updated: [FLINK-21297][table] Support
'LOAD/UNLOAD MODULE' syntax
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e2e1121 [FLINK-21297][table] Support 'LOAD/UNLOAD MODULE' syntax
e2e1121 is described below
commit e2e11218516ad579bf2bd9c0fc5510f08cb6d243
Author: Jane <55...@users.noreply.github.com>
AuthorDate: Mon Feb 22 14:13:22 2021 +0800
[FLINK-21297][table] Support 'LOAD/UNLOAD MODULE' syntax
Support 'LOAD/UNLOAD MODULE' syntax both in SQL parser, TableEnvironment and SQL CLI.
This closes #14944
---
.../apache/flink/table/client/cli/CliClient.java | 12 +++
.../apache/flink/table/client/cli/CliStrings.java | 17 ++++
.../flink/table/client/cli/SqlCommandParser.java | 10 +++
.../table/client/cli/SqlCommandParserTest.java | 33 +++++---
.../client/gateway/local/ExecutionContextTest.java | 20 +++--
.../client/gateway/local/LocalExecutorITCase.java | 83 +++++++++++++++++++
.../src/main/codegen/data/Parser.tdd | 8 ++
.../src/main/codegen/includes/parserImpls.ftl | 41 ++++++++++
.../parser/hive/FlinkHiveSqlParserImplTest.java | 13 +++
.../src/main/codegen/data/Parser.tdd | 8 ++
.../src/main/codegen/includes/parserImpls.ftl | 41 ++++++++++
.../apache/flink/sql/parser/dql/SqlLoadModule.java | 95 ++++++++++++++++++++++
.../flink/sql/parser/dql/SqlUnloadModule.java | 70 ++++++++++++++++
.../flink/sql/parser/FlinkSqlParserImplTest.java | 26 +++++-
.../table/api/internal/TableEnvironmentImpl.java | 51 +++++++++++-
.../table/operations/LoadModuleOperation.java | 54 ++++++++++++
.../table/operations/UnloadModuleOperation.java | 39 +++++++++
.../table/factories/module/DummyModuleFactory.java | 66 +++++++++++++++
.../org.apache.flink.table.factories.TableFactory | 1 +
.../operations/SqlToOperationConverter.java | 25 ++++++
.../table/planner/calcite/FlinkPlannerImpl.scala | 4 +-
.../operations/SqlToOperationConverterTest.java | 30 +++++++
.../flink/table/api/TableEnvironmentTest.scala | 93 ++++++++++++++++++++-
23 files changed, 812 insertions(+), 28 deletions(-)
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
index 201f6f0..826b96c 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
@@ -383,6 +383,18 @@ public class CliClient implements AutoCloseable {
case DROP_CATALOG:
callDdl(cmdCall.operands[0], CliStrings.MESSAGE_CATALOG_REMOVED);
break;
+ case LOAD_MODULE:
+ callDdl(
+ cmdCall.operands[0],
+ CliStrings.MESSAGE_LOAD_MODULE_SUCCEEDED,
+ CliStrings.MESSAGE_LOAD_MODULE_FAILED);
+ break;
+ case UNLOAD_MODULE:
+ callDdl(
+ cmdCall.operands[0],
+ CliStrings.MESSAGE_UNLOAD_MODULE_SUCCEEDED,
+ CliStrings.MESSAGE_UNLOAD_MODULE_FAILED);
+ break;
default:
throw new SqlClientException("Unsupported command: " + cmdCall.command);
}
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
index 37b130f..f90821a 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
@@ -104,6 +104,15 @@ public final class CliStrings {
formatCommand(
SqlCommand.USE,
"Sets the current default database. Experimental! Syntax: 'USE <name>;'"))
+ .append(
+ formatCommand(
+ SqlCommand.LOAD_MODULE,
+ "Load a module. Syntax: 'LOAD MODULE <name> [WITH ('<key1>' = "
+ + "'<value1>' [, '<key2>' = '<value2>', ...])];'"))
+ .append(
+ formatCommand(
+ SqlCommand.UNLOAD_MODULE,
+ "Unload a module. Syntax: 'UNLOAD MODULE <name>;'"))
.style(AttributedStyle.DEFAULT.underline())
.append("\nHint")
.style(AttributedStyle.DEFAULT)
@@ -220,6 +229,14 @@ public final class CliStrings {
public static final String MESSAGE_CATALOG_REMOVED = "Catalog has been removed.";
+ public static final String MESSAGE_LOAD_MODULE_SUCCEEDED = "Load module succeeded!";
+
+ public static final String MESSAGE_UNLOAD_MODULE_SUCCEEDED = "Unload module succeeded!";
+
+ public static final String MESSAGE_LOAD_MODULE_FAILED = "Load module failed!";
+
+ public static final String MESSAGE_UNLOAD_MODULE_FAILED = "Unload module failed!";
+
// --------------------------------------------------------------------------------------------
public static final String RESULT_TITLE = "SQL Query Result";
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
index 5ae3333..62376ab 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
import org.apache.flink.table.operations.DescribeTableOperation;
import org.apache.flink.table.operations.ExplainOperation;
+import org.apache.flink.table.operations.LoadModuleOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.ShowCatalogsOperation;
@@ -32,6 +33,7 @@ import org.apache.flink.table.operations.ShowDatabasesOperation;
import org.apache.flink.table.operations.ShowFunctionsOperation;
import org.apache.flink.table.operations.ShowPartitionsOperation;
import org.apache.flink.table.operations.ShowTablesOperation;
+import org.apache.flink.table.operations.UnloadModuleOperation;
import org.apache.flink.table.operations.UseCatalogOperation;
import org.apache.flink.table.operations.UseDatabaseOperation;
import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
@@ -168,6 +170,10 @@ public final class SqlCommandParser {
cmd = SqlCommand.ALTER_FUNCTION;
} else if (operation instanceof ExplainOperation) {
cmd = SqlCommand.EXPLAIN;
+ } else if (operation instanceof LoadModuleOperation) {
+ cmd = SqlCommand.LOAD_MODULE;
+ } else if (operation instanceof UnloadModuleOperation) {
+ cmd = SqlCommand.UNLOAD_MODULE;
} else if (operation instanceof DescribeTableOperation) {
cmd = SqlCommand.DESCRIBE;
operands =
@@ -252,6 +258,10 @@ public final class SqlCommandParser {
// FLINK-17396
SHOW_MODULES("SHOW\\s+MODULES", NO_OPERANDS),
+ LOAD_MODULE,
+
+ UNLOAD_MODULE,
+
SHOW_PARTITIONS,
USE_CATALOG,
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
index 62a087a..75cfbcc 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
@@ -282,19 +282,28 @@ public class SqlCommandParserTest {
// show current database
TestItem.validSql(
"show current database", SqlCommand.SHOW_CURRENT_DATABASE),
+ // load module with module name as identifier
TestItem.validSql(
- "show current database", SqlCommand.SHOW_CURRENT_DATABASE),
- // show tables
- TestItem.validSql("SHOW TABLES;", SqlCommand.SHOW_TABLES),
- TestItem.validSql(" SHOW TABLES ;", SqlCommand.SHOW_TABLES),
- // show functions
- TestItem.validSql("SHOW FUNCTIONS;", SqlCommand.SHOW_FUNCTIONS),
- TestItem.validSql(" SHOW FUNCTIONS ", SqlCommand.SHOW_FUNCTIONS),
- // show modules
- TestItem.validSql("SHOW MODULES", SqlCommand.SHOW_MODULES)
- .cannotParseComment(),
- TestItem.validSql(" SHOW MODULES ", SqlCommand.SHOW_MODULES)
- .cannotParseComment(),
+ "LOAD MODULE dummy", SqlCommand.LOAD_MODULE, "LOAD MODULE dummy"),
+ // load module with module name as reserved keyword
+ TestItem.validSql(
+ "LOAD MODULE `MODULE`",
+ SqlCommand.LOAD_MODULE,
+ "LOAD MODULE `MODULE`"),
+ // load module with module name as literal
+ TestItem.invalidSql(
+ "LOAD MODULE 'dummy'",
+ SqlExecutionException.class,
+ "Encountered \"\\'dummy\\'\""),
+ TestItem.validSql(
+ "LOAD MODULE dummy WITH ('dummy-version' = '1')",
+ SqlCommand.LOAD_MODULE,
+ "LOAD MODULE dummy WITH ('dummy-version' = '1')"),
+ // unload module
+ TestItem.validSql(
+ "UNLOAD MODULE dummy",
+ SqlCommand.UNLOAD_MODULE,
+ "UNLOAD MODULE dummy"),
// Test create function.
TestItem.invalidSql(
"CREATE FUNCTION ",
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
index 04b3bec..4bfe17e 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
@@ -97,7 +97,7 @@ import static org.powermock.api.mockito.PowerMockito.when;
public class ExecutionContextTest {
private static final String DEFAULTS_ENVIRONMENT_FILE = "test-sql-client-defaults.yaml";
- private static final String MODULES_ENVIRONMENT_FILE = "test-sql-client-modules.yaml";
+ public static final String MODULES_ENVIRONMENT_FILE = "test-sql-client-modules.yaml";
public static final String CATALOGS_ENVIRONMENT_FILE = "test-sql-client-catalogs.yaml";
private static final String STREAMING_ENVIRONMENT_FILE = "test-sql-client-streaming.yaml";
private static final String CONFIGURATION_ENVIRONMENT_FILE =
@@ -413,19 +413,23 @@ public class ExecutionContextTest {
return replaceVars;
}
+ static Map<String, String> createModuleReplaceVars() {
+ Map<String, String> replaceVars = new HashMap<>();
+ replaceVars.put("$VAR_PLANNER", "blink");
+ replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
+ replaceVars.put("$VAR_RESULT_MODE", "changelog");
+ replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
+ replaceVars.put("$VAR_MAX_ROWS", "100");
+ return replaceVars;
+ }
+
private <T> ExecutionContext<T> createDefaultExecutionContext() throws Exception {
final Map<String, String> replaceVars = createDefaultReplaceVars();
return createExecutionContext(DEFAULTS_ENVIRONMENT_FILE, replaceVars);
}
private <T> ExecutionContext<T> createModuleExecutionContext() throws Exception {
- final Map<String, String> replaceVars = new HashMap<>();
- replaceVars.put("$VAR_PLANNER", "old");
- replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
- replaceVars.put("$VAR_RESULT_MODE", "changelog");
- replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
- replaceVars.put("$VAR_MAX_ROWS", "100");
- return createExecutionContext(MODULES_ENVIRONMENT_FILE, replaceVars);
+ return createExecutionContext(MODULES_ENVIRONMENT_FILE, createModuleReplaceVars());
}
private <T> ExecutionContext<T> createCatalogExecutionContext() throws Exception {
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 4e03d91..449217b 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -87,7 +87,10 @@ import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
import static org.apache.flink.table.client.gateway.local.ExecutionContextTest.CATALOGS_ENVIRONMENT_FILE;
+import static org.apache.flink.table.client.gateway.local.ExecutionContextTest.MODULES_ENVIRONMENT_FILE;
+import static org.apache.flink.table.client.gateway.local.ExecutionContextTest.createModuleReplaceVars;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.CoreMatchers.is;
@@ -1618,6 +1621,86 @@ public class LocalExecutorITCase extends TestLogger {
executor.closeSession(sessionId);
}
+ @Test
+ public void testLoadModuleWithModuleConfEnabled() throws Exception {
+ // only blink planner supports LOAD MODULE syntax
+ Assume.assumeTrue(planner.equals("blink"));
+ final Executor executor =
+ createModifiedExecutor(
+ MODULES_ENVIRONMENT_FILE, clusterClient, createModuleReplaceVars());
+ final SessionContext session = new SessionContext("test-session", new Environment());
+ String sessionId = executor.openSession(session);
+ assertEquals("test-session", sessionId);
+
+ assertThrows(
+ "Could not execute statement: load module core",
+ SqlExecutionException.class,
+ () -> executor.executeSql(sessionId, "load module core"));
+
+ executor.executeSql(sessionId, "load module hive");
+ assertEquals(
+ Arrays.asList("core", "mymodule", "myhive", "myhive2", "hive"),
+ executor.listModules(sessionId));
+ }
+
+ @Test
+ public void testUnloadModuleWithModuleConfEnabled() throws Exception {
+ // only blink planner supports UNLOAD MODULE syntax
+ Assume.assumeTrue(planner.equals("blink"));
+ final Executor executor =
+ createModifiedExecutor(
+ MODULES_ENVIRONMENT_FILE, clusterClient, createModuleReplaceVars());
+ final SessionContext session = new SessionContext("test-session", new Environment());
+ String sessionId = executor.openSession(session);
+ assertEquals("test-session", sessionId);
+
+ executor.executeSql(sessionId, "unload module mymodule");
+ assertEquals(Arrays.asList("core", "myhive", "myhive2"), executor.listModules(sessionId));
+
+ exception.expect(SqlExecutionException.class);
+ exception.expectMessage("Could not execute statement: unload module mymodule");
+ executor.executeSql(sessionId, "unload module mymodule");
+ }
+
+ @Test
+ public void testHiveBuiltInFunctionWithHiveModuleEnabled() throws Exception {
+ // only blink planner supports LOAD MODULE syntax
+ Assume.assumeTrue(planner.equals("blink"));
+
+ final URL url = getClass().getClassLoader().getResource("test-data.csv");
+ Objects.requireNonNull(url);
+ final Map<String, String> replaceVars = new HashMap<>();
+ replaceVars.put("$VAR_PLANNER", planner);
+ replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
+ replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
+ replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
+ replaceVars.put("$VAR_MAX_ROWS", "100");
+ replaceVars.put("$VAR_RESULT_MODE", "table");
+
+ final Executor executor = createModifiedExecutor(clusterClient, replaceVars);
+ final SessionContext session = new SessionContext("test-session", new Environment());
+ String sessionId = executor.openSession(session);
+ assertEquals("test-session", sessionId);
+
+ // cannot use hive built-in function without loading hive module
+ assertThrows(
+ "Could not execute statement: select substring_index('www.apache.org', '.', 2) from TableNumber1",
+ SqlExecutionException.class,
+ () ->
+ executor.executeSql(
+ sessionId,
+ "select substring_index('www.apache.org', '.', 2) from TableNumber1"));
+
+ executor.executeSql(sessionId, "load module hive");
+ assertEquals(Arrays.asList("core", "hive"), executor.listModules(sessionId));
+
+ assertShowResult(
+ executor.executeSql(
+ sessionId,
+ "select substring_index('www.apache.org', '.', 2) from TableNumber1"),
+ hasItems("www.apache"));
+ }
+
private void executeStreamQueryTable(
Map<String, String> replaceVars, String query, List<String> expectedResults)
throws Exception {
diff --git a/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd
index 57101a3..4ba2efd 100644
--- a/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd
@@ -78,6 +78,7 @@
"org.apache.flink.sql.parser.dml.RichSqlInsertKeyword"
"org.apache.flink.sql.parser.dql.SqlDescribeCatalog"
"org.apache.flink.sql.parser.dql.SqlDescribeDatabase"
+ "org.apache.flink.sql.parser.dql.SqlLoadModule"
"org.apache.flink.sql.parser.dql.SqlShowCatalogs"
"org.apache.flink.sql.parser.dql.SqlShowCurrentCatalog"
"org.apache.flink.sql.parser.dql.SqlShowDatabases"
@@ -86,6 +87,7 @@
"org.apache.flink.sql.parser.dql.SqlShowTables"
"org.apache.flink.sql.parser.dql.SqlShowPartitions"
"org.apache.flink.sql.parser.dql.SqlRichDescribeTable"
+ "org.apache.flink.sql.parser.dql.SqlUnloadModule"
"org.apache.flink.sql.parser.type.ExtendedSqlCollectionTypeNameSpec"
"org.apache.flink.sql.parser.type.ExtendedSqlRowTypeNameSpec"
"org.apache.flink.sql.parser.type.SqlMapTypeNameSpec"
@@ -125,6 +127,7 @@
"ITEMS"
"KEYS"
"LINES"
+ "LOAD"
"LOCATION"
"NORELY"
"NOVALIDATE"
@@ -145,6 +148,7 @@
"TABLES"
"TBLPROPERTIES"
"TERMINATED"
+ "UNLOAD"
"USE"
"VALIDATE"
]
@@ -272,6 +276,7 @@
"LENGTH"
"LEVEL"
"LIBRARY"
+ "LOAD"
"LOCATOR"
"M"
"MAP"
@@ -443,6 +448,7 @@
"UNCOMMITTED"
"UNCONDITIONAL"
"UNDER"
+ "UNLOAD"
"UNNAMED"
"USAGE"
"USER_DEFINED_TYPE_CATALOG"
@@ -510,6 +516,7 @@
# Example: SqlShowDatabases(), SqlShowTables().
statementParserMethods: [
"RichSqlInsert()"
+ "SqlLoadModule()"
"SqlShowCatalogs()"
"SqlShowCurrentCatalogOrDatabase()"
"SqlDescribeCatalog()"
@@ -524,6 +531,7 @@
"SqlAlterTable()"
"SqlAlterView()"
"SqlShowPartitions()"
+ "SqlUnloadModule()"
]
# List of methods for parsing custom literals.
diff --git a/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
index f8a69a3..6646d6b 100644
--- a/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
@@ -1510,3 +1510,44 @@ SqlShowPartitions SqlShowPartitions() :
[ <PARTITION> { partitionSpec = new SqlNodeList(getPos()); PartitionSpecCommaList(new SqlNodeList(getPos()), partitionSpec); } ]
{ return new SqlShowPartitions(pos, tableIdentifier, partitionSpec); }
}
+
+/**
+* Parses a load module statement.
+* LOAD MODULE module_name [WITH (property_name=property_value, ...)];
+*/
+SqlLoadModule SqlLoadModule() :
+{
+ SqlParserPos startPos;
+ SqlIdentifier moduleName;
+ SqlNodeList propertyList = SqlNodeList.EMPTY;
+}
+{
+ <LOAD> <MODULE> { startPos = getPos(); }
+ moduleName = SimpleIdentifier()
+ [
+ <WITH>
+ propertyList = TableProperties()
+ ]
+ {
+ return new SqlLoadModule(startPos.plus(getPos()),
+ moduleName,
+ propertyList);
+ }
+}
+
+/**
+* Parses an unload module statement.
+* UNLOAD MODULE module_name;
+*/
+SqlUnloadModule SqlUnloadModule() :
+{
+ SqlParserPos startPos;
+ SqlIdentifier moduleName;
+}
+{
+ <UNLOAD> <MODULE> { startPos = getPos(); }
+ moduleName = SimpleIdentifier()
+ {
+ return new SqlUnloadModule(startPos.plus(getPos()), moduleName);
+ }
+}
diff --git a/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java b/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java
index 9d41e68..0b566d1 100644
--- a/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java
@@ -441,4 +441,17 @@ public class FlinkHiveSqlParserImplTest extends SqlParserTest {
sql("show partitions tbl").ok("SHOW PARTITIONS `TBL`");
sql("show partitions tbl partition (p=1)").ok("SHOW PARTITIONS `TBL` PARTITION (`P` = 1)");
}
+
+ @Test
+ public void testLoadModule() {
+ sql("load module hive").ok("LOAD MODULE `HIVE`");
+
+ sql("load module hive with ('hive-version' = '3.1.2')")
+ .ok("LOAD MODULE `HIVE` WITH (\n 'hive-version' = '3.1.2'\n)");
+ }
+
+ @Test
+ public void testUnloadModule() {
+ sql("unload module hive").ok("UNLOAD MODULE `HIVE`");
+ }
}
diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 0a086e9..1c9c865 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -57,6 +57,7 @@
"org.apache.flink.sql.parser.dml.RichSqlInsertKeyword"
"org.apache.flink.sql.parser.dql.SqlDescribeCatalog"
"org.apache.flink.sql.parser.dql.SqlDescribeDatabase"
+ "org.apache.flink.sql.parser.dql.SqlLoadModule"
"org.apache.flink.sql.parser.dql.SqlShowCatalogs"
"org.apache.flink.sql.parser.dql.SqlShowCurrentCatalog"
"org.apache.flink.sql.parser.dql.SqlShowDatabases"
@@ -65,6 +66,7 @@
"org.apache.flink.sql.parser.dql.SqlShowTables"
"org.apache.flink.sql.parser.dql.SqlShowViews"
"org.apache.flink.sql.parser.dql.SqlRichDescribeTable"
+ "org.apache.flink.sql.parser.dql.SqlUnloadModule"
"org.apache.flink.sql.parser.type.ExtendedSqlCollectionTypeNameSpec"
"org.apache.flink.sql.parser.type.ExtendedSqlRowTypeNameSpec"
"org.apache.flink.sql.parser.type.SqlMapTypeNameSpec"
@@ -91,6 +93,7 @@
"EXTENDED"
"FUNCTIONS"
"IF"
+ "LOAD"
"METADATA"
"OVERWRITE"
"OVERWRITING"
@@ -102,6 +105,7 @@
"SCALA"
"STRING"
"TABLES"
+ "UNLOAD"
"USE"
"VIEWS"
"VIRTUAL"
@@ -233,6 +237,7 @@
"LENGTH"
"LEVEL"
"LIBRARY"
+ "LOAD"
"LOCATOR"
"M"
"MAP"
@@ -405,6 +410,7 @@
"UNCOMMITTED"
"UNCONDITIONAL"
"UNDER"
+ "UNLOAD"
"UNNAMED"
"USAGE"
"USER_DEFINED_TYPE_CATALOG"
@@ -450,6 +456,7 @@
# Example: SqlShowDatabases(), SqlShowTables().
statementParserMethods: [
"RichSqlInsert()"
+ "SqlLoadModule()"
"SqlShowCatalogs()"
"SqlShowCurrentCatalogOrDatabase()"
"SqlDescribeCatalog()"
@@ -464,6 +471,7 @@
"SqlRichDescribeTable()"
"SqlAlterTable()"
"SqlShowViews()"
+ "SqlUnloadModule()"
]
# List of methods for parsing custom literals.
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 5281c1e..a9cacfc 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -1431,3 +1431,44 @@ SqlDrop SqlDropExtended(Span s, boolean replace) :
return drop;
}
}
+
+/**
+* Parses a load module statement.
+* LOAD MODULE module_name [WITH (property_name=property_value, ...)];
+*/
+SqlLoadModule SqlLoadModule() :
+{
+ SqlParserPos startPos;
+ SqlIdentifier moduleName;
+ SqlNodeList propertyList = SqlNodeList.EMPTY;
+}
+{
+ <LOAD> <MODULE> { startPos = getPos(); }
+ moduleName = SimpleIdentifier()
+ [
+ <WITH>
+ propertyList = TableProperties()
+ ]
+ {
+ return new SqlLoadModule(startPos.plus(getPos()),
+ moduleName,
+ propertyList);
+ }
+}
+
+/**
+* Parses an unload module statement.
+* UNLOAD MODULE module_name;
+*/
+SqlUnloadModule SqlUnloadModule() :
+{
+ SqlParserPos startPos;
+ SqlIdentifier moduleName;
+}
+{
+ <UNLOAD> <MODULE> { startPos = getPos(); }
+ moduleName = SimpleIdentifier()
+ {
+ return new SqlUnloadModule(startPos.plus(getPos()), moduleName);
+ }
+}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlLoadModule.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlLoadModule.java
new file mode 100644
index 0000000..9f28fa4
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlLoadModule.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.dql;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+ /** LOAD MODULE sql call: {@code LOAD MODULE module_name [WITH ('key' = 'value', ...)]}. */
+ public class SqlLoadModule extends SqlCall {
+ public static final SqlSpecialOperator OPERATOR =
+ new SqlSpecialOperator("LOAD MODULE", SqlKind.OTHER);
+
+ private final SqlIdentifier moduleName;
+
+ private final SqlNodeList propertyList;
+
+ public SqlLoadModule(SqlParserPos pos, SqlIdentifier moduleName, SqlNodeList propertyList) {
+ super(pos);
+ this.moduleName = requireNonNull(moduleName, "moduleName cannot be null");
+ this.propertyList = requireNonNull(propertyList, "propertyList cannot be null");
+ }
+
+ @Override
+ public SqlOperator getOperator() {
+ return OPERATOR;
+ }
+
+ @Override
+ public List<SqlNode> getOperandList() {
+ return ImmutableNullableList.of(moduleName, propertyList);
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+ writer.keyword("LOAD MODULE");
+ moduleName.unparse(writer, leftPrec, rightPrec);
+
+ if (this.propertyList.size() > 0) { // no WITH clause for an empty property list
+ writer.keyword("WITH");
+ SqlWriter.Frame withFrame = writer.startList("(", ")");
+ for (SqlNode property : propertyList) {
+ printIndent(writer);
+ property.unparse(writer, leftPrec, rightPrec);
+ }
+ writer.newlineAndIndent();
+ writer.endList(withFrame);
+ }
+ }
+
+ private void printIndent(SqlWriter writer) {
+ writer.sep(",", false);
+ writer.newlineAndIndent();
+ writer.print(" ");
+ }
+
+ public SqlIdentifier getModuleName() {
+ return moduleName;
+ }
+
+ public SqlNodeList getPropertyList() {
+ return propertyList;
+ }
+
+ public String moduleName() {
+ return moduleName.getSimple();
+ }
+ }
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlUnloadModule.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlUnloadModule.java
new file mode 100644
index 0000000..a5fe601
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlUnloadModule.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.dql;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import java.util.List;
+
+ /** UNLOAD MODULE sql call: {@code UNLOAD MODULE module_name}. */
+ public class SqlUnloadModule extends SqlCall {
+
+ public static final SqlSpecialOperator OPERATOR =
+ new SqlSpecialOperator("UNLOAD MODULE", SqlKind.OTHER);
+
+ private final SqlIdentifier moduleName;
+
+ public SqlUnloadModule(SqlParserPos pos, SqlIdentifier moduleName) {
+ super(pos);
+ this.moduleName = java.util.Objects.requireNonNull(moduleName, "moduleName cannot be null");
+ }
+
+ @Override
+ public SqlOperator getOperator() {
+ return OPERATOR;
+ }
+
+ @Override
+ public List<SqlNode> getOperandList() {
+ return ImmutableNullableList.of(moduleName);
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+ writer.keyword("UNLOAD");
+ writer.keyword("MODULE");
+ moduleName.unparse(writer, leftPrec, rightPrec);
+ }
+
+ public SqlIdentifier getModuleName() {
+ return moduleName;
+ }
+
+ public String moduleName() {
+ return moduleName.getSimple();
+ }
+ }
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index e166a6a..a9e2bf7 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -175,7 +175,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
}
@Test
- public void testShowFuntions() {
+ public void testShowFunctions() {
sql("show functions").ok("SHOW FUNCTIONS");
sql("show functions db1").ok("SHOW FUNCTIONS `DB1`");
sql("show functions catalog1.db1").ok("SHOW FUNCTIONS `CATALOG1`.`DB1`");
@@ -1153,6 +1153,30 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
.ok("DROP TEMPORARY SYSTEM FUNCTION IF EXISTS `CATALOG1`.`DB1`.`FUNCTION1`");
}
+ @Test
+ public void testLoadModule() {
+ sql("load module core").ok("LOAD MODULE `CORE`");
+
+ sql("load module dummy with ('k1' = 'v1', 'k2' = 'v2')")
+ .ok(
+ "LOAD MODULE `DUMMY`"
+ + " WITH (\n"
+ + " 'k1' = 'v1',\n"
+ + " 'k2' = 'v2'\n"
+ + ")");
+
+ sql("load module ^'core'^")
+ .fails("(?s).*Encountered \"\\\\'core\\\\'\" at line 1, column 13.\n.*");
+ }
+
+ @Test
+ public void testUnloadModule() {
+ sql("unload module core").ok("UNLOAD MODULE `CORE`");
+
+ sql("unload module ^'core'^")
+ .fails("(?s).*Encountered \"\\\\'core\\\\'\" at line 1, column 15.\n.*");
+ }
+
public static BaseMatcher<SqlNode> validated(String validatedSql) {
return new TypeSafeDiagnosingMatcher<SqlNode>() {
@Override
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index c4d254a..7f6e888 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -74,6 +74,7 @@ import org.apache.flink.table.expressions.ApiExpressionUtils;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.ComponentFactoryService;
+import org.apache.flink.table.factories.ModuleFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
@@ -85,6 +86,7 @@ import org.apache.flink.table.operations.CatalogQueryOperation;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
import org.apache.flink.table.operations.DescribeTableOperation;
import org.apache.flink.table.operations.ExplainOperation;
+import org.apache.flink.table.operations.LoadModuleOperation;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
@@ -98,6 +100,7 @@ import org.apache.flink.table.operations.ShowPartitionsOperation;
import org.apache.flink.table.operations.ShowTablesOperation;
import org.apache.flink.table.operations.ShowViewsOperation;
import org.apache.flink.table.operations.TableSourceQueryOperation;
+import org.apache.flink.table.operations.UnloadModuleOperation;
import org.apache.flink.table.operations.UseCatalogOperation;
import org.apache.flink.table.operations.UseDatabaseOperation;
import org.apache.flink.table.operations.ddl.AddPartitionsOperation;
@@ -148,6 +151,8 @@ import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
+import static org.apache.flink.table.descriptors.ModuleDescriptorValidator.MODULE_TYPE;
+
/**
* Implementation of {@link TableEnvironment} that works exclusively with Table API interfaces. Only
* {@link TableSource} is supported as an input and {@link TableSink} as an output. It also does not
@@ -175,13 +180,13 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
"Unsupported SQL query! sqlUpdate() only accepts a single SQL statement of type "
+ "INSERT, CREATE TABLE, DROP TABLE, ALTER TABLE, USE CATALOG, USE [CATALOG.]DATABASE, "
+ "CREATE DATABASE, DROP DATABASE, ALTER DATABASE, CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, "
- + "CREATE CATALOG, DROP CATALOG, CREATE VIEW, DROP VIEW.";
+ + "CREATE CATALOG, DROP CATALOG, CREATE VIEW, DROP VIEW, LOAD MODULE, UNLOAD MODULE.";
+ /** Message listing the statement types that executeSql() accepts. */
private static final String UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG =
"Unsupported SQL query! executeSql() only accepts a single SQL statement of type "
+ "CREATE TABLE, DROP TABLE, ALTER TABLE, CREATE DATABASE, DROP DATABASE, ALTER DATABASE, "
+ "CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, CREATE CATALOG, DROP CATALOG, "
- + "USE CATALOG, USE [CATALOG.]DATABASE, SHOW CATALOGS, SHOW DATABASES, SHOW TABLES, SHOW FUNCTIONS, SHOW PARTITIONS"
- + "CREATE VIEW, DROP VIEW, SHOW VIEWS, INSERT, DESCRIBE.";
+ // The trailing ", " keeps "SHOW PARTITIONS" from fusing with "CREATE VIEW" in the message.
+ + "USE CATALOG, USE [CATALOG.]DATABASE, SHOW CATALOGS, SHOW DATABASES, SHOW TABLES, SHOW FUNCTIONS, SHOW PARTITIONS, "
+ + "CREATE VIEW, DROP VIEW, SHOW VIEWS, INSERT, DESCRIBE, LOAD MODULE, UNLOAD MODULE.";
/** Provides necessary methods for {@link ConnectTableDescriptor}. */
private final Registration registration =
@@ -780,7 +785,9 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
|| operation instanceof CreateCatalogOperation
|| operation instanceof DropCatalogOperation
|| operation instanceof UseCatalogOperation
- || operation instanceof UseDatabaseOperation) {
+ || operation instanceof UseDatabaseOperation
+ || operation instanceof LoadModuleOperation
+ || operation instanceof UnloadModuleOperation) {
executeOperation(operation);
} else {
throw new TableException(UNSUPPORTED_QUERY_IN_SQL_UPDATE_MSG);
@@ -1052,6 +1059,10 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
} catch (CatalogException e) {
throw new ValidationException(exMsg, e);
}
+ } else if (operation instanceof LoadModuleOperation) {
+ return loadModule((LoadModuleOperation) operation);
+ } else if (operation instanceof UnloadModuleOperation) {
+ return unloadModule((UnloadModuleOperation) operation);
} else if (operation instanceof UseCatalogOperation) {
UseCatalogOperation useCatalogOperation = (UseCatalogOperation) operation;
catalogManager.setCurrentCatalog(useCatalogOperation.getCatalogName());
@@ -1151,6 +1162,40 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
}
}
+    /**
+     * Executes a LOAD MODULE operation: discovers a {@link ModuleFactory} using the module name
+     * as the module type and registers the created module with the module manager.
+     *
+     * @param operation operation carrying the module name and its properties
+     * @return {@link TableResultImpl#TABLE_RESULT_OK} when the module was loaded
+     * @throws ValidationException if the properties already contain the module type key, or if
+     *     any step fails with a {@link ValidationException}
+     * @throws TableException if any other exception is raised while loading
+     */
+    private TableResult loadModule(LoadModuleOperation operation) {
+        final String exMsg = getDDLOpExecuteErrorMsg(operation.asSummaryString());
+        try {
+            // Work on a private copy: the type key is injected below for factory discovery.
+            final Map<String, String> factoryProperties =
+                    new HashMap<>(operation.getProperties());
+
+            // The module name is what selects the factory, so an explicit type is rejected.
+            final boolean typeGiven = factoryProperties.containsKey(MODULE_TYPE);
+            if (typeGiven) {
+                final String rejection =
+                        String.format(
+                                "Property 'type' = '%s' is not supported since module name "
+                                        + "is used to find module",
+                                factoryProperties.get(MODULE_TYPE));
+                throw new ValidationException(rejection);
+            }
+            factoryProperties.put(MODULE_TYPE, operation.getModuleName());
+
+            final ModuleFactory moduleFactory =
+                    TableFactoryService.find(
+                            ModuleFactory.class, factoryProperties, userClassLoader);
+            moduleManager.loadModule(
+                    operation.getModuleName(), moduleFactory.createModule(factoryProperties));
+            return TableResultImpl.TABLE_RESULT_OK;
+        } catch (ValidationException validationError) {
+            // Keep the exception type, but prefix the message with the failed statement.
+            throw new ValidationException(
+                    String.format("%s. %s", exMsg, validationError.getMessage()),
+                    validationError);
+        } catch (Exception otherError) {
+            throw new TableException(
+                    String.format("%s. %s", exMsg, otherError.getMessage()), otherError);
+        }
+    }
+
+ private TableResult unloadModule(UnloadModuleOperation operation) {
+ String exMsg = getDDLOpExecuteErrorMsg(operation.asSummaryString());
+ try {
+ moduleManager.unloadModule(operation.getModuleName());
+ return TableResultImpl.TABLE_RESULT_OK;
+ } catch (ValidationException e) {
+ throw new ValidationException(String.format("%s. %s", exMsg, e.getMessage()), e);
+ }
+ }
+
private TableResult buildShowResult(String columnName, String[] objects) {
return buildResult(
new String[] {columnName},
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/LoadModuleOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/LoadModuleOperation.java
new file mode 100644
index 0000000..2409ca0
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/LoadModuleOperation.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Operation to describe a LOAD MODULE statement.
+ *
+ * <p>Holds the name of the module to load and the properties supplied with it. The properties
+ * are copied on construction and only exposed through an unmodifiable view, so an instance
+ * cannot be changed after it has been created.
+ */
+public class LoadModuleOperation implements Operation {
+    private final String moduleName;
+
+    private final Map<String, String> properties;
+
+    /**
+     * Creates the operation.
+     *
+     * @param moduleName name of the module to load, must not be null
+     * @param properties module properties, must not be null; the map is copied, so later
+     *     changes made by the caller are not reflected in this operation
+     */
+    public LoadModuleOperation(String moduleName, Map<String, String> properties) {
+        this.moduleName = checkNotNull(moduleName);
+        // Defensive copy; LinkedHashMap keeps the iteration order of the given map so that
+        // asSummaryString() prints the properties in the same order as before.
+        this.properties = new LinkedHashMap<>(checkNotNull(properties));
+    }
+
+    /** Returns the name of the module to load. */
+    public String getModuleName() {
+        return moduleName;
+    }
+
+    /** Returns an unmodifiable view of the module properties. */
+    public Map<String, String> getProperties() {
+        return Collections.unmodifiableMap(properties);
+    }
+
+    @Override
+    public String asSummaryString() {
+        // LinkedHashMap so that "moduleName" is always printed before "properties".
+        Map<String, Object> params = new LinkedHashMap<>();
+        params.put("moduleName", moduleName);
+        params.put("properties", properties);
+        return OperationUtils.formatWithChildren(
+                "LOAD MODULE", params, Collections.emptyList(), Operation::asSummaryString);
+    }
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UnloadModuleOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UnloadModuleOperation.java
new file mode 100644
index 0000000..46dc8bb
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UnloadModuleOperation.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Operation to describe an UNLOAD MODULE statement. */
+public class UnloadModuleOperation implements Operation {
+    private final String moduleName;
+
+    /**
+     * Creates the operation.
+     *
+     * @param moduleName name of the module to unload, must not be null
+     */
+    public UnloadModuleOperation(String moduleName) {
+        checkNotNull(moduleName);
+        this.moduleName = moduleName;
+    }
+
+    /** Returns the name of the module to unload. */
+    public String getModuleName() {
+        return moduleName;
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "UNLOAD MODULE " + moduleName;
+    }
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/module/DummyModuleFactory.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/module/DummyModuleFactory.java
new file mode 100644
index 0000000..65e6100
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/module/DummyModuleFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.module;
+
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.ModuleDescriptorValidator;
+import org.apache.flink.table.factories.ModuleFactory;
+import org.apache.flink.table.module.Module;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.descriptors.ModuleDescriptorValidator.MODULE_TYPE;
+
+/**
+ * Test implementation for {@link ModuleFactory}.
+ *
+ * <p>Requires the module type {@code dummy}, declares {@code dummy-version} as its only
+ * supported property and creates an anonymous {@link Module} without any overrides.
+ */
+public class DummyModuleFactory implements ModuleFactory {
+
+    @Override
+    public Map<String, String> requiredContext() {
+        // Only the module type is required to select this factory.
+        final Map<String, String> requiredProperties = new HashMap<>();
+        requiredProperties.put(MODULE_TYPE, DummyModuleDescriptorValidator.MODULE_TYPE_DUMMY);
+        return requiredProperties;
+    }
+
+    @Override
+    public List<String> supportedProperties() {
+        final String versionKey = DummyModuleDescriptorValidator.MODULE_DUMMY_VERSION;
+        return Collections.singletonList(versionKey);
+    }
+
+    @Override
+    public Module createModule(Map<String, String> properties) {
+        // The given properties are not read by the dummy module.
+        return new Module() {};
+    }
+
+    /** Test implementation for {@link ModuleDescriptorValidator}. */
+    public static class DummyModuleDescriptorValidator extends ModuleDescriptorValidator {
+        public static final String MODULE_TYPE_DUMMY = "dummy";
+        public static final String MODULE_DUMMY_VERSION = "dummy-version";
+
+        @Override
+        public void validate(DescriptorProperties properties) {
+            // Run the inherited checks first, then the dummy-specific ones.
+            super.validate(properties);
+            properties.validateValue(MODULE_TYPE, MODULE_TYPE_DUMMY, false);
+            properties.validateString(MODULE_DUMMY_VERSION, true, 1);
+        }
+    }
+}
diff --git a/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index 1094713..625488d 100644
--- a/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ b/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -13,4 +13,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+org.apache.flink.table.factories.module.DummyModuleFactory
org.apache.flink.table.factories.TestTableSinkFactory
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index dfed888..1c54d4f 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -48,6 +48,7 @@ import org.apache.flink.sql.parser.ddl.SqlUseCatalog;
import org.apache.flink.sql.parser.ddl.SqlUseDatabase;
import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
import org.apache.flink.sql.parser.dml.RichSqlInsert;
+import org.apache.flink.sql.parser.dql.SqlLoadModule;
import org.apache.flink.sql.parser.dql.SqlRichDescribeTable;
import org.apache.flink.sql.parser.dql.SqlShowCatalogs;
import org.apache.flink.sql.parser.dql.SqlShowCurrentCatalog;
@@ -57,6 +58,7 @@ import org.apache.flink.sql.parser.dql.SqlShowFunctions;
import org.apache.flink.sql.parser.dql.SqlShowPartitions;
import org.apache.flink.sql.parser.dql.SqlShowTables;
import org.apache.flink.sql.parser.dql.SqlShowViews;
+import org.apache.flink.sql.parser.dql.SqlUnloadModule;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
@@ -80,6 +82,7 @@ import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
import org.apache.flink.table.operations.DescribeTableOperation;
import org.apache.flink.table.operations.ExplainOperation;
+import org.apache.flink.table.operations.LoadModuleOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ShowCatalogsOperation;
import org.apache.flink.table.operations.ShowCurrentCatalogOperation;
@@ -89,6 +92,7 @@ import org.apache.flink.table.operations.ShowFunctionsOperation;
import org.apache.flink.table.operations.ShowPartitionsOperation;
import org.apache.flink.table.operations.ShowTablesOperation;
import org.apache.flink.table.operations.ShowViewsOperation;
+import org.apache.flink.table.operations.UnloadModuleOperation;
import org.apache.flink.table.operations.UseCatalogOperation;
import org.apache.flink.table.operations.UseDatabaseOperation;
import org.apache.flink.table.operations.ddl.AddPartitionsOperation;
@@ -193,11 +197,15 @@ public class SqlToOperationConverter {
return Optional.of(converter.convertCreateCatalog((SqlCreateCatalog) validated));
} else if (validated instanceof SqlDropCatalog) {
return Optional.of(converter.convertDropCatalog((SqlDropCatalog) validated));
+ } else if (validated instanceof SqlLoadModule) {
+ return Optional.of(converter.convertLoadModule((SqlLoadModule) validated));
} else if (validated instanceof SqlShowCatalogs) {
return Optional.of(converter.convertShowCatalogs((SqlShowCatalogs) validated));
} else if (validated instanceof SqlShowCurrentCatalog) {
return Optional.of(
converter.convertShowCurrentCatalog((SqlShowCurrentCatalog) validated));
+ } else if (validated instanceof SqlUnloadModule) {
+ return Optional.of(converter.convertUnloadModule((SqlUnloadModule) validated));
} else if (validated instanceof SqlUseCatalog) {
return Optional.of(converter.convertUseCatalog((SqlUseCatalog) validated));
} else if (validated instanceof SqlCreateDatabase) {
@@ -864,6 +872,23 @@ public class SqlToOperationConverter {
return new DescribeTableOperation(identifier, sqlRichDescribeTable.isExtended());
}
+ /** Convert LOAD MODULE statement. */
+ private Operation convertLoadModule(SqlLoadModule sqlLoadModule) {
+ String moduleName = sqlLoadModule.moduleName();
+ Map<String, String> properties = new HashMap<>();
+ for (SqlNode node : sqlLoadModule.getPropertyList().getList()) {
+ SqlTableOption option = (SqlTableOption) node;
+ properties.put(option.getKeyString(), option.getValueString());
+ }
+ return new LoadModuleOperation(moduleName, properties);
+ }
+
+    /**
+     * Convert UNLOAD MODULE statement.
+     *
+     * <p>Only the module name is carried over into the {@link UnloadModuleOperation}.
+     */
+    private Operation convertUnloadModule(SqlUnloadModule sqlUnloadModule) {
+        return new UnloadModuleOperation(sqlUnloadModule.moduleName());
+    }
+
/** Fallback method for sql query. */
private Operation convertSqlQuery(SqlNode node) {
return toQueryOperation(flinkPlanner, node);
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
index 38adb91..4b7e5f8 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
@@ -127,6 +127,7 @@ class FlinkPlannerImpl(
|| sqlNode.getKind == SqlKind.CREATE_FUNCTION
|| sqlNode.getKind == SqlKind.DROP_FUNCTION
|| sqlNode.getKind == SqlKind.OTHER_DDL
+ || sqlNode.isInstanceOf[SqlLoadModule]
|| sqlNode.isInstanceOf[SqlShowCatalogs]
|| sqlNode.isInstanceOf[SqlShowCurrentCatalog]
|| sqlNode.isInstanceOf[SqlShowDatabases]
@@ -135,7 +136,8 @@ class FlinkPlannerImpl(
|| sqlNode.isInstanceOf[SqlShowFunctions]
|| sqlNode.isInstanceOf[SqlShowViews]
|| sqlNode.isInstanceOf[SqlShowPartitions]
- || sqlNode.isInstanceOf[SqlRichDescribeTable]) {
+ || sqlNode.isInstanceOf[SqlRichDescribeTable]
+ || sqlNode.isInstanceOf[SqlUnloadModule]) {
return sqlNode
}
sqlNode match {
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index 58cde58..d165f20 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -47,7 +47,9 @@ import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
+import org.apache.flink.table.operations.LoadModuleOperation;
import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.UnloadModuleOperation;
import org.apache.flink.table.operations.UseCatalogOperation;
import org.apache.flink.table.operations.UseDatabaseOperation;
import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
@@ -291,6 +293,34 @@ public class SqlToOperationConverterTest {
}
@Test
+    public void testLoadModule() {
+        // Properties expected to be extracted from the WITH clause.
+        final Map<String, String> wantedProperties = new HashMap<>();
+        wantedProperties.put("k1", "v1");
+        wantedProperties.put("k2", "v2");
+
+        final Operation parsed =
+                parse("LOAD MODULE dummy WITH ('k1' = 'v1', 'k2' = 'v2')", SqlDialect.DEFAULT);
+        assert parsed instanceof LoadModuleOperation;
+
+        // The module name and all properties must survive the conversion.
+        final LoadModuleOperation loadModule = (LoadModuleOperation) parsed;
+        assertEquals("dummy", loadModule.getModuleName());
+        assertEquals(wantedProperties, loadModule.getProperties());
+    }
+
+    @Test
+    public void testUnloadModule() {
+        final Operation parsed = parse("UNLOAD MODULE dummy", SqlDialect.DEFAULT);
+        assert parsed instanceof UnloadModuleOperation;
+
+        // The module name must survive the conversion.
+        final UnloadModuleOperation unloadModule = (UnloadModuleOperation) parsed;
+        assertEquals("dummy", unloadModule.getModuleName());
+    }
+
+ @Test
public void testCreateTable() {
final String sql =
"CREATE TABLE tbl1 (\n"
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index 71f8a53..1f38489 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -30,15 +30,14 @@ import org.apache.flink.table.planner.runtime.stream.table.FunctionITCase.Simple
import org.apache.flink.table.planner.utils.TableTestUtil.replaceStageId
import org.apache.flink.table.planner.utils.{TableTestUtil, TestTableSourceSinks}
import org.apache.flink.types.Row
-
import org.apache.calcite.plan.RelOptUtil
import org.apache.calcite.sql.SqlExplainLevel
-import org.junit.Assert.{assertEquals, assertFalse, assertTrue, fail}
+import org.apache.flink.core.testutils.FlinkMatchers.containsMessage
+import org.junit.Assert.{assertEquals, assertFalse, assertThat, assertTrue, fail}
import org.junit.rules.ExpectedException
import org.junit.{Rule, Test}
import _root_.java.util
-
import _root_.scala.collection.JavaConverters._
class TableEnvironmentTest {
@@ -506,6 +505,94 @@ class TableEnvironmentTest {
}
@Test
+ def testExecuteSqlWithLoadModule(): Unit = {
+ // Loading without properties succeeds; "dummy" is then listed after "core".
+ val result = tableEnv.executeSql("LOAD MODULE dummy")
+ assertEquals(ResultKind.SUCCESS, result.getResultKind)
+ assert(tableEnv.listModules().sameElements(Array[String]("core", "dummy")))
+
+ val statement =
+ """
+ |LOAD MODULE dummy WITH (
+ |'type' = 'dummy'
+ |)
+ """.stripMargin
+ // An explicit 'type' property is rejected because the module name already selects the
+ // factory. The expectations are registered before the statement that is expected to fail.
+ expectedException.expect(classOf[ValidationException])
+ expectedException.expectMessage(
+ "Property 'type' = 'dummy' is not supported since module name is used to find module")
+ tableEnv.executeSql(statement)
+ }
+
+ /** Loads a module with a property, then checks that loading the same name again fails. */
+ @Test
+ def testExecuteSqlWithLoadParameterizedModule(): Unit = {
+ // First load: supplies the 'dummy-version' property and is expected to succeed.
+ val statement1 =
+ """
+ |LOAD MODULE dummy WITH (
+ | 'dummy-version' = '1'
+ |)
+ """.stripMargin
+ val result = tableEnv.executeSql(statement1)
+ assertEquals(ResultKind.SUCCESS, result.getResultKind)
+ assert(tableEnv.listModules().sameElements(Array[String]("core", "dummy")))
+
+ // Second load: same module name with a different property value.
+ val statement2 =
+ """
+ |LOAD MODULE dummy WITH (
+ |'dummy-version' = '2'
+ |)
+ """.stripMargin
+ // The expectations are registered before the statement that is expected to fail.
+ expectedException.expect(classOf[ValidationException])
+ expectedException.expectMessage(
+ "Could not execute LOAD MODULE: (moduleName: [dummy], properties: [{dummy-version=2}])." +
+ " A module with name 'dummy' already exists")
+ tableEnv.executeSql(statement2)
+ }
+
+ /** Checks that "Dummy" is not resolved to a module factory while "dummy" is. */
+ @Test
+ def testExecuteSqlWithLoadCaseSensitiveModuleName(): Unit = {
+ // Capitalized module name: no factory is expected to match.
+ val statement1 =
+ """
+ |LOAD MODULE Dummy WITH (
+ | 'dummy-version' = '1'
+ |)
+ """.stripMargin
+
+ // try/catch instead of the ExpectedException rule, because the test continues after
+ // the expected failure.
+ try {
+ tableEnv.executeSql(statement1)
+ fail("Expected an exception")
+ } catch {
+ case t: Throwable =>
+ assertThat(t, containsMessage("Could not execute LOAD MODULE: (moduleName: [Dummy], " +
+ "properties: [{dummy-version=1}]). Could not find a suitable table factory for " +
+ "'org.apache.flink.table.factories.ModuleFactory' in\nthe classpath."))
+ }
+
+ // Lower-case module name: the load is expected to succeed.
+ val statement2 =
+ """
+ |LOAD MODULE dummy WITH (
+ |'dummy-version' = '2'
+ |)
+ """.stripMargin
+ val result = tableEnv.executeSql(statement2)
+ assertEquals(ResultKind.SUCCESS, result.getResultKind)
+ assert(tableEnv.listModules().sameElements(Array[String]("core", "dummy")))
+ }
+
+ /** Unloads a loaded module, then checks that unloading it a second time fails. */
+ @Test
+ def testExecuteSqlWithUnloadModuleTwice(): Unit = {
+ // Set-up: load the module so that there is something to unload.
+ tableEnv.executeSql("LOAD MODULE dummy")
+ assert(tableEnv.listModules().sameElements(Array[String]("core", "dummy")))
+
+ // First unload is expected to succeed.
+ val result = tableEnv.executeSql("UNLOAD MODULE dummy")
+ assertEquals(ResultKind.SUCCESS, result.getResultKind)
+
+ // Second unload: the expectations are registered before the statement expected to fail.
+ expectedException.expect(classOf[ValidationException])
+ expectedException.expectMessage(
+ "Could not execute UNLOAD MODULE dummy." +
+ " No module with name 'dummy' exists")
+ tableEnv.executeSql("UNLOAD MODULE dummy")
+ }
+
+ @Test
def testExecuteSqlWithCreateDropView(): Unit = {
val createTableStmt =
"""