You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2021/02/22 06:13:44 UTC

[flink] branch master updated: [FLINK-21297][table] Support 'LOAD/UNLOAD MODULE' syntax

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e2e1121  [FLINK-21297][table] Support 'LOAD/UNLOAD MODULE' syntax
e2e1121 is described below

commit e2e11218516ad579bf2bd9c0fc5510f08cb6d243
Author: Jane <55...@users.noreply.github.com>
AuthorDate: Mon Feb 22 14:13:22 2021 +0800

    [FLINK-21297][table] Support 'LOAD/UNLOAD MODULE' syntax
    
    Support 'LOAD/UNLOAD MODULE' syntax in the SQL parser, TableEnvironment, and SQL CLI.
    
    This closes #14944
---
 .../apache/flink/table/client/cli/CliClient.java   | 12 +++
 .../apache/flink/table/client/cli/CliStrings.java  | 17 ++++
 .../flink/table/client/cli/SqlCommandParser.java   | 10 +++
 .../table/client/cli/SqlCommandParserTest.java     | 33 +++++---
 .../client/gateway/local/ExecutionContextTest.java | 20 +++--
 .../client/gateway/local/LocalExecutorITCase.java  | 83 +++++++++++++++++++
 .../src/main/codegen/data/Parser.tdd               |  8 ++
 .../src/main/codegen/includes/parserImpls.ftl      | 41 ++++++++++
 .../parser/hive/FlinkHiveSqlParserImplTest.java    | 13 +++
 .../src/main/codegen/data/Parser.tdd               |  8 ++
 .../src/main/codegen/includes/parserImpls.ftl      | 41 ++++++++++
 .../apache/flink/sql/parser/dql/SqlLoadModule.java | 95 ++++++++++++++++++++++
 .../flink/sql/parser/dql/SqlUnloadModule.java      | 70 ++++++++++++++++
 .../flink/sql/parser/FlinkSqlParserImplTest.java   | 26 +++++-
 .../table/api/internal/TableEnvironmentImpl.java   | 51 +++++++++++-
 .../table/operations/LoadModuleOperation.java      | 54 ++++++++++++
 .../table/operations/UnloadModuleOperation.java    | 39 +++++++++
 .../table/factories/module/DummyModuleFactory.java | 66 +++++++++++++++
 .../org.apache.flink.table.factories.TableFactory  |  1 +
 .../operations/SqlToOperationConverter.java        | 25 ++++++
 .../table/planner/calcite/FlinkPlannerImpl.scala   |  4 +-
 .../operations/SqlToOperationConverterTest.java    | 30 +++++++
 .../flink/table/api/TableEnvironmentTest.scala     | 93 ++++++++++++++++++++-
 23 files changed, 812 insertions(+), 28 deletions(-)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
index 201f6f0..826b96c 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
@@ -383,6 +383,18 @@ public class CliClient implements AutoCloseable {
             case DROP_CATALOG:
                 callDdl(cmdCall.operands[0], CliStrings.MESSAGE_CATALOG_REMOVED);
                 break;
+            case LOAD_MODULE:
+                callDdl(
+                        cmdCall.operands[0],
+                        CliStrings.MESSAGE_LOAD_MODULE_SUCCEEDED,
+                        CliStrings.MESSAGE_LOAD_MODULE_FAILED);
+                break;
+            case UNLOAD_MODULE:
+                callDdl(
+                        cmdCall.operands[0],
+                        CliStrings.MESSAGE_UNLOAD_MODULE_SUCCEEDED,
+                        CliStrings.MESSAGE_UNLOAD_MODULE_FAILED);
+                break;
             default:
                 throw new SqlClientException("Unsupported command: " + cmdCall.command);
         }
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
index 37b130f..f90821a 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
@@ -104,6 +104,15 @@ public final class CliStrings {
                             formatCommand(
                                     SqlCommand.USE,
                                     "Sets the current default database. Experimental! Syntax: 'USE <name>;'"))
+                    .append(
+                            formatCommand(
+                                    SqlCommand.LOAD_MODULE,
+                                    "Load a module. Syntax: 'LOAD MODULE <name> [WITH ('<key1>' = "
+                                            + "'<value1>' [, '<key2>' = '<value2>', ...])];'"))
+                    .append(
+                            formatCommand(
+                                    SqlCommand.UNLOAD_MODULE,
+                                    "Unload a module. Syntax: 'UNLOAD MODULE <name>;'"))
                     .style(AttributedStyle.DEFAULT.underline())
                     .append("\nHint")
                     .style(AttributedStyle.DEFAULT)
