You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2022/08/03 07:08:41 UTC

[flink] 02/02: [FLINK-27770][sql-gateway] Refactor SqlGateway E2E tests

This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 31bb1f49bf4ccc657ad6879bdf5d954cc24062d1
Author: Shengkai <10...@qq.com>
AuthorDate: Mon Aug 1 11:55:51 2022 +0800

    [FLINK-27770][sql-gateway] Refactor SqlGateway E2E tests
    
    This closes #20174
---
 .../table/endpoint/hive/HiveServer2Endpoint.java   |   6 +-
 .../flink-sql-connector-hive-2.3.9/pom.xml         |   9 +
 flink-dist/src/main/flink-bin/bin/flink-console.sh |   4 +-
 flink-dist/src/main/flink-bin/bin/flink-daemon.sh  |   4 +-
 .../tests/util/kafka/SQLClientKafkaITCase.java     |  16 +-
 .../org/apache/flink/tests/util/TestUtils.java     |  16 ++
 .../flink/tests/util/flink/ClusterController.java  |   2 +-
 .../flink/tests/util/flink/FlinkDistribution.java  |  75 +++++--
 .../flink/tests/util/flink/FlinkResource.java      |   9 +
 ...usterController.java => GatewayController.java} |  17 +-
 .../util/flink/LocalStandaloneFlinkResource.java   |  35 +++-
 .../tests/util/hbase/SQLClientHBaseITCase.java     |   2 +-
 .../table/sql/codegen/PlannerScalaFreeITCase.java  |   2 +-
 .../flink-sql-gateway-test/pom.xml                 | 221 +++++++++++++++------
 .../flink/table/gateway/SQLGatewayITCase.java      |  71 -------
 .../flink/table/gateway/SqlGatewayE2ECase.java     | 197 ++++++++++++++++++
 .../table/gateway/containers/HiveContainer.java    |  53 +++++
 .../src/test/resources/gateway_e2e.sql             |  30 +++
 .../src/test/resources/hive-site.xml               |  86 ++++++++
 .../src/test/resources/log4j2-test.properties      |  32 +--
 flink-end-to-end-tests/pom.xml                     |   2 +-
 flink-table/flink-sql-gateway/bin/sql-gateway.sh   |  60 +++---
 .../org/apache/flink/table/gateway/SqlGateway.java |  46 +++--
 .../apache/flink/table/gateway/SqlGatewayTest.java |  53 +++--
 .../table/planner/delegation/PlannerBase.scala     |   2 +-
 .../org/apache/flink/util/DockerImageVersions.java |   2 +
 .../apache/flink/test/util/SQLJobSubmission.java   |  23 ++-
 .../modules-skipping-deployment.modulelist         |   1 +
 28 files changed, 795 insertions(+), 281 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
index 07597450a4f..8ae5bb23954 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
@@ -318,9 +318,9 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
             resp.setServerProtocolVersion(sessionVersion.getVersion());
             resp.setSessionHandle(toTSessionHandle(sessionHandle));
             resp.setConfiguration(service.getSessionConfig(sessionHandle));
-        } catch (Exception e) {
-            LOG.error("Failed to OpenSession.", e);
-            resp.setStatus(toTStatus(e));
+        } catch (Throwable t) {
+            LOG.error("Failed to OpenSession.", t);
+            resp.setStatus(toTStatus(t));
         }
         return resp;
     }
diff --git a/flink-connectors/flink-sql-connector-hive-2.3.9/pom.xml b/flink-connectors/flink-sql-connector-hive-2.3.9/pom.xml
index e22865a8f0c..a948ed33bcf 100644
--- a/flink-connectors/flink-sql-connector-hive-2.3.9/pom.xml
+++ b/flink-connectors/flink-sql-connector-hive-2.3.9/pom.xml
@@ -104,6 +104,15 @@ under the License.
 									<shadedPattern>org.apache.flink.hive.shaded.com.google</shadedPattern>
 								</relocation>
 							</relocations>