@@ -220,6 +229,14 @@ public final class CliStrings {
 
     public static final String MESSAGE_CATALOG_REMOVED = "Catalog has been removed.";
 
+    public static final String MESSAGE_LOAD_MODULE_SUCCEEDED = "Load module succeeded!";
+
+    public static final String MESSAGE_UNLOAD_MODULE_SUCCEEDED = "Unload module succeeded!";
+
+    public static final String MESSAGE_LOAD_MODULE_FAILED = "Load module failed!";
+
+    public static final String MESSAGE_UNLOAD_MODULE_FAILED = "Unload module failed!";
+
     // --------------------------------------------------------------------------------------------
 
     public static final String RESULT_TITLE = "SQL Query Result";
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
index 5ae3333..62376ab 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
 import org.apache.flink.table.operations.DescribeTableOperation;
 import org.apache.flink.table.operations.ExplainOperation;
+import org.apache.flink.table.operations.LoadModuleOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.table.operations.ShowCatalogsOperation;
@@ -32,6 +33,7 @@ import org.apache.flink.table.operations.ShowDatabasesOperation;
 import org.apache.flink.table.operations.ShowFunctionsOperation;
 import org.apache.flink.table.operations.ShowPartitionsOperation;
 import org.apache.flink.table.operations.ShowTablesOperation;
+import org.apache.flink.table.operations.UnloadModuleOperation;
 import org.apache.flink.table.operations.UseCatalogOperation;
 import org.apache.flink.table.operations.UseDatabaseOperation;
 import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
@@ -168,6 +170,10 @@ public final class SqlCommandParser {
             cmd = SqlCommand.ALTER_FUNCTION;
         } else if (operation instanceof ExplainOperation) {
             cmd = SqlCommand.EXPLAIN;
+        } else if (operation instanceof LoadModuleOperation) {
+            cmd = SqlCommand.LOAD_MODULE;
+        } else if (operation instanceof UnloadModuleOperation) {
+            cmd = SqlCommand.UNLOAD_MODULE;
         } else if (operation instanceof DescribeTableOperation) {
             cmd = SqlCommand.DESCRIBE;
             operands =
@@ -252,6 +258,10 @@ public final class SqlCommandParser {
         // FLINK-17396
         SHOW_MODULES("SHOW\\s+MODULES", NO_OPERANDS),
 
+        LOAD_MODULE,
+
+        UNLOAD_MODULE,
+
         SHOW_PARTITIONS,
 
         USE_CATALOG,
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
index 62a087a..75cfbcc 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
@@ -282,19 +282,28 @@ public class SqlCommandParserTest {
                         // show current database
                         TestItem.validSql(
                                 "show current database", SqlCommand.SHOW_CURRENT_DATABASE),
+                        // load module with module name as identifier
                         TestItem.validSql(
-                                "show 	current 	database", SqlCommand.SHOW_CURRENT_DATABASE),
-                        // show tables
-                        TestItem.validSql("SHOW TABLES;", SqlCommand.SHOW_TABLES),
-                        TestItem.validSql("  SHOW   TABLES   ;", SqlCommand.SHOW_TABLES),
-                        // show functions
-                        TestItem.validSql("SHOW FUNCTIONS;", SqlCommand.SHOW_FUNCTIONS),
-                        TestItem.validSql("  SHOW    FUNCTIONS   ", SqlCommand.SHOW_FUNCTIONS),
-                        // show modules
-                        TestItem.validSql("SHOW MODULES", SqlCommand.SHOW_MODULES)
-                                .cannotParseComment(),
-                        TestItem.validSql("  SHOW    MODULES   ", SqlCommand.SHOW_MODULES)
-                                .cannotParseComment(),
+                                "LOAD MODULE dummy", SqlCommand.LOAD_MODULE, "LOAD MODULE dummy"),
+                        // load module with module name as reserved keyword
+                        TestItem.validSql(
+                                "LOAD MODULE `MODULE`",
+                                SqlCommand.LOAD_MODULE,
+                                "LOAD MODULE `MODULE`"),
+                        // load module with module name as literal
+                        TestItem.invalidSql(
+                                "LOAD MODULE 'dummy'",
+                                SqlExecutionException.class,
+                                "Encountered \"\\'dummy\\'\""),
+                        TestItem.validSql(
+                                "LOAD MODULE dummy WITH ('dummy-version' = '1')",
+                                SqlCommand.LOAD_MODULE,
+                                "LOAD MODULE dummy WITH ('dummy-version' = '1')"),
+                        // unload module
+                        TestItem.validSql(
+                                "UNLOAD MODULE dummy",
+                                SqlCommand.UNLOAD_MODULE,
+                                "UNLOAD MODULE dummy"),
                         // Test create function.
                         TestItem.invalidSql(
                                 "CREATE FUNCTION ",
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
index 04b3bec..4bfe17e 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
@@ -97,7 +97,7 @@ import static org.powermock.api.mockito.PowerMockito.when;
 public class ExecutionContextTest {
 
     private static final String DEFAULTS_ENVIRONMENT_FILE = "test-sql-client-defaults.yaml";
-    private static final String MODULES_ENVIRONMENT_FILE = "test-sql-client-modules.yaml";
+    public static final String MODULES_ENVIRONMENT_FILE = "test-sql-client-modules.yaml";
     public static final String CATALOGS_ENVIRONMENT_FILE = "test-sql-client-catalogs.yaml";
     private static final String STREAMING_ENVIRONMENT_FILE = "test-sql-client-streaming.yaml";
     private static final String CONFIGURATION_ENVIRONMENT_FILE =
@@ -413,19 +413,23 @@ public class ExecutionContextTest {
         return replaceVars;
     }
 
+    static Map<String, String> createModuleReplaceVars() {
+        Map<String, String> replaceVars = new HashMap<>();
+        replaceVars.put("$VAR_PLANNER", "blink");
+        replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
+        replaceVars.put("$VAR_RESULT_MODE", "changelog");
+        replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
+        replaceVars.put("$VAR_MAX_ROWS", "100");
+        return replaceVars;
+    }
+
     private <T> ExecutionContext<T> createDefaultExecutionContext() throws Exception {
         final Map<String, String> replaceVars = createDefaultReplaceVars();
         return createExecutionContext(DEFAULTS_ENVIRONMENT_FILE, replaceVars);
     }
 
     private <T> ExecutionContext<T> createModuleExecutionContext() throws Exception {
-        final Map<String, String> replaceVars = new HashMap<>();
-        replaceVars.put("$VAR_PLANNER", "old");
-        replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
-        replaceVars.put("$VAR_RESULT_MODE", "changelog");
-        replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
-        replaceVars.put("$VAR_MAX_ROWS", "100");
-        return createExecutionContext(MODULES_ENVIRONMENT_FILE, replaceVars);
+        return createExecutionContext(MODULES_ENVIRONMENT_FILE, createModuleReplaceVars());
     }
 
     private <T> ExecutionContext<T> createCatalogExecutionContext() throws Exception {
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 4e03d91..449217b 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -87,7 +87,10 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
 import static org.apache.flink.table.client.gateway.local.ExecutionContextTest.CATALOGS_ENVIRONMENT_FILE;
+import static org.apache.flink.table.client.gateway.local.ExecutionContextTest.MODULES_ENVIRONMENT_FILE;
+import static org.apache.flink.table.client.gateway.local.ExecutionContextTest.createModuleReplaceVars;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.hamcrest.CoreMatchers.hasItems;
 import static org.hamcrest.CoreMatchers.is;
@@ -1618,6 +1621,86 @@ public class LocalExecutorITCase extends TestLogger {
         executor.closeSession(sessionId);
     }
 
+    @Test
+    public void testLoadModuleWithModuleConfEnabled() throws Exception {
+        // only blink planner supports LOAD MODULE syntax
+        Assume.assumeTrue(planner.equals("blink"));
+        final Executor executor =
+                createModifiedExecutor(
+                        MODULES_ENVIRONMENT_FILE, clusterClient, createModuleReplaceVars());
+        final SessionContext session = new SessionContext("test-session", new Environment());
+        String sessionId = executor.openSession(session);
+        assertEquals("test-session", sessionId);
+
+        assertThrows(
+                "Could not execute statement: load module core",
+                SqlExecutionException.class,
+                () -> executor.executeSql(sessionId, "load module core"));
+
+        executor.executeSql(sessionId, "load module hive");
+        assertEquals(
+                executor.listModules(sessionId),
+                Arrays.asList("core", "mymodule", "myhive", "myhive2", "hive"));
+    }
+
+    @Test
+    public void testUnloadModuleWithModuleConfEnabled() throws Exception {
+        // only blink planner supports UNLOAD MODULE syntax
+        Assume.assumeTrue(planner.equals("blink"));
+        final Executor executor =
+                createModifiedExecutor(
+                        MODULES_ENVIRONMENT_FILE, clusterClient, createModuleReplaceVars());
+        final SessionContext session = new SessionContext("test-session", new Environment());
+        String sessionId = executor.openSession(session);
+        assertEquals("test-session", sessionId);
+
+        executor.executeSql(sessionId, "unload module mymodule");
+        assertEquals(executor.listModules(sessionId), Arrays.asList("core", "myhive", "myhive2"));
+
+        exception.expect(SqlExecutionException.class);
+        exception.expectMessage("Could not execute statement: unload module mymodule");
+        executor.executeSql(sessionId, "unload module mymodule");
+    }
+
+    @Test
+    public void testHiveBuiltInFunctionWithHiveModuleEnabled() throws Exception {
+        // only blink planner supports LOAD MODULE syntax
+        Assume.assumeTrue(planner.equals("blink"));
+
+        final URL url = getClass().getClassLoader().getResource("test-data.csv");
+        Objects.requireNonNull(url);
+        final Map<String, String> replaceVars = new HashMap<>();
+        replaceVars.put("$VAR_PLANNER", planner);
+        replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
+        replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
+        replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
+        replaceVars.put("$VAR_MAX_ROWS", "100");
+        replaceVars.put("$VAR_RESULT_MODE", "table");
+
+        final Executor executor = createModifiedExecutor(clusterClient, replaceVars);
+        final SessionContext session = new SessionContext("test-session", new Environment());
+        String sessionId = executor.openSession(session);
+        assertEquals("test-session", sessionId);
+
+        // cannot use hive built-in function without loading hive module
+        assertThrows(
+                "Could not execute statement: select substring_index('www.apache.org', '.', 2) from TableNumber1",
+                SqlExecutionException.class,
+                () ->
+                        executor.executeSql(
+                                sessionId,
+                                "select substring_index('www.apache.org', '.', 2) from TableNumber1"));
+
+        executor.executeSql(sessionId, "load module hive");
+        assertEquals(executor.listModules(sessionId), Arrays.asList("core", "hive"));
+
+        assertShowResult(
+                executor.executeSql(
+                        sessionId,
+                        "select substring_index('www.apache.org', '.', 2) from TableNumber1"),
+                hasItems("www.apache"));
+    }
+
     private void executeStreamQueryTable(
             Map<String, String> replaceVars, String query, List<String> expectedResults)
             throws Exception {
diff --git a/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd
index 57101a3..4ba2efd 100644
--- a/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd
@@ -78,6 +78,7 @@
     "org.apache.flink.sql.parser.dml.RichSqlInsertKeyword"
     "org.apache.flink.sql.parser.dql.SqlDescribeCatalog"
     "org.apache.flink.sql.parser.dql.SqlDescribeDatabase"
+    "org.apache.flink.sql.parser.dql.SqlLoadModule"
     "org.apache.flink.sql.parser.dql.SqlShowCatalogs"
     "org.apache.flink.sql.parser.dql.SqlShowCurrentCatalog"
     "org.apache.flink.sql.parser.dql.SqlShowDatabases"
@@ -86,6 +87,7 @@
     "org.apache.flink.sql.parser.dql.SqlShowTables"
     "org.apache.flink.sql.parser.dql.SqlShowPartitions"
     "org.apache.flink.sql.parser.dql.SqlRichDescribeTable"
+    "org.apache.flink.sql.parser.dql.SqlUnloadModule"
     "org.apache.flink.sql.parser.type.ExtendedSqlCollectionTypeNameSpec"
     "org.apache.flink.sql.parser.type.ExtendedSqlRowTypeNameSpec"
     "org.apache.flink.sql.parser.type.SqlMapTypeNameSpec"
@@ -125,6 +127,7 @@
     "ITEMS"
     "KEYS"
     "LINES"
+    "LOAD"
     "LOCATION"
     "NORELY"
     "NOVALIDATE"
@@ -145,6 +148,7 @@
     "TABLES"
     "TBLPROPERTIES"
     "TERMINATED"
+    "UNLOAD"
     "USE"
     "VALIDATE"
   ]
@@ -272,6 +276,7 @@
     "LENGTH"
     "LEVEL"
     "LIBRARY"
+    "LOAD"
     "LOCATOR"
     "M"
     "MAP"
@@ -443,6 +448,7 @@
     "UNCOMMITTED"
     "UNCONDITIONAL"
     "UNDER"
+    "UNLOAD"
     "UNNAMED"
     "USAGE"
     "USER_DEFINED_TYPE_CATALOG"
@@ -510,6 +516,7 @@
   # Example: SqlShowDatabases(), SqlShowTables().
   statementParserMethods: [
     "RichSqlInsert()"
+    "SqlLoadModule()"
     "SqlShowCatalogs()"
     "SqlShowCurrentCatalogOrDatabase()"
     "SqlDescribeCatalog()"
@@ -524,6 +531,7 @@
     "SqlAlterTable()"
     "SqlAlterView()"
     "SqlShowPartitions()"
+    "SqlUnloadModule()"
   ]
 
   # List of methods for parsing custom literals.
diff --git a/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
index f8a69a3..6646d6b 100644
--- a/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
@@ -1510,3 +1510,44 @@ SqlShowPartitions SqlShowPartitions() :
     [ <PARTITION> { partitionSpec = new SqlNodeList(getPos()); PartitionSpecCommaList(new SqlNodeList(getPos()), partitionSpec); } ]
     { return new SqlShowPartitions(pos, tableIdentifier, partitionSpec); }
 }
+
+/**
+* Parses a load module statement.
+* LOAD MODULE module_name [WITH (property_name=property_value, ...)];
+*/
+SqlLoadModule SqlLoadModule() :
+{
+    SqlParserPos startPos;
+    SqlIdentifier moduleName;
+    SqlNodeList propertyList = SqlNodeList.EMPTY;
+}
+{
+    <LOAD> <MODULE> { startPos = getPos(); }
+    moduleName = SimpleIdentifier()
+    [
+        <WITH>
+        propertyList = TableProperties()
+    ]
+    {
+        return new SqlLoadModule(startPos.plus(getPos()),
+            moduleName,
+            propertyList);
+    }
+}
+
+/**
+* Parses an unload module statement.
+* UNLOAD MODULE module_name;
+*/
+SqlUnloadModule SqlUnloadModule() :
+{
+   SqlParserPos startPos;
+   SqlIdentifier moduleName;
+}
+{
+    <UNLOAD> <MODULE> { startPos = getPos(); }
+    moduleName = SimpleIdentifier()
+    {
+        return new SqlUnloadModule(startPos.plus(getPos()), moduleName);
+    }
+}
diff --git a/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java b/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java
index 9d41e68..0b566d1 100644
--- a/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java
@@ -441,4 +441,17 @@ public class FlinkHiveSqlParserImplTest extends SqlParserTest {
         sql("show partitions tbl").ok("SHOW PARTITIONS `TBL`");
         sql("show partitions tbl partition (p=1)").ok("SHOW PARTITIONS `TBL` PARTITION (`P` = 1)");
     }
+
+    @Test
+    public void testLoadModule() {
+        sql("load module hive").ok("LOAD MODULE `HIVE`");
+
+        sql("load module hive with ('hive-version' = '3.1.2')")
+                .ok("LOAD MODULE `HIVE` WITH (\n  'hive-version' = '3.1.2'\n)");
+    }
+
+    @Test
+    public void testUnloadModule() {
+        sql("unload module hive").ok("UNLOAD MODULE `HIVE`");
+    }
 }
diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 0a086e9..1c9c865 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -57,6 +57,7 @@
     "org.apache.flink.sql.parser.dml.RichSqlInsertKeyword"
     "org.apache.flink.sql.parser.dql.SqlDescribeCatalog"
     "org.apache.flink.sql.parser.dql.SqlDescribeDatabase"
+    "org.apache.flink.sql.parser.dql.SqlLoadModule"
     "org.apache.flink.sql.parser.dql.SqlShowCatalogs"
     "org.apache.flink.sql.parser.dql.SqlShowCurrentCatalog"
     "org.apache.flink.sql.parser.dql.SqlShowDatabases"
@@ -65,6 +66,7 @@
     "org.apache.flink.sql.parser.dql.SqlShowTables"
     "org.apache.flink.sql.parser.dql.SqlShowViews"
     "org.apache.flink.sql.parser.dql.SqlRichDescribeTable"
+    "org.apache.flink.sql.parser.dql.SqlUnloadModule"
     "org.apache.flink.sql.parser.type.ExtendedSqlCollectionTypeNameSpec"
     "org.apache.flink.sql.parser.type.ExtendedSqlRowTypeNameSpec"
     "org.apache.flink.sql.parser.type.SqlMapTypeNameSpec"
@@ -91,6 +93,7 @@
     "EXTENDED"
     "FUNCTIONS"
     "IF"
+    "LOAD"
     "METADATA"
     "OVERWRITE"
     "OVERWRITING"
@@ -102,6 +105,7 @@
     "SCALA"
     "STRING"
     "TABLES"
+    "UNLOAD"
     "USE"
     "VIEWS"
     "VIRTUAL"
@@ -233,6 +237,7 @@
     "LENGTH"
     "LEVEL"
     "LIBRARY"
+    "LOAD"
     "LOCATOR"
     "M"
     "MAP"
@@ -405,6 +410,7 @@
     "UNCOMMITTED"
     "UNCONDITIONAL"
     "UNDER"
+    "UNLOAD"
     "UNNAMED"
     "USAGE"
     "USER_DEFINED_TYPE_CATALOG"
@@ -450,6 +456,7 @@
   # Example: SqlShowDatabases(), SqlShowTables().
   statementParserMethods: [
     "RichSqlInsert()"
+    "SqlLoadModule()"
     "SqlShowCatalogs()"
     "SqlShowCurrentCatalogOrDatabase()"
     "SqlDescribeCatalog()"
@@ -464,6 +471,7 @@
     "SqlRichDescribeTable()"
     "SqlAlterTable()"
     "SqlShowViews()"
+    "SqlUnloadModule()"
   ]
 
   # List of methods for parsing custom literals.
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 5281c1e..a9cacfc 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -1431,3 +1431,44 @@ SqlDrop SqlDropExtended(Span s, boolean replace) :
         return drop;
     }
 }
+
+/**
+* Parses a load module statement.
+* LOAD MODULE module_name [WITH (property_name=property_value, ...)];
+*/
+SqlLoadModule SqlLoadModule() :
+{
+    SqlParserPos startPos;
+    SqlIdentifier moduleName;
+    SqlNodeList propertyList = SqlNodeList.EMPTY;
+}
+{
+    <LOAD> <MODULE> { startPos = getPos(); }
+    moduleName = SimpleIdentifier()
+    [
+        <WITH>
+        propertyList = TableProperties()
+    ]
+    {
+        return new SqlLoadModule(startPos.plus(getPos()),
+            moduleName,
+            propertyList);
+    }
+}
+
+/**
+* Parses an unload module statement.
+* UNLOAD MODULE module_name;
+*/
+SqlUnloadModule SqlUnloadModule() :
+{
+   SqlParserPos startPos;
+   SqlIdentifier moduleName;
+}
+{
+    <UNLOAD> <MODULE> { startPos = getPos(); }
+    moduleName = SimpleIdentifier()
+    {
+        return new SqlUnloadModule(startPos.plus(getPos()), moduleName);
+    }
+}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlLoadModule.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlLoadModule.java
new file mode 100644
index 0000000..9f28fa4
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlLoadModule.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.dql;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/** LOAD MODULE sql call. */
+public class SqlLoadModule extends SqlCall {
+    public static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("LOAD MODULE", SqlKind.OTHER);
+
+    private final SqlIdentifier moduleName;
+
+    private final SqlNodeList propertyList;
+
+    public SqlLoadModule(SqlParserPos pos, SqlIdentifier moduleType, SqlNodeList propertyList) {
+        super(pos);
+        this.moduleName = requireNonNull(moduleType, "moduleName cannot be null");
+        this.propertyList = requireNonNull(propertyList, "propertyList cannot be null");
+    }
+
+    @Override
+    public SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    @Override
+    public List<SqlNode> getOperandList() {
+        return ImmutableNullableList.of(moduleName, propertyList);
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        writer.keyword("LOAD MODULE");
+        moduleName.unparse(writer, leftPrec, rightPrec);
+
+        if (this.propertyList.size() > 0) {
+            writer.keyword("WITH");
+            SqlWriter.Frame withFrame = writer.startList("(", ")");
+            for (SqlNode property : propertyList) {
+                printIndent(writer);
+                property.unparse(writer, leftPrec, rightPrec);
+            }
+            writer.newlineAndIndent();
+            writer.endList(withFrame);
+        }
+    }
+
+    private void printIndent(SqlWriter writer) {
+        writer.sep(",", false);
+        writer.newlineAndIndent();
+        writer.print("  ");
+    }
+
+    public SqlIdentifier getModuleName() {
+        return moduleName;
+    }
+
+    public SqlNodeList getPropertyList() {
+        return propertyList;
+    }
+
+    public String moduleName() {
+        return moduleName.getSimple();
+    }
+}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlUnloadModule.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlUnloadModule.java
new file mode 100644
index 0000000..a5fe601
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlUnloadModule.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.dql;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * UNLOAD MODULE sql call.
+ *
+ * <p>Holds the identifier of the module to unload and unparses to {@code UNLOAD MODULE <name>}.
+ */
+public class SqlUnloadModule extends SqlCall {
+
+    public static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("UNLOAD MODULE", SqlKind.OTHER);
+
+    private final SqlIdentifier moduleName;
+
+    /**
+     * Creates an UNLOAD MODULE call.
+     *
+     * @param pos parser position handed to the superclass constructor
+     * @param moduleName identifier of the module to unload; must not be null
+     * @throws NullPointerException if {@code moduleName} is null
+     */
+    public SqlUnloadModule(SqlParserPos pos, SqlIdentifier moduleName) {
+        super(pos);
+        // Fail at construction time instead of later inside unparse() or moduleName().
+        this.moduleName = requireNonNull(moduleName, "moduleName cannot be null");
+    }
+
+    /** Returns the shared {@link #OPERATOR}. */
+    @Override
+    public SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    /** Returns the module name as the only operand. */
+    @Override
+    public List<SqlNode> getOperandList() {
+        return ImmutableNullableList.of(moduleName);
+    }
+
+    /** Writes this call as {@code UNLOAD MODULE <name>}. */
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        writer.keyword("UNLOAD");
+        writer.keyword("MODULE");
+        moduleName.unparse(writer, leftPrec, rightPrec);
+    }
+
+    /** Returns the module name as the identifier node given to the constructor. */
+    public SqlIdentifier getModuleName() {
+        return moduleName;
+    }
+
+    /** Returns the module name as a string, obtained via {@code SqlIdentifier#getSimple()}. */
+    public String moduleName() {
+        return moduleName.getSimple();
+    }
+}
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index e166a6a..a9e2bf7 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -175,7 +175,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
     }
 
     @Test
-    public void testShowFuntions() {
+    public void testShowFunctions() {
         sql("show functions").ok("SHOW FUNCTIONS");
         sql("show functions db1").ok("SHOW FUNCTIONS `DB1`");
         sql("show functions catalog1.db1").ok("SHOW FUNCTIONS `CATALOG1`.`DB1`");
@@ -1153,6 +1153,30 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
                 .ok("DROP TEMPORARY SYSTEM FUNCTION IF EXISTS `CATALOG1`.`DB1`.`FUNCTION1`");
     }
 
+    /**
+     * LOAD MODULE parses with and without a WITH clause, and a string literal in place of the
+     * module identifier is reported as a parse error.
+     */
+    @Test
+    public void testLoadModule() {
+        final String expectedWithProperties =
+                "LOAD MODULE `DUMMY`"
+                        + " WITH (\n"
+                        + "  'k1' = 'v1',\n"
+                        + "  'k2' = 'v2'\n"
+                        + ")";
+        final String expectedError =
+                "(?s).*Encountered \"\\\\'core\\\\'\" at line 1, column 13.\n.*";
+
+        sql("load module core").ok("LOAD MODULE `CORE`");
+        sql("load module dummy with ('k1' = 'v1', 'k2' = 'v2')").ok(expectedWithProperties);
+        sql("load module ^'core'^").fails(expectedError);
+    }
+
+    /**
+     * UNLOAD MODULE parses with an identifier, and a string literal in place of the module
+     * identifier is reported as a parse error.
+     */
+    @Test
+    public void testUnloadModule() {
+        final String expectedError =
+                "(?s).*Encountered \"\\\\'core\\\\'\" at line 1, column 15.\n.*";
+
+        sql("unload module core").ok("UNLOAD MODULE `CORE`");
+        sql("unload module ^'core'^").fails(expectedError);
+    }
+
     public static BaseMatcher<SqlNode> validated(String validatedSql) {
         return new TypeSafeDiagnosingMatcher<SqlNode>() {
             @Override
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index c4d254a..7f6e888 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -74,6 +74,7 @@ import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.factories.CatalogFactory;
 import org.apache.flink.table.factories.ComponentFactoryService;
+import org.apache.flink.table.factories.ModuleFactory;
 import org.apache.flink.table.factories.TableFactoryService;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.functions.UserDefinedFunction;
@@ -85,6 +86,7 @@ import org.apache.flink.table.operations.CatalogQueryOperation;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
 import org.apache.flink.table.operations.DescribeTableOperation;
 import org.apache.flink.table.operations.ExplainOperation;
+import org.apache.flink.table.operations.LoadModuleOperation;
 import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.QueryOperation;
@@ -98,6 +100,7 @@ import org.apache.flink.table.operations.ShowPartitionsOperation;
 import org.apache.flink.table.operations.ShowTablesOperation;
 import org.apache.flink.table.operations.ShowViewsOperation;
 import org.apache.flink.table.operations.TableSourceQueryOperation;
+import org.apache.flink.table.operations.UnloadModuleOperation;
 import org.apache.flink.table.operations.UseCatalogOperation;
 import org.apache.flink.table.operations.UseDatabaseOperation;
 import org.apache.flink.table.operations.ddl.AddPartitionsOperation;
@@ -148,6 +151,8 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
+import static org.apache.flink.table.descriptors.ModuleDescriptorValidator.MODULE_TYPE;
+
 /**
  * Implementation of {@link TableEnvironment} that works exclusively with Table API interfaces. Only
  * {@link TableSource} is supported as an input and {@link TableSink} as an output. It also does not
@@ -175,13 +180,13 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
             "Unsupported SQL query! sqlUpdate() only accepts a single SQL statement of type "
                     + "INSERT, CREATE TABLE, DROP TABLE, ALTER TABLE, USE CATALOG, USE [CATALOG.]DATABASE, "
                     + "CREATE DATABASE, DROP DATABASE, ALTER DATABASE, CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, "
-                    + "CREATE CATALOG, DROP CATALOG, CREATE VIEW, DROP VIEW.";
+                    + "CREATE CATALOG, DROP CATALOG, CREATE VIEW, DROP VIEW, LOAD MODULE, UNLOAD MODULE.";
     private static final String UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG =
             "Unsupported SQL query! executeSql() only accepts a single SQL statement of type "
                     + "CREATE TABLE, DROP TABLE, ALTER TABLE, CREATE DATABASE, DROP DATABASE, ALTER DATABASE, "
                     + "CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, CREATE CATALOG, DROP CATALOG, "
-                    + "USE CATALOG, USE [CATALOG.]DATABASE, SHOW CATALOGS, SHOW DATABASES, SHOW TABLES, SHOW FUNCTIONS, SHOW PARTITIONS"
-                    + "CREATE VIEW, DROP VIEW, SHOW VIEWS, INSERT, DESCRIBE.";
+                    + "USE CATALOG, USE [CATALOG.]DATABASE, SHOW CATALOGS, SHOW DATABASES, SHOW TABLES, SHOW FUNCTIONS, SHOW PARTITIONS, "
+                    + "CREATE VIEW, DROP VIEW, SHOW VIEWS, INSERT, DESCRIBE, LOAD MODULE, UNLOAD MODULE.";
 
     /** Provides necessary methods for {@link ConnectTableDescriptor}. */
     private final Registration registration =
@@ -780,7 +785,9 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
                 || operation instanceof CreateCatalogOperation
                 || operation instanceof DropCatalogOperation
                 || operation instanceof UseCatalogOperation
-                || operation instanceof UseDatabaseOperation) {
+                || operation instanceof UseDatabaseOperation
+                || operation instanceof LoadModuleOperation
+                || operation instanceof UnloadModuleOperation) {
             executeOperation(operation);
         } else {
             throw new TableException(UNSUPPORTED_QUERY_IN_SQL_UPDATE_MSG);
@@ -1052,6 +1059,10 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
             } catch (CatalogException e) {
                 throw new ValidationException(exMsg, e);
             }
+        } else if (operation instanceof LoadModuleOperation) {
+            return loadModule((LoadModuleOperation) operation);
+        } else if (operation instanceof UnloadModuleOperation) {
+            return unloadModule((UnloadModuleOperation) operation);
         } else if (operation instanceof UseCatalogOperation) {
             UseCatalogOperation useCatalogOperation = (UseCatalogOperation) operation;
             catalogManager.setCurrentCatalog(useCatalogOperation.getCatalogName());
@@ -1151,6 +1162,40 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
         }
     }
 