+							<!-- Table planner also contains calcite dependency. Exclude to prevent class conflicts. -->
+							<filters>
+								<filter>
+									<artifact>org.apache.hive:hive-exec</artifact>
+									<excludes>
+										<exclude>org/apache/calcite/**</exclude>
+									</excludes>
+								</filter>
+							</filters>
 						</configuration>
 					</execution>
 				</executions>
diff --git a/flink-dist/src/main/flink-bin/bin/flink-console.sh b/flink-dist/src/main/flink-bin/bin/flink-console.sh
index 05b9d42632e..68181d2023b 100755
--- a/flink-dist/src/main/flink-bin/bin/flink-console.sh
+++ b/flink-dist/src/main/flink-bin/bin/flink-console.sh
@@ -19,7 +19,7 @@
 
 # Start a Flink service as a console application. Must be stopped with Ctrl-C
 # or with SIGTERM by kill or the controlling process.
-USAGE="Usage: flink-console.sh (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob|kubernetes-session|kubernetes-application|kubernetes-taskmanager|sqlgateway) [args]"
+USAGE="Usage: flink-console.sh (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob|kubernetes-session|kubernetes-application|kubernetes-taskmanager|sql-gateway) [args]"
 
 SERVICE=$1
 ARGS=("${@:2}") # get remaining arguments as array
@@ -62,7 +62,7 @@ case $SERVICE in
         CLASS_TO_RUN=org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner
     ;;
 
-    (sqlgateway)
+    (sql-gateway)
         CLASS_TO_RUN=org.apache.flink.table.gateway.SqlGateway
         SQL_GATEWAY_CLASSPATH=`findSqlGatewayJar`
     ;;
diff --git a/flink-dist/src/main/flink-bin/bin/flink-daemon.sh b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
index 246b540a972..d6a27416594 100644
--- a/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
+++ b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
@@ -18,7 +18,7 @@
 ################################################################################
 
 # Start/stop a Flink daemon.
-USAGE="Usage: flink-daemon.sh (start|stop|stop-all) (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob|sqlgateway) [args]"
+USAGE="Usage: flink-daemon.sh (start|stop|stop-all) (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob|sql-gateway) [args]"
 
 STARTSTOP=$1
 DAEMON=$2
@@ -50,7 +50,7 @@ case $DAEMON in
         CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint
     ;;
 
-    (sqlgateway)
+    (sql-gateway)
         CLASS_TO_RUN=org.apache.flink.table.gateway.SqlGateway
         SQL_GATEWAY_CLASSPATH=`findSqlGatewayJar`
     ;;
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
index 4d88ccca7fd..eee3c478e36 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
@@ -59,6 +59,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import static org.apache.flink.tests.util.TestUtils.readCsvResultFiles;
 import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
 import static org.junit.Assert.assertThat;
 
@@ -174,7 +175,7 @@ public class SQLClientKafkaITCase extends TestLogger {
     }
 
     private void executeSqlStatements(ClusterController clusterController, List<String> sqlLines)
-            throws IOException {
+            throws Exception {
         LOG.info("Executing Kafka {} end-to-end SQL statements.", kafkaSQLVersion);
         clusterController.submitSQLJob(
                 new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
@@ -234,17 +235,4 @@ public class SQLClientKafkaITCase extends TestLogger {
         }
         Assert.assertTrue("Did not get expected results before timeout.", success);
     }
-
-    private static List<String> readCsvResultFiles(Path path) throws IOException {
-        File filePath = path.toFile();
-        // list all the non-hidden files
-        File[] csvFiles = filePath.listFiles((dir, name) -> !name.startsWith("."));
-        List<String> result = new ArrayList<>();
-        if (csvFiles != null) {
-            for (File file : csvFiles) {
-                result.addAll(Files.readAllLines(file.toPath()));
-            }
-        }
-        return result;
-    }
 }
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/TestUtils.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/TestUtils.java
index 4fe8c7a5b66..a76f2b8e420 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/TestUtils.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/TestUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.tests.util;
 
 import org.apache.flink.test.parameters.ParameterProperty;
 
+import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.file.FileAlreadyExistsException;
@@ -31,6 +32,7 @@ import java.nio.file.Paths;
 import java.nio.file.SimpleFileVisitor;
 import java.nio.file.StandardCopyOption;
 import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.regex.Pattern;
@@ -132,4 +134,18 @@ public enum TestUtils {
 
         return destination;
     }
+
+    /** Reads the lines of all non-hidden files in the directory at the specified path. */
+    public static List<String> readCsvResultFiles(Path path) throws IOException {
+        File filePath = path.toFile();
+        // list all the non-hidden files
+        File[] csvFiles = filePath.listFiles((dir, name) -> !name.startsWith("."));
+        List<String> result = new ArrayList<>();
+        if (csvFiles != null) {
+            for (File file : csvFiles) {
+                result.addAll(Files.readAllLines(file.toPath()));
+            }
+        }
+        return result;
+    }
 }
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/ClusterController.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/ClusterController.java
index f732e07ce37..3d7c7219b53 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/ClusterController.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/ClusterController.java
@@ -45,5 +45,5 @@ public interface ClusterController extends AutoCloseableAsync {
      * @param timeout the maximum time to wait.
      * @throws IOException if any IO error happen.
      */
-    void submitSQLJob(SQLJobSubmission job, Duration timeout) throws IOException;
+    void submitSQLJob(SQLJobSubmission job, Duration timeout) throws Exception;
 }
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
index c699adaeb8f..86efed246f1 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
@@ -28,6 +28,7 @@ import org.apache.flink.test.util.SQLJobSubmission;
 import org.apache.flink.tests.util.AutoClosableProcess;
 import org.apache.flink.tests.util.TestUtils;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.function.FutureTaskWithException;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -45,14 +46,19 @@ import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.net.InetAddress;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -71,6 +77,7 @@ final class FlinkDistribution {
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
     private static final Pattern ROOT_LOGGER_PATTERN = Pattern.compile("(rootLogger.level =).*");
+    private static final String HIVE_DRIVER = "org.apache.hive.jdbc.HiveDriver";
 
     private final Path opt;
     private final Path lib;
@@ -106,13 +113,15 @@ final class FlinkDistribution {
                 bin.resolve("taskmanager.sh").toAbsolutePath().toString(), "start");
     }
 