+    /**
+     * Executes a LOAD MODULE operation.
+     *
+     * <p>The module name is put into the properties under {@code MODULE_TYPE} and those
+     * properties are used to look up a {@link ModuleFactory}; a user-supplied value for that key
+     * is therefore rejected.
+     *
+     * @param operation operation carrying the module name and its properties
+     * @return {@code TableResultImpl.TABLE_RESULT_OK} after the module has been loaded
+     * @throws ValidationException if loading fails with a {@link ValidationException}
+     * @throws TableException if loading fails with any other exception
+     */
+    private TableResult loadModule(LoadModuleOperation operation) {
+        final String errorPrefix = getDDLOpExecuteErrorMsg(operation.asSummaryString());
+        try {
+            // work on a copy so that the module type can be added for the factory lookup
+            final Map<String, String> lookupProperties = new HashMap<>(operation.getProperties());
+            if (lookupProperties.containsKey(MODULE_TYPE)) {
+                final String rejectedType = lookupProperties.get(MODULE_TYPE);
+                throw new ValidationException(
+                        String.format(
+                                "Property 'type' = '%s' is not supported since module name "
+                                        + "is used to find module",
+                                rejectedType));
+            }
+            final String moduleName = operation.getModuleName();
+            lookupProperties.put(MODULE_TYPE, moduleName);
+            final ModuleFactory moduleFactory =
+                    TableFactoryService.find(
+                            ModuleFactory.class, lookupProperties, userClassLoader);
+            moduleManager.loadModule(moduleName, moduleFactory.createModule(lookupProperties));
+            return TableResultImpl.TABLE_RESULT_OK;
+        } catch (ValidationException validationFailure) {
+            // the ValidationException thrown above is also wrapped here with the error prefix
+            throw new ValidationException(
+                    String.format("%s. %s", errorPrefix, validationFailure.getMessage()),
+                    validationFailure);
+        } catch (Exception failure) {
+            throw new TableException(
+                    String.format("%s. %s", errorPrefix, failure.getMessage()), failure);
+        }
+    }
+
+    /**
+     * Executes an UNLOAD MODULE operation by delegating to the module manager.
+     *
+     * @param operation operation carrying the name of the module to unload
+     * @return {@code TableResultImpl.TABLE_RESULT_OK} after the module has been unloaded
+     * @throws ValidationException if the module manager raises a {@link ValidationException};
+     *     its message is prefixed with the summary of the failed operation
+     */
+    private TableResult unloadModule(UnloadModuleOperation operation) {
+        final String errorPrefix = getDDLOpExecuteErrorMsg(operation.asSummaryString());
+        try {
+            moduleManager.unloadModule(operation.getModuleName());
+        } catch (ValidationException validationFailure) {
+            throw new ValidationException(
+                    String.format("%s. %s", errorPrefix, validationFailure.getMessage()),
+                    validationFailure);
+        }
+        return TableResultImpl.TABLE_RESULT_OK;
+    }
+
     private TableResult buildShowResult(String columnName, String[] objects) {
         return buildResult(
                 new String[] {columnName},
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/LoadModuleOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/LoadModuleOperation.java
new file mode 100644
index 0000000..2409ca0
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/LoadModuleOperation.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Operation to describe a LOAD MODULE statement.
+ *
+ * <p>The properties are copied on construction, so later changes to the map passed in by the
+ * caller do not leak into this operation.
+ */
+public class LoadModuleOperation implements Operation {
+    private final String moduleName;
+
+    /** Unmodifiable copy of the properties given to the constructor. */
+    private final Map<String, String> properties;
+
+    /**
+     * Creates the operation.
+     *
+     * @param moduleName name of the module to load; checked with {@code checkNotNull}
+     * @param properties properties of the statement; checked with {@code checkNotNull} and copied
+     */
+    public LoadModuleOperation(String moduleName, Map<String, String> properties) {
+        this.moduleName = checkNotNull(moduleName);
+        // Defensive copy: the original kept the caller's map by reference. LinkedHashMap keeps
+        // the iteration order of the source map, so asSummaryString() prints the same text.
+        this.properties =
+                Collections.unmodifiableMap(new LinkedHashMap<>(checkNotNull(properties)));
+    }
+
+    /** Returns the name of the module to load. */
+    public String getModuleName() {
+        return moduleName;
+    }
+
+    /** Returns the properties as an unmodifiable map. */
+    public Map<String, String> getProperties() {
+        return properties;
+    }
+
+    /** Summarizes the operation as "LOAD MODULE" with its module name and properties. */
+    @Override
+    public String asSummaryString() {
+        Map<String, Object> params = new LinkedHashMap<>();
+        params.put("moduleName", moduleName);
+        params.put("properties", properties);
+        return OperationUtils.formatWithChildren(
+                "LOAD MODULE", params, Collections.emptyList(), Operation::asSummaryString);
+    }
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UnloadModuleOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UnloadModuleOperation.java
new file mode 100644
index 0000000..46dc8bb
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UnloadModuleOperation.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Operation to describe an UNLOAD MODULE statement.
+ *
+ * <p>Carries only the name of the module that should be unloaded.
+ */
+public class UnloadModuleOperation implements Operation {
+    /** Format of the summary string; the placeholder takes the module name. */
+    private static final String SUMMARY_FORMAT = "UNLOAD MODULE %s";
+
+    private final String moduleName;
+
+    /**
+     * Creates the operation.
+     *
+     * @param moduleName name of the module to unload; checked with {@code checkNotNull}
+     */
+    public UnloadModuleOperation(String moduleName) {
+        this.moduleName = checkNotNull(moduleName);
+    }
+
+    /** Returns the name of the module to unload. */
+    public String getModuleName() {
+        return moduleName;
+    }
+
+    /** Summarizes the operation as {@code UNLOAD MODULE <name>}. */
+    @Override
+    public String asSummaryString() {
+        return String.format(SUMMARY_FORMAT, moduleName);
+    }
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/module/DummyModuleFactory.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/module/DummyModuleFactory.java
new file mode 100644
index 0000000..65e6100
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/module/DummyModuleFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.module;
+
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.ModuleDescriptorValidator;
+import org.apache.flink.table.factories.ModuleFactory;
+import org.apache.flink.table.module.Module;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.descriptors.ModuleDescriptorValidator.MODULE_TYPE;
+
+/**
+ * Test implementation for {@link ModuleFactory}.
+ *
+ * <p>Declares the module type {@code "dummy"} as its required context and creates an anonymous
+ * {@link Module} with an empty body.
+ */
+public class DummyModuleFactory implements ModuleFactory {
+
+    /** Creates an anonymous {@link Module}; the given properties are not read. */
+    @Override
+    public Module createModule(Map<String, String> properties) {
+        return new Module() {};
+    }
+
+    /** Returns a map with the single entry {@code MODULE_TYPE -> "dummy"}. */
+    @Override
+    public Map<String, String> requiredContext() {
+        final Map<String, String> requiredProperties = new HashMap<>();
+        requiredProperties.put(MODULE_TYPE, DummyModuleDescriptorValidator.MODULE_TYPE_DUMMY);
+        return requiredProperties;
+    }
+
+    /** Returns the only supported property key, {@code "dummy-version"}. */
+    @Override
+    public List<String> supportedProperties() {
+        return Collections.singletonList(DummyModuleDescriptorValidator.MODULE_DUMMY_VERSION);
+    }
+
+    /** Test implementation for {@link ModuleDescriptorValidator}. */
+    public static class DummyModuleDescriptorValidator extends ModuleDescriptorValidator {
+        public static final String MODULE_TYPE_DUMMY = "dummy";
+        public static final String MODULE_DUMMY_VERSION = "dummy-version";
+
+        /**
+         * Runs the parent validation, then validates the module type against {@code "dummy"}
+         * and the {@code "dummy-version"} property as a string.
+         */
+        @Override
+        public void validate(DescriptorProperties properties) {
+            super.validate(properties);
+            // NOTE(review): the boolean arguments are presumably "is optional" flags and the
+            // trailing 1 a minimum length -- confirm against DescriptorProperties.
+            properties.validateValue(MODULE_TYPE, MODULE_TYPE_DUMMY, false);
+            properties.validateString(MODULE_DUMMY_VERSION, true, 1);
+        }
+    }
+}
diff --git a/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index 1094713..625488d 100644
--- a/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ b/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -13,4 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+org.apache.flink.table.factories.module.DummyModuleFactory
 org.apache.flink.table.factories.TestTableSinkFactory
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index dfed888..1c54d4f 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -48,6 +48,7 @@ import org.apache.flink.sql.parser.ddl.SqlUseCatalog;
 import org.apache.flink.sql.parser.ddl.SqlUseDatabase;
 import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
 import org.apache.flink.sql.parser.dml.RichSqlInsert;
+import org.apache.flink.sql.parser.dql.SqlLoadModule;
 import org.apache.flink.sql.parser.dql.SqlRichDescribeTable;
 import org.apache.flink.sql.parser.dql.SqlShowCatalogs;
 import org.apache.flink.sql.parser.dql.SqlShowCurrentCatalog;
@@ -57,6 +58,7 @@ import org.apache.flink.sql.parser.dql.SqlShowFunctions;
 import org.apache.flink.sql.parser.dql.SqlShowPartitions;
 import org.apache.flink.sql.parser.dql.SqlShowTables;
 import org.apache.flink.sql.parser.dql.SqlShowViews;
+import org.apache.flink.sql.parser.dql.SqlUnloadModule;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
@@ -80,6 +82,7 @@ import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
 import org.apache.flink.table.operations.DescribeTableOperation;
 import org.apache.flink.table.operations.ExplainOperation;
+import org.apache.flink.table.operations.LoadModuleOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.ShowCatalogsOperation;
 import org.apache.flink.table.operations.ShowCurrentCatalogOperation;
@@ -89,6 +92,7 @@ import org.apache.flink.table.operations.ShowFunctionsOperation;
 import org.apache.flink.table.operations.ShowPartitionsOperation;
 import org.apache.flink.table.operations.ShowTablesOperation;
 import org.apache.flink.table.operations.ShowViewsOperation;
+import org.apache.flink.table.operations.UnloadModuleOperation;
 import org.apache.flink.table.operations.UseCatalogOperation;
 import org.apache.flink.table.operations.UseDatabaseOperation;
 import org.apache.flink.table.operations.ddl.AddPartitionsOperation;
@@ -193,11 +197,15 @@ public class SqlToOperationConverter {
             return Optional.of(converter.convertCreateCatalog((SqlCreateCatalog) validated));
         } else if (validated instanceof SqlDropCatalog) {
             return Optional.of(converter.convertDropCatalog((SqlDropCatalog) validated));
+        } else if (validated instanceof SqlLoadModule) {
+            return Optional.of(converter.convertLoadModule((SqlLoadModule) validated));
         } else if (validated instanceof SqlShowCatalogs) {
             return Optional.of(converter.convertShowCatalogs((SqlShowCatalogs) validated));
         } else if (validated instanceof SqlShowCurrentCatalog) {
             return Optional.of(
                     converter.convertShowCurrentCatalog((SqlShowCurrentCatalog) validated));
+        } else if (validated instanceof SqlUnloadModule) {
+            return Optional.of(converter.convertUnloadModule((SqlUnloadModule) validated));
         } else if (validated instanceof SqlUseCatalog) {
             return Optional.of(converter.convertUseCatalog((SqlUseCatalog) validated));
         } else if (validated instanceof SqlCreateDatabase) {
@@ -864,6 +872,23 @@ public class SqlToOperationConverter {
         return new DescribeTableOperation(identifier, sqlRichDescribeTable.isExtended());
     }
 
+    /**
+     * Convert LOAD MODULE statement.
+     *
+     * <p>Every entry of the property list is cast to {@link SqlTableOption} and its key/value
+     * strings are collected into the properties of the resulting {@link LoadModuleOperation}.
+     */
+    private Operation convertLoadModule(SqlLoadModule sqlLoadModule) {
+        final String name = sqlLoadModule.moduleName();
+        final Map<String, String> options = new HashMap<>();
+        sqlLoadModule
+                .getPropertyList()
+                .getList()
+                .forEach(
+                        node -> {
+                            final SqlTableOption tableOption = (SqlTableOption) node;
+                            options.put(tableOption.getKeyString(), tableOption.getValueString());
+                        });
+        return new LoadModuleOperation(name, options);
+    }
+
+    /**
+     * Convert UNLOAD MODULE statement into an {@link UnloadModuleOperation} that carries the
+     * module name as a string.
+     */
+    private Operation convertUnloadModule(SqlUnloadModule sqlUnloadModule) {
+        return new UnloadModuleOperation(sqlUnloadModule.moduleName());
+    }
+
     /** Fallback method for sql query. */
     private Operation convertSqlQuery(SqlNode node) {
         return toQueryOperation(flinkPlanner, node);
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
index 38adb91..4b7e5f8 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
@@ -127,6 +127,7 @@ class FlinkPlannerImpl(
         || sqlNode.getKind == SqlKind.CREATE_FUNCTION
         || sqlNode.getKind == SqlKind.DROP_FUNCTION
         || sqlNode.getKind == SqlKind.OTHER_DDL
+        || sqlNode.isInstanceOf[SqlLoadModule]
         || sqlNode.isInstanceOf[SqlShowCatalogs]
         || sqlNode.isInstanceOf[SqlShowCurrentCatalog]
         || sqlNode.isInstanceOf[SqlShowDatabases]
@@ -135,7 +136,8 @@ class FlinkPlannerImpl(
         || sqlNode.isInstanceOf[SqlShowFunctions]
         || sqlNode.isInstanceOf[SqlShowViews]
         || sqlNode.isInstanceOf[SqlShowPartitions]
-        || sqlNode.isInstanceOf[SqlRichDescribeTable]) {
+        || sqlNode.isInstanceOf[SqlRichDescribeTable]
+        || sqlNode.isInstanceOf[SqlUnloadModule]) {
         return sqlNode
       }
       sqlNode match {
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index 58cde58..d165f20 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -47,7 +47,9 @@ import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
+import org.apache.flink.table.operations.LoadModuleOperation;
 import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.UnloadModuleOperation;
 import org.apache.flink.table.operations.UseCatalogOperation;
 import org.apache.flink.table.operations.UseDatabaseOperation;
 import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
@@ -291,6 +293,34 @@ public class SqlToOperationConverterTest {
     }
 
     @Test
+    public void testLoadModule() {
+        // Parse first, then compare the operation with the expected module name and properties.
+        final Operation parsed =
+                parse("LOAD MODULE dummy WITH ('k1' = 'v1', 'k2' = 'v2')", SqlDialect.DEFAULT);
+        assert parsed instanceof LoadModuleOperation;
+        final LoadModuleOperation loadModule = (LoadModuleOperation) parsed;
+
+        final Map<String, String> properties = new HashMap<>();
+        properties.put("k1", "v1");
+        properties.put("k2", "v2");
+
+        assertEquals("dummy", loadModule.getModuleName());
+        assertEquals(properties, loadModule.getProperties());
+    }
+
+    /** UNLOAD MODULE is converted into an UnloadModuleOperation carrying the module name. */
+    @Test
+    public void testUnloadModule() {
+        final Operation parsed = parse("UNLOAD MODULE dummy", SqlDialect.DEFAULT);
+        assert parsed instanceof UnloadModuleOperation;
+
+        final UnloadModuleOperation unloadModule = (UnloadModuleOperation) parsed;
+        assertEquals("dummy", unloadModule.getModuleName());
+    }
+
+    @Test
     public void testCreateTable() {
         final String sql =
                 "CREATE TABLE tbl1 (\n"
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index 71f8a53..1f38489 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -30,15 +30,14 @@ import org.apache.flink.table.planner.runtime.stream.table.FunctionITCase.Simple
 import org.apache.flink.table.planner.utils.TableTestUtil.replaceStageId
 import org.apache.flink.table.planner.utils.{TableTestUtil, TestTableSourceSinks}
 import org.apache.flink.types.Row
-
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.sql.SqlExplainLevel
-import org.junit.Assert.{assertEquals, assertFalse, assertTrue, fail}
+import org.apache.flink.core.testutils.FlinkMatchers.containsMessage
+import org.junit.Assert.{assertEquals, assertFalse, assertThat, assertTrue, fail}
 import org.junit.rules.ExpectedException
 import org.junit.{Rule, Test}
 
 import _root_.java.util
-
 import _root_.scala.collection.JavaConverters._
 
 class TableEnvironmentTest {
@@ -506,6 +505,94 @@ class TableEnvironmentTest {
   }
 
   @Test
+  def testExecuteSqlWithLoadModule(): Unit = {
+    val result = tableEnv.executeSql("LOAD MODULE dummy")
+    assertEquals(ResultKind.SUCCESS, result.getResultKind)
+    assert(tableEnv.listModules().sameElements(Array[String]("core", "dummy")))
+    // A second statement passing an explicit 'type' property is expected to be rejected.
+    val statement =
+      """
+        |LOAD MODULE dummy WITH (
+        |'type' = 'dummy'
+        |)
+      """.stripMargin
+    expectedException.expect(classOf[ValidationException])
+    expectedException.expectMessage(
+      "Property 'type' = 'dummy' is not supported since module name is used to find module")
+    tableEnv.executeSql(statement)
+  }
+
+  @Test
+  def testExecuteSqlWithLoadParameterizedModule(): Unit = {
+    // The first load, carrying a 'dummy-version' property, is expected to succeed.
+    val firstLoad =
+      """
+        |LOAD MODULE dummy WITH (
+        |  'dummy-version' = '1'
+        |)
+      """.stripMargin
+    assertEquals(ResultKind.SUCCESS, tableEnv.executeSql(firstLoad).getResultKind)
+    assert(tableEnv.listModules().sameElements(Array[String]("core", "dummy")))
+    // Loading the same module name again, even with another version, must be rejected.
+    val secondLoad =
+      """
+        |LOAD MODULE dummy WITH (
+        |'dummy-version' = '2'
+        |)
+      """.stripMargin
+    expectedException.expect(classOf[ValidationException])
+    expectedException.expectMessage(
+      "Could not execute LOAD MODULE: (moduleName: [dummy], properties: [{dummy-version=2}])." +
+        " A module with name 'dummy' already exists")
+    tableEnv.executeSql(secondLoad)
+  }
+
+  @Test
+  def testExecuteSqlWithLoadCaseSensitiveModuleName(): Unit = {
+    val statement1 =
+      """
+        |LOAD MODULE Dummy WITH (
+        |  'dummy-version' = '1'
+        |)
+      """.stripMargin
+    // "Dummy" is expected to match no factory, while lower-case "dummy" below loads fine.
+    try {
+      tableEnv.executeSql(statement1)
+      fail("Expected an exception")
+    } catch {
+      case e: Exception => // not Throwable, so the AssertionError raised by fail() propagates
+        assertThat(e, containsMessage("Could not execute LOAD MODULE: (moduleName: [Dummy], " +
+          "properties: [{dummy-version=1}]). Could not find a suitable table factory for " +
+          "'org.apache.flink.table.factories.ModuleFactory' in\nthe classpath."))
+    }
+
+    val statement2 =
+      """
+        |LOAD MODULE dummy WITH (
+        |'dummy-version' = '2'
+        |)
+      """.stripMargin
+    val result = tableEnv.executeSql(statement2)
+    assertEquals(ResultKind.SUCCESS, result.getResultKind)
+    assert(tableEnv.listModules().sameElements(Array[String]("core", "dummy")))
+  }
+
+  @Test
+  def testExecuteSqlWithUnloadModuleTwice(): Unit = {
+    val unloadStatement = "UNLOAD MODULE dummy"
+    tableEnv.executeSql("LOAD MODULE dummy")
+    assert(tableEnv.listModules().sameElements(Array[String]("core", "dummy")))
+    // The first unload is expected to succeed ...
+    assertEquals(ResultKind.SUCCESS, tableEnv.executeSql(unloadStatement).getResultKind)
+    // ... and the second one to be rejected because no such module exists any more.
+    expectedException.expect(classOf[ValidationException])
+    expectedException.expectMessage(
+      "Could not execute UNLOAD MODULE dummy." +
+        " No module with name 'dummy' exists")
+    tableEnv.executeSql(unloadStatement)
+  }
+
+  @Test
   def testExecuteSqlWithCreateDropView(): Unit = {
     val createTableStmt =
       """