-    public void startSQLGateway(String arg) throws IOException {
+    public void startSqlGateway() throws IOException {
         LOG.info("Starting Flink SQL Gateway.");
-        AutoClosableProcess.runBlocking(
-                bin.resolve("sql-gateway.sh").toAbsolutePath().toString(), "start", arg);
+        AutoClosableProcess.create(
+                        bin.resolve("sql-gateway.sh").toAbsolutePath().toString(), "start")
+                .setStdoutProcessor(LOG::info)
+                .runBlocking();
     }
 
-    public void stopSQLGateway() throws IOException {
+    public void stopSqlGateway() throws IOException {
         LOG.info("Stopping Flink SQL Gateway.");
         AutoClosableProcess.runBlocking(
                 bin.resolve("sql-gateway.sh").toAbsolutePath().toString(), "stop");
@@ -221,18 +230,56 @@ final class FlinkDistribution {
         }
     }
 
-    public void submitSQLJob(SQLJobSubmission job, Duration timeout) throws IOException {
+    public void submitSQLJob(SQLJobSubmission job, Duration timeout) throws Exception {
         final List<String> commands = new ArrayList<>();
-        commands.add(bin.resolve("sql-client.sh").toAbsolutePath().toString());
-        for (String jar : job.getJars()) {
-            commands.add("--jar");
-            commands.add(jar);
-        }
 
-        AutoClosableProcess.create(commands.toArray(new String[0]))
-                .setStdInputs(job.getSqlLines().toArray(new String[0]))
-                .setStdoutProcessor(LOG::info) // logging the SQL statements and error message
-                .runBlocking(timeout);
+        if (job.getClientMode() == SQLJobSubmission.ClientMode.SQL_CLIENT) {
+            commands.add(bin.resolve("sql-client.sh").toAbsolutePath().toString());
+            for (String jar : job.getJars()) {
+                commands.add("--jar");
+                commands.add(jar);
+            }
+
+            AutoClosableProcess.create(commands.toArray(new String[0]))
+                    .setStdInputs(job.getSqlLines().toArray(new String[0]))
+                    .setStdoutProcessor(LOG::info) // logging the SQL statements and error message
+                    .runBlocking(timeout);
+        } else if (job.getClientMode() == SQLJobSubmission.ClientMode.HIVE_JDBC) {
+            FutureTaskWithException<Void> future =
+                    new FutureTaskWithException<>(
+                            () -> {
+                                // register HiveDriver to the DriverManager
+                                Class.forName(HIVE_DRIVER);
+                                Map<String, String> configMap =
+                                        GlobalConfiguration.loadConfiguration(
+                                                        conf.toAbsolutePath().toString())
+                                                .toMap();
+                                String host =
+                                        configMap.getOrDefault(
+                                                "sql-gateway.endpoint.hiveserver2.host",
+                                                InetAddress.getByName("localhost")
+                                                        .getHostAddress());
+                                String port =
+                                        configMap.getOrDefault(
+                                                "sql-gateway.endpoint.hiveserver2.thrift.port",
+                                                "10000");
+                                try (Connection connection =
+                                                DriverManager.getConnection(
+                                                        String.format(
+                                                                "jdbc:hive2://%s:%s/default;auth=noSasl;",
+                                                                host, port));
+                                        Statement statement = connection.createStatement()) {
+                                    for (String jar : job.getJars()) {
+                                        statement.execute(String.format("ADD JAR '%s'", jar));
+                                    }
+                                    for (String sql : job.getSqlLines()) {
+                                        statement.execute(sql);
+                                    }
+                                }
+                            });
+            new Thread(future).start();
+            future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+        }
     }
 
     public void performJarAddition(JarAddition addition) throws IOException {
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResource.java
index 4bbf212235b..397d7428823 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResource.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResource.java
@@ -46,6 +46,15 @@ public interface FlinkResource extends ExternalResource {
      */
     ClusterController startCluster(int numTaskManagers) throws IOException;
 
+    /**
+     * Starts a SQL gateway and returns the {@link GatewayController} which can be used to shut
+     * down the process.
+     *
+     * @return controller for interacting with the SQL gateway
+     * @throws IOException
+     */
+    GatewayController startSqlGateway() throws IOException;
+
     /**
      * Searches the logs of all processes for the given pattern, and applies the given processor for
      * every line for which {@link Matcher#matches()} returned true.
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/ClusterController.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/GatewayController.java
similarity index 72%
copy from flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/ClusterController.java
copy to flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/GatewayController.java
index f732e07ce37..2b49face588 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/ClusterController.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/GatewayController.java
@@ -18,25 +18,14 @@
 
 package org.apache.flink.tests.util.flink;
 
-import org.apache.flink.test.util.JobSubmission;
 import org.apache.flink.test.util.SQLJobSubmission;
 import org.apache.flink.util.AutoCloseableAsync;
 
 import java.io.IOException;
 import java.time.Duration;
 
-/** Controller for interacting with a cluster. */
-public interface ClusterController extends AutoCloseableAsync {
-
-    /**
-     * Submits the given job to the cluster.
-     *
-     * @param job job to submit
-     * @param timeout the maximum time to wait.
-     * @return JobController for the submitted job
-     * @throws IOException
-     */
-    JobController submitJob(JobSubmission job, Duration timeout) throws IOException;
+/** Controller for interacting with a SqlGateway. */
+public interface GatewayController extends AutoCloseableAsync {
 
     /**
      * Submits the given SQL job to the cluster.
@@ -45,5 +34,5 @@ public interface ClusterController extends AutoCloseableAsync {
      * @param timeout the maximum time to wait.
      * @throws IOException if any IO error happen.
      */
-    void submitSQLJob(SQLJobSubmission job, Duration timeout) throws IOException;
+    void submitSQLJob(SQLJobSubmission job, Duration timeout) throws Exception;
 }
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java
index dc767d382e0..7b5658867f4 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java
@@ -113,7 +113,7 @@ public class LocalStandaloneFlinkResource implements FlinkResource {
     private void shutdownCluster() {
         try {
             distribution.stopFlinkCluster();
-            distribution.stopSQLGateway();
+            distribution.stopSqlGateway();
         } catch (IOException e) {
             LOG.warn("Error while shutting down Flink cluster.", e);
         }
@@ -184,8 +184,11 @@ public class LocalStandaloneFlinkResource implements FlinkResource {
         throw new RuntimeException("Cluster did not start in expected time-frame.");
     }
 
-    public void startSQLGateway(String arg) throws IOException {
-        distribution.startSQLGateway(arg);
+    @Override
+    public GatewayController startSqlGateway() throws IOException {
+        distribution.startSqlGateway();
+
+        return new GatewayClusterControllerImpl(distribution);
     }
 
     @Override
@@ -194,6 +197,30 @@ public class LocalStandaloneFlinkResource implements FlinkResource {
         return distribution.searchAllLogs(pattern, matchProcessor);
     }
 
+    private static class GatewayClusterControllerImpl implements GatewayController {
+
+        private final FlinkDistribution distribution;
+
+        public GatewayClusterControllerImpl(FlinkDistribution distribution) {
+            this.distribution = distribution;
+        }
+
+        @Override
+        public CompletableFuture<Void> closeAsync() {
+            try {
+                distribution.stopSqlGateway();
+                return CompletableFuture.completedFuture(null);
+            } catch (IOException e) {
+                return FutureUtils.completedExceptionally(e);
+            }
+        }
+
+        @Override
+        public void submitSQLJob(SQLJobSubmission job, Duration timeout) throws Exception {
+            distribution.submitSQLJob(job, timeout);
+        }
+    }
+
     private static class StandaloneClusterController implements ClusterController {
 
         private final FlinkDistribution distribution;
@@ -210,7 +237,7 @@ public class LocalStandaloneFlinkResource implements FlinkResource {
         }
 
         @Override
-        public void submitSQLJob(SQLJobSubmission job, Duration timeout) throws IOException {
+        public void submitSQLJob(SQLJobSubmission job, Duration timeout) throws Exception {
             distribution.submitSQLJob(job, timeout);
         }
 
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-hbase/src/test/java/org/apache/flink/tests/util/hbase/SQLClientHBaseITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-hbase/src/test/java/org/apache/flink/tests/util/hbase/SQLClientHBaseITCase.java
index 165d0f7d832..0f15579d156 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-hbase/src/test/java/org/apache/flink/tests/util/hbase/SQLClientHBaseITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-hbase/src/test/java/org/apache/flink/tests/util/hbase/SQLClientHBaseITCase.java
@@ -228,7 +228,7 @@ public class SQLClientHBaseITCase extends TestLogger {
     }
 
     private void executeSqlStatements(ClusterController clusterController, List<String> sqlLines)
-            throws IOException {
+            throws Exception {
         LOG.info("Executing SQL: HBase source table -> HBase sink table");
         clusterController.submitSQLJob(
                 new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/PlannerScalaFreeITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/PlannerScalaFreeITCase.java
index 79e6c59db08..683e72cb155 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/PlannerScalaFreeITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/PlannerScalaFreeITCase.java
@@ -131,7 +131,7 @@ public class PlannerScalaFreeITCase extends TestLogger {
     }
 
     private void executeSqlStatements(ClusterController clusterController, List<String> sqlLines)
-            throws IOException {
+            throws Exception {
         LOG.info("Executing end-to-end SQL statements {}.", sqlLines);
         clusterController.submitSQLJob(
                 new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
diff --git a/flink-end-to-end-tests/flink-sql-gateway-test/pom.xml b/flink-end-to-end-tests/flink-sql-gateway-test/pom.xml
index 3b8cc597bb2..9c30c64b880 100644
--- a/flink-end-to-end-tests/flink-sql-gateway-test/pom.xml
+++ b/flink-end-to-end-tests/flink-sql-gateway-test/pom.xml
@@ -17,68 +17,167 @@ specific language governing permissions and limitations
 under the License.
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"
-		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-	<parent>
-		<artifactId>flink-end-to-end-tests</artifactId>
-		<groupId>org.apache.flink</groupId>
-		<version>1.16-SNAPSHOT</version>
-	</parent>
-	<modelVersion>4.0.0</modelVersion>
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>flink-end-to-end-tests</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>1.16-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
 
-	<artifactId>flink-sql-gateway-test</artifactId>
-	<name>Flink : E2E Tests : SQL Gateway</name>
-	<packaging>jar</packaging>
+    <artifactId>flink-sql-gateway-test</artifactId>
+    <name>Flink : E2E Tests : SQL Gateway</name>
+    <packaging>jar</packaging>
 
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-end-to-end-tests-common</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-sql-connector-hive-${hive.version}_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.hive</groupId>
-			<artifactId>hive-jdbc</artifactId>
-			<version>${hive.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils-junit</artifactId>
-		</dependency>
-	</dependencies>
+    <properties>
+        <!-- The test container uses hive-2.1.0 -->
+        <hive.version>2.3.9</hive.version>
+        <flink.hadoop.version>2.8.5</flink.hadoop.version>
+    </properties>
 
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-dependency-plugin</artifactId>
-				<executions>
-					<execution>
-						<id>copy</id>
-						<phase>pre-integration-test</phase>
-						<goals>
-							<goal>copy</goal>
-						</goals>
-					</execution>
-				</executions>
-				<configuration>
-					<artifactItems>
-						<artifactItem>
-							<groupId>org.apache.flink</groupId>
-							<artifactId>flink-sql-connector-hive-2.3.9_${scala.binary.version}</artifactId>
-							<version>${project.version}</version>
-							<destFileName>flink-sql-connector-hive-2.3.9_${scala.binary.version}-${project.version}.jar</destFileName>
-							<type>jar</type>
-							<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
-						</artifactItem>
-					</artifactItems>
-				</configuration>
-			</plugin>
-		</plugins>
-	</build>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-end-to-end-tests-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-sql-connector-hive-${hive.version}_${scala.binary.version}
+            </artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <!-- It contains jackson-annotations that conflict with TestContainer -->
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-exec</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-jdbc</artifactId>
+            <version>${hive.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils-junit</artifactId>
+        </dependency>
+
+        <!-- hadoop dependencies for end-to-end test -->
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${flink.hadoop.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <!-- This dependency is no longer shipped with the JDK since Java 9.-->
+                    <groupId>jdk.tools</groupId>
+                    <artifactId>jdk.tools</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-enforcer-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>dependency-convergence</id>
+                        <goals>
+                            <goal>enforce</goal>
+                        </goals>
+                        <configuration>
+                            <skip>true</skip>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>copy</id>
+                        <phase>pre-integration-test</phase>
+                        <goals>
+                            <goal>copy</goal>
+                        </goals>
+                        <configuration>
+                            <artifactItems>
+                                <artifactItem>
+                                    <groupId>org.apache.flink</groupId>
+                                    <artifactId>
+                                        flink-sql-connector-hive-${hive.version}_${scala.binary.version}
+                                    </artifactId>
+                                    <version>${project.version}</version>
+                                    <destFileName>
+                                        flink-sql-connector-hive-${hive.version}_${scala.binary.version}-${project.version}.jar
+                                    </destFileName>
+                                    <type>jar</type>
+                                    <outputDirectory>${project.build.directory}/dependencies
+                                    </outputDirectory>
+                                </artifactItem>
+                            </artifactItems>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>store-classpath-in-target-for-tests</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>build-classpath</goal>
+                        </goals>
+                        <configuration>
+                            <outputFile>${project.build.directory}/hadoop.classpath</outputFile>
+                            <excludeGroupIds>org.apache.flink</excludeGroupIds>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+    <profiles>
+        <profile>
+            <id>hive3</id>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-surefire-plugin</artifactId>
+                        <configuration>
+                            <skip>true</skip>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
 </project>
diff --git a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SQLGatewayITCase.java b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SQLGatewayITCase.java
deleted file mode 100644
index 5b037b1ccae..00000000000
--- a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SQLGatewayITCase.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.gateway;
-
-import org.apache.flink.tests.util.TestUtils;
-import org.apache.flink.tests.util.flink.ClusterController;
-import org.apache.flink.tests.util.flink.FlinkResource;
-import org.apache.flink.tests.util.flink.FlinkResourceSetup;
-import org.apache.flink.tests.util.flink.JarLocation;
-import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResource;
-import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Rule;
-import org.junit.Test;
-
-import java.nio.file.Path;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-public class SQLGatewayITCase extends TestLogger {
-
-    private static String JDBC_URL = "jdbc:hive2://localhost:8084/default;auth=noSasl";
-    private static String DRIVER_NAME = "org.apache.hive.jdbc.HiveDriver";
-
-    private static final Path HIVE_SQL_CONNECOTR_JAR =
-            TestUtils.getResource(".*dependencies/flink-sql-connector-hive-.*.jar");
-
-    @Rule
-    public final FlinkResource flink =
-            new LocalStandaloneFlinkResourceFactory()
-                    .create(
-                            FlinkResourceSetup.builder()
-                                    .addJar(HIVE_SQL_CONNECOTR_JAR, JarLocation.LIB)
-                                    .build());
-
-    @Test
-    public void testGateway() throws Exception {
-        try (ClusterController clusterController = flink.startCluster(1)) {
-            ((LocalStandaloneFlinkResource) flink)
-                    .startSQLGateway("-Dsql-gateway.endpoint.type=hiveserver2");
-            Thread.sleep(2000);
-            Class.forName(DRIVER_NAME);
-            try {
-                DriverManager.getConnection(JDBC_URL);
-            } catch (SQLException e) {
-                assertThat(e.getMessage())
-                        .contains(
-                                "Embedded metastore is not allowed. Make sure you have set a valid value for hive.metastore.uris");
-            }
-        }
-    }
-}
diff --git a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java
new file mode 100644
index 00000000000..b7a4e903ffa
--- /dev/null
+++ b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.endpoint.hive.HiveServer2Endpoint;
+import org.apache.flink.table.gateway.containers.HiveContainer;
+import org.apache.flink.test.util.SQLJobSubmission;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.ClusterController;
+import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceSetup;
+import org.apache.flink.tests.util.flink.GatewayController;
+import org.apache.flink.tests.util.flink.JarLocation;
+import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.catalog.hive.HiveCatalog.isEmbeddedMetastore;
+import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.CATALOG_HIVE_CONF_DIR;
+import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.THRIFT_PORT;
+import static org.apache.flink.tests.util.TestUtils.readCsvResultFiles;
+import static org.junit.Assert.assertEquals;
+
+/** E2E Tests for {@code SqlGateway} with {@link HiveServer2Endpoint}. */
+public class SqlGatewayE2ECase extends TestLogger {
+
+    private static final Path HIVE_SQL_CONNECTOR_JAR =
+            TestUtils.getResource(".*dependencies/flink-sql-connector-hive-.*.jar");
+    private static final Path HADOOP_CLASS_PATH = TestUtils.getResource(".*hadoop.classpath");
+    private static final String GATEWAY_E2E_SQL = "gateway_e2e.sql";
+    private static final Configuration ENDPOINT_CONFIG = new Configuration();
+    private static final String RESULT_KEY = "$RESULT";
+
+    @ClassRule public static final TemporaryFolder FOLDER = new TemporaryFolder();
+    @ClassRule public static final HiveContainer HIVE_CONTAINER = new HiveContainer();
+    @Rule public final FlinkResource flinkResource = buildFlinkResource();
+
+    private static NetUtils.Port port;
+
+    @BeforeClass
+    public static void beforeClass() {
+        ENDPOINT_CONFIG.setString(
+                getPrefixedConfigOptionName(CATALOG_HIVE_CONF_DIR), createHiveConf().getParent());
+    }
+
+    @AfterClass
+    public static void afterClass() throws Exception {
+        port.close();
+    }
+
+    @Test
+    public void testExecuteStatement() throws Exception {
+        URL url = SqlGatewayE2ECase.class.getClassLoader().getResource(GATEWAY_E2E_SQL);
+        if (url == null) {
+            throw new FileNotFoundException(GATEWAY_E2E_SQL);
+        }
+        File result = FOLDER.newFolder("csv");
+        String sql =
+                Files.readAllLines(Paths.get(url.toURI())).stream() // toURI() decodes %-escapes
+                        .filter(line -> !line.trim().startsWith("--")) // skip SQL comment lines
+                        .collect(Collectors.joining());
+        List<String> lines =
+                Arrays.stream(sql.split(";"))
+                        .map(line -> line.replace(RESULT_KEY, result.getAbsolutePath()))
+                        .collect(Collectors.toList());
+
+        try (GatewayController gateway = flinkResource.startSqlGateway();
+                ClusterController ignore = flinkResource.startCluster(1)) {
+            gateway.submitSQLJob(
+                    new SQLJobSubmission.SQLJobSubmissionBuilder(lines)
+                            .setClientMode(SQLJobSubmission.ClientMode.HIVE_JDBC)
+                            .build(),
+                    Duration.ofSeconds(60));
+        }
+        assertEquals(Collections.singletonList("1"), readCsvResultFiles(result.toPath()));
+    }
+
+    private static File createHiveConf() {
+        HiveConf hiveConf = new HiveConf();
+        try (InputStream inputStream =
+                new FileInputStream(
+                        new File(
+                                Objects.requireNonNull(
+                                                SqlGatewayE2ECase.class
+                                                        .getClassLoader()
+                                                        .getResource(HiveCatalog.HIVE_SITE_FILE))
+                                        .toURI()))) {
+            hiveConf.addResource(inputStream, HiveCatalog.HIVE_SITE_FILE);
+            // trigger a read from the conf so that the input stream is read
+            isEmbeddedMetastore(hiveConf);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to load hive-site.xml from specified path", e);
+        }
+        hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, HIVE_CONTAINER.getHiveMetastoreURI());
+        try {
+            File site = FOLDER.newFile(HiveCatalog.HIVE_SITE_FILE);
+            try (OutputStream out = new FileOutputStream(site)) {
+                hiveConf.writeXml(out);
+            }
+            return site;
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to create hive conf.", e);
+        }
+    }
+
+    /**
+     * Builds the required environment. It adds every jar listed in hadoop.classpath to lib to
+     * mock HADOOP_CLASSPATH. It also moves the planner jar to lib and the planner-loader jar to
+     * opt so that the Hive SQL connector works.
+     */
+    private static FlinkResource buildFlinkResource() {
+        // add hive jar and planner jar
+        FlinkResourceSetup.FlinkResourceSetupBuilder builder =
+                FlinkResourceSetup.builder()
+                        .addJar(HIVE_SQL_CONNECTOR_JAR, JarLocation.LIB)
+                        .moveJar("flink-table-planner", JarLocation.OPT, JarLocation.LIB)
+                        .moveJar("flink-table-planner-loader", JarLocation.LIB, JarLocation.OPT);
+        // add hadoop jars
+        File hadoopClasspathFile = new File(HADOOP_CLASS_PATH.toAbsolutePath().toString());
+        if (!hadoopClasspathFile.exists()) {
+            throw new RuntimeException(
+                    "File that contains hadoop classpath "
+                            + HADOOP_CLASS_PATH
+                            + " does not exist.");
+        }
+        try {
+            String classPathContent = FileUtils.readFileUtf8(hadoopClasspathFile);
+            Arrays.stream(classPathContent.split(":"))
+                    .map(Paths::get)
+                    .forEach(jar -> builder.addJar(jar, JarLocation.LIB));
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to build the FlinkResource.", e);
+        }
+        // add hive server2 endpoint related configs
+        port = NetUtils.getAvailablePort();
+        Map<String, String> endpointConfig = new HashMap<>();
+        endpointConfig.put("sql-gateway.endpoint.type", "hiveserver2");
+        endpointConfig.put(
+                getPrefixedConfigOptionName(THRIFT_PORT), String.valueOf(port.getPort()));
+        ENDPOINT_CONFIG.addAll(Configuration.fromMap(endpointConfig));
+        builder.addConfiguration(ENDPOINT_CONFIG);
+
+        return new LocalStandaloneFlinkResourceFactory().create(builder.build());
+    }
+
+    private static String getPrefixedConfigOptionName(ConfigOption<?> option) {
+        String prefix = "sql-gateway.endpoint.hiveserver2.";
+        return prefix + option.key();
+    }
+}
diff --git a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/containers/HiveContainer.java b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/containers/HiveContainer.java
new file mode 100644
index 00000000000..35dc6325779
--- /dev/null
+++ b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/containers/HiveContainer.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.containers;
+
+import org.apache.flink.util.DockerImageVersions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerImageName;
+
+/** Test container for Hive that exposes the Hive metastore port to the host. */
+public class HiveContainer extends GenericContainer<HiveContainer> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HiveContainer.class);
+
+    public static final String HOST_NAME = "hadoop-master";
+    public static final int HIVE_METASTORE_PORT = 9083;
+
+    public HiveContainer() {
+        super(DockerImageName.parse(DockerImageVersions.HIVE2));
+        withExtraHost(HOST_NAME, "127.0.0.1");
+        addExposedPort(HIVE_METASTORE_PORT);
+    }
+
+    @Override
+    protected void doStart() {
+        super.doStart();
+        if (LOG.isInfoEnabled()) {
+            followOutput(new Slf4jLogConsumer(LOG));
+        }
+    }
+
+    public String getHiveMetastoreURI() {
+        return String.format("thrift://%s:%s", getHost(), getMappedPort(HIVE_METASTORE_PORT));
+    }
+}
diff --git a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/resources/gateway_e2e.sql b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/resources/gateway_e2e.sql
new file mode 100644
index 00000000000..88fd47f445e
--- /dev/null
+++ b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/resources/gateway_e2e.sql
@@ -0,0 +1,30 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+SET table.sql-dialect=default;
+
+CREATE TABLE CsvTable (
+  val INT
+) WITH (
+  'connector' = 'filesystem',
+  'path' = '$RESULT',
+  'sink.rolling-policy.rollover-interval' = '2s',
+  'sink.rolling-policy.check-interval' = '2s',
+  'format' = 'csv',
+  'csv.disable-quote-character' = 'true'
+);
+
+INSERT INTO CsvTable SELECT 1;
diff --git a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/resources/hive-site.xml b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/resources/hive-site.xml
new file mode 100644
index 00000000000..9f74734e801
--- /dev/null
+++ b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/resources/hive-site.xml
@@ -0,0 +1,86 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<configuration>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+    <property>
+        <name>hive.metastore.uris</name>
+        <value>thrift://localhost:9083</value>
+    </property>
+
+    <property>
+        <name>javax.jdo.option.ConnectionURL</name>
+        <value>jdbc:mysql://localhost:3306/metastore?useSSL=false</value>
+    </property>
+
+    <property>
+        <name>javax.jdo.option.ConnectionDriverName</name>
+        <value>com.mysql.cj.jdbc.Driver</value>
+    </property>
+
+    <property>
+        <name>javax.jdo.option.ConnectionUserName</name>
+        <value>root</value>
+    </property>
+
+    <property>
+        <name>javax.jdo.option.ConnectionPassword</name>
+        <value>root</value>
+    </property>
+
+    <property>
+        <name>hive.metastore.connect.retries</name>
+        <value>15</value>
+    </property>
+
+    <property>
+        <name>hive.metastore.disallow.incompatible.col.type.changes</name>
+        <value>false</value>
+    </property>
+
+    <property>
+        <!-- https://community.hortonworks.com/content/supportkb/247055/errorjavalangunsupportedoperationexception-storage.html -->
+        <name>metastore.storage.schema.reader.impl</name>
+        <value>org.apache.hadoop.hive.metastore.SerDeStorageSchemaReader</value>
+    </property>
+
+    <property>
+        <name>hive.support.concurrency</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>hive.txn.manager</name>
+        <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
+    </property>
+
+    <property>
+        <name>hive.compactor.initiator.on</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>hive.compactor.worker.threads</name>
+        <value>1</value>
+    </property>
+
+    <property>
+        <name>hive.users.in.admin.role</name>
+        <value>hdfs,hive</value>
+    </property>
+
+</configuration>
diff --git a/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/resources/log4j2-test.properties
similarity index 56%
copy from tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist
copy to flink-end-to-end-tests/flink-sql-gateway-test/src/test/resources/log4j2-test.properties
index a1ac43e7cad..835c2ec9a3d 100644
--- a/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist
+++ b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/resources/log4j2-test.properties
@@ -16,27 +16,13 @@
 # limitations under the License.
 ################################################################################
 
-# These modules are not deployed to maven central, despite their use of the shade plugin.
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
 
-flink-examples-streaming-gcp-pubsub
-flink-yarn-tests
-flink-docs
-flink-datastream-allround-test
-flink-queryable-state-test
-flink-confluent-schema-registry
-flink-stream-stateful-job-upgrade-test
-flink-elasticsearch7-test
-flink-stream-state-ttl-test
-flink-state-evolution-test
-flink-elasticsearch6-test
-flink-rocksdb-state-memory-control-test
-flink-python-test
-flink-streaming-kinesis-test
-flink-tpch-test
-flink-streaming-kafka-test-base
-flink-heavy-deployment-stress-test
-flink-high-parallelism-iterations-test
-flink-end-to-end-tests-common-kafka
-flink-end-to-end-tests-pulsar
-flink-end-to-end-tests-elasticsearch7
-flink-end-to-end-tests-elasticsearch6
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 31bd7facf9a..bc477e349cf 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -55,6 +55,7 @@ under the License.
 		<module>flink-confluent-schema-registry</module>
 		<module>flink-stream-state-ttl-test</module>
 		<module>flink-sql-client-test</module>
+		<module>flink-sql-gateway-test</module>
 		<module>flink-file-sink-test</module>
 		<module>flink-state-evolution-test</module>
 		<module>flink-rocksdb-state-memory-control-test</module>
@@ -83,7 +84,6 @@ under the License.
 		<module>flink-end-to-end-tests-elasticsearch7</module>
 		<module>flink-end-to-end-tests-common-elasticsearch</module>
 		<module>flink-end-to-end-tests-sql</module>
-        <module>flink-sql-gateway-test</module>
     </modules>
 
 	<dependencyManagement>
diff --git a/flink-table/flink-sql-gateway/bin/sql-gateway.sh b/flink-table/flink-sql-gateway/bin/sql-gateway.sh
index f925a25adcf..5b300fb1c48 100644
--- a/flink-table/flink-sql-gateway/bin/sql-gateway.sh
+++ b/flink-table/flink-sql-gateway/bin/sql-gateway.sh
@@ -17,34 +17,16 @@
 # limitations under the License.
 #
 
-# Start/stop a Flink SQL Gateway.
-
 function usage() {
-  echo "Usage: bin/sql-gateway.sh command"
+  echo "Usage: sql-gateway.sh [start|start-foreground|stop|stop-all] [args]"
   echo "  commands:"
-  echo "    start            - Run a SQL Gateway as a daemon"
-  echo "    start-foreground - Run a SQL Gateway as a console application"
-  echo "    stop             - Stop the SQL Gateway daemon"
-  echo "    stop-all         - Stop all the SQL Gateway daemons"
-  echo "    -h | --help      - Show this help message"
+  echo "    start               - Run a SQL Gateway as a daemon"
+  echo "    start-foreground    - Run a SQL Gateway as a console application"
+  echo "    stop                - Stop the SQL Gateway daemon"
+  echo "    stop-all            - Stop all the SQL Gateway daemons"
+  echo "    -h | --help         - Show this help message"
 }
 
-if [[ "$*" = *--help ]] || [[ "$*" = *-h ]]; then
-  usage
-  exit 0
-fi
-
-STARTSTOP=$1
-
-if [ -z "$STARTSTOP" ]; then
-  STARTSTOP="start"
-fi
-
-if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
-  usage
-  exit 1
-fi
-
 ################################################################################
 # Adopted from "flink" bash script
 ################################################################################
@@ -75,7 +57,35 @@ if [ "$FLINK_IDENT_STRING" = "" ]; then
         FLINK_IDENT_STRING="$USER"
 fi
 
-ENTRYPOINT=sqlgateway
+################################################################################
+# SQL gateway specific logic
+################################################################################
+
+ENTRYPOINT=sql-gateway
+
+if [[ "$1" = "--help" ]] || [[ "$1" = "-h" ]]; then
+  usage
+  exit 0
+fi
+
+STARTSTOP=$1
+
+if [ -z "$STARTSTOP" ]; then
+  STARTSTOP="start"
+fi
+
+if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
+  usage
+  exit 1
+fi
+
+# ./sql-gateway.sh start --help: print the help to the console; -h/--help must be whole arguments
+if [[ "$STARTSTOP" = start* ]] && ( [[ " $* " = *" --help "* ]] || [[ " $* " = *" -h "* ]] ); then
+  FLINK_TM_CLASSPATH=`constructFlinkClassPath`
+  SQL_GATEWAY_CLASSPATH=`findSqlGatewayJar`
+  "$JAVA_RUN"  -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$SQL_GATEWAY_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.table.gateway.SqlGateway "${@:2}"
+  exit 0
+fi
 
 if [[ $STARTSTOP == "start-foreground" ]]; then
     exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${@:2}"
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/SqlGateway.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/SqlGateway.java
index f2b2fa384f6..9f6f409063b 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/SqlGateway.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/SqlGateway.java
@@ -67,18 +67,16 @@ public class SqlGateway {
         sessionManager.start();
         SqlGatewayService sqlGatewayService = new SqlGatewayServiceImpl(sessionManager);
 
-        endpoints.addAll(
-                SqlGatewayEndpointFactoryUtils.createSqlGatewayEndpoint(
-                        sqlGatewayService, context.getFlinkConfig()));
-
-        for (SqlGatewayEndpoint endpoint : endpoints) {
-            try {
+        try {
+            endpoints.addAll(
+                    SqlGatewayEndpointFactoryUtils.createSqlGatewayEndpoint(
+                            sqlGatewayService, context.getFlinkConfig()));
+            for (SqlGatewayEndpoint endpoint : endpoints) {
                 endpoint.start();
-            } catch (Throwable t) {
-                LOG.error("Failed to start the endpoint.", t);
-                stop();
-                throw new SqlGatewayException("Failed to start the endpoint.", t);
             }
+        } catch (Throwable t) {
+            LOG.error("Failed to start the endpoints.", t);
+            throw new SqlGatewayException("Failed to start the endpoints.", t);
         }
     }
 
@@ -97,11 +95,6 @@ public class SqlGateway {
     }
 
     public static void main(String[] args) {
-        // startup checks and logging
-        EnvironmentInformation.logEnvironmentInfo(LOG, "SqlGateway", args);
-        SignalHandler.register(LOG);
-        JvmShutdownSafeguard.installAsShutdownHook(LOG);
-
         startSqlGateway(System.out, args);
     }
 
@@ -114,6 +107,11 @@ public class SqlGateway {
             return;
         }
 
+        // startup checks and logging
+        EnvironmentInformation.logEnvironmentInfo(LOG, "SqlGateway", args);
+        SignalHandler.register(LOG);
+        JvmShutdownSafeguard.installAsShutdownHook(LOG);
+
         SqlGateway gateway = new SqlGateway(cliOptions.getDynamicConfigs());
         try {
             Runtime.getRuntime().addShutdownHook(new ShutdownThread(gateway));
@@ -128,11 +126,19 @@ public class SqlGateway {
             // make space in terminal
             stream.println();
             stream.println();
-            LOG.error(
-                    "SqlGateway must stop. Unexpected exception. This is a bug. Please consider filing an issue.",
-                    t);
-            throw new SqlGatewayException(
-                    "Unexpected exception. This is a bug. Please consider filing an issue.", t);
+
+            if (t instanceof SqlGatewayException) {
+                // Exception that the gateway can not handle.
+                throw (SqlGatewayException) t;
+            } else {
+                LOG.error(
+                        "SqlGateway must stop. Unexpected exception. This is a bug. Please consider filing an issue.",
+                        t);
+                throw new SqlGatewayException(
+                        "Unexpected exception. This is a bug. Please consider filing an issue.", t);
+            }
+        } finally {
+            gateway.stop();
         }
     }
 
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/SqlGatewayTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/SqlGatewayTest.java
index 10095dd3d76..57d6bcac381 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/SqlGatewayTest.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/SqlGatewayTest.java
@@ -38,6 +38,7 @@ import java.util.UUID;
 
 import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
 
 /** Tests for the {@link SqlGateway}. */
 public class SqlGatewayTest {
@@ -95,26 +96,36 @@ public class SqlGatewayTest {
                     "-Dsql-gateway.endpoint.mocked.host=localhost",
                     "-Dsql-gateway.endpoint.mocked.port=9999"
                 };
-        PrintStream stream = new PrintStream(output);
-        Thread thread =
-                new ExecutorThreadFactory(
-                                "SqlGateway-thread-pool",
-                                (t, exception) -> exception.printStackTrace(stream))
-                        .newThread(() -> SqlGateway.startSqlGateway(stream, args));
-        thread.start();
-
-        CommonTestUtils.waitUtil(
-                () -> MockedSqlGatewayEndpoint.isRunning(id),
-                Duration.ofSeconds(10),
-                "Failed to get the endpoint starts.");
-
-        thread.interrupt();
-        CommonTestUtils.waitUtil(
-                () -> !thread.isAlive(),
-                Duration.ofSeconds(10),
-                "Failed to get the endpoint starts.");
-        assertThat(output.toString())
-                .doesNotContain(
-                        "Unexpected exception. This is a bug. Please consider filing an issue.");
+        try (PrintStream stream = new PrintStream(output)) {
+            Thread thread =
+                    new ExecutorThreadFactory(
+                                    "SqlGateway-thread-pool",
+                                    (t, exception) -> exception.printStackTrace(stream))
+                            .newThread(() -> SqlGateway.startSqlGateway(stream, args));
+            thread.start();
+
+            CommonTestUtils.waitUtil(
+                    () -> MockedSqlGatewayEndpoint.isRunning(id),
+                    Duration.ofSeconds(10),
+                    "Failed to get the endpoint starts.");
+
+            thread.interrupt();
+            CommonTestUtils.waitUtil(
+                    () -> !thread.isAlive(),
+                    Duration.ofSeconds(10),
+                    "Failed to get the endpoint starts.");
+            assertThat(output.toString())
+                    .doesNotContain(
+                            "Unexpected exception. This is a bug. Please consider filing an issue.");
+        }
+    }
+
+    @Test
+    public void testFailedToStartSqlGateway() { // expected failures must not be reported as bugs
+        try (PrintStream stream = new PrintStream(output)) {
+            assertThatThrownBy(() -> SqlGateway.startSqlGateway(stream, new String[0]))
+                    .hasMessageNotContaining(
+                            "Unexpected exception. This is a bug. Please consider filing an issue.");
+        }
     }
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index 8d00017f46e..e2ef76b26eb 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -36,6 +36,7 @@ import org.apache.flink.table.planner.calcite._
 import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema
 import org.apache.flink.table.planner.connectors.DynamicSinkUtils
 import org.apache.flink.table.planner.connectors.DynamicSinkUtils.validateSchemaAndApplyImplicitCast
+import org.apache.flink.table.planner.delegation.DialectFactory.DefaultParserContext
 import org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl
 import org.apache.flink.table.planner.hint.FlinkHints
 import org.apache.flink.table.planner.operations.PlannerQueryOperation
@@ -55,7 +56,6 @@ import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter
 
 import _root_.scala.collection.JavaConversions._
-import DialectFactory.DefaultParserContext
 import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
 import org.apache.calcite.plan.{RelTrait, RelTraitDef}
 import org.apache.calcite.rel.RelNode
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java
index e59e939500a..f0f367f7bf9 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java
@@ -54,4 +54,6 @@ public class DockerImageVersions {
 
     public static final String GOOGLE_CLOUD_PUBSUB_EMULATOR =
             "gcr.io/google.com/cloudsdktool/cloud-sdk:379.0.0";
+
+    public static final String HIVE2 = "prestodb/hdp2.6-hive:10";
 }
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SQLJobSubmission.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SQLJobSubmission.java
index cff93ccaf0d..5ec15f8dfe2 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SQLJobSubmission.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SQLJobSubmission.java
@@ -27,14 +27,20 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /** Programmatic definition of a SQL job-submission. */
 public class SQLJobSubmission {
 
+    private final ClientMode clientMode;
     private final List<String> sqlLines;
     private final List<String> jars;
 
-    private SQLJobSubmission(List<String> sqlLines, List<String> jars) {
+    private SQLJobSubmission(ClientMode clientMode, List<String> sqlLines, List<String> jars) {
+        this.clientMode = checkNotNull(clientMode);
         this.sqlLines = checkNotNull(sqlLines);
         this.jars = checkNotNull(jars);
     }
 
+    public ClientMode getClientMode() {
+        return clientMode;
+    }
+
     public List<String> getJars() {
         return this.jars;
     }
@@ -45,6 +51,7 @@ public class SQLJobSubmission {
 
     /** Builder for the {@link SQLJobSubmission}. */
     public static class SQLJobSubmissionBuilder {
+        private ClientMode clientMode = ClientMode.SQL_CLIENT;
         private final List<String> sqlLines;
         private final List<String> jars = new ArrayList<>();
 
@@ -52,6 +59,11 @@ public class SQLJobSubmission {
             this.sqlLines = sqlLines;
         }
 
+        public SQLJobSubmissionBuilder setClientMode(ClientMode clientMode) {
+            this.clientMode = clientMode;
+            return this;
+        }
+
         public SQLJobSubmissionBuilder addJar(Path jarFile) {
             this.jars.add(jarFile.toAbsolutePath().toString());
             return this;
@@ -70,7 +82,14 @@ public class SQLJobSubmission {
         }
 
         public SQLJobSubmission build() {
-            return new SQLJobSubmission(sqlLines, jars);
+            return new SQLJobSubmission(clientMode, sqlLines, jars);
         }
     }
+
+    /** The client used to submit the SQL job. */
+    public enum ClientMode {
+        SQL_CLIENT,
+
+        HIVE_JDBC
+    }
 }
diff --git a/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist b/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist
index a1ac43e7cad..7b0be584fee 100644
--- a/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist
+++ b/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist
@@ -40,3 +40,4 @@ flink-end-to-end-tests-common-kafka
 flink-end-to-end-tests-pulsar
 flink-end-to-end-tests-elasticsearch7
 flink-end-to-end-tests-elasticsearch6
+flink-sql-gateway-test