Posted to commits@flink.apache.org by sh...@apache.org on 2022/08/03 07:08:41 UTC
[flink] 02/02: [FLINK-27770][sql-gateway] Refactor SqlGateway E2E tests
This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 31bb1f49bf4ccc657ad6879bdf5d954cc24062d1
Author: Shengkai <10...@qq.com>
AuthorDate: Mon Aug 1 11:55:51 2022 +0800
[FLINK-27770][sql-gateway] Refactor SqlGateway E2E tests
This closes #20174
---
.../table/endpoint/hive/HiveServer2Endpoint.java | 6 +-
.../flink-sql-connector-hive-2.3.9/pom.xml | 9 +
flink-dist/src/main/flink-bin/bin/flink-console.sh | 4 +-
flink-dist/src/main/flink-bin/bin/flink-daemon.sh | 4 +-
.../tests/util/kafka/SQLClientKafkaITCase.java | 16 +-
.../org/apache/flink/tests/util/TestUtils.java | 16 ++
.../flink/tests/util/flink/ClusterController.java | 2 +-
.../flink/tests/util/flink/FlinkDistribution.java | 75 +++++--
.../flink/tests/util/flink/FlinkResource.java | 9 +
...usterController.java => GatewayController.java} | 17 +-
.../util/flink/LocalStandaloneFlinkResource.java | 35 +++-
.../tests/util/hbase/SQLClientHBaseITCase.java | 2 +-
.../table/sql/codegen/PlannerScalaFreeITCase.java | 2 +-
.../flink-sql-gateway-test/pom.xml | 221 +++++++++++++++------
.../flink/table/gateway/SQLGatewayITCase.java | 71 -------
.../flink/table/gateway/SqlGatewayE2ECase.java | 197 ++++++++++++++++++
.../table/gateway/containers/HiveContainer.java | 53 +++++
.../src/test/resources/gateway_e2e.sql | 30 +++
.../src/test/resources/hive-site.xml | 86 ++++++++
.../src/test/resources/log4j2-test.properties | 32 +--
flink-end-to-end-tests/pom.xml | 2 +-
flink-table/flink-sql-gateway/bin/sql-gateway.sh | 60 +++---
.../org/apache/flink/table/gateway/SqlGateway.java | 46 +++--
.../apache/flink/table/gateway/SqlGatewayTest.java | 53 +++--
.../table/planner/delegation/PlannerBase.scala | 2 +-
.../org/apache/flink/util/DockerImageVersions.java | 2 +
.../apache/flink/test/util/SQLJobSubmission.java | 23 ++-
.../modules-skipping-deployment.modulelist | 1 +
28 files changed, 795 insertions(+), 281 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
index 07597450a4f..8ae5bb23954 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
@@ -318,9 +318,9 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
resp.setServerProtocolVersion(sessionVersion.getVersion());
resp.setSessionHandle(toTSessionHandle(sessionHandle));
resp.setConfiguration(service.getSessionConfig(sessionHandle));
- } catch (Exception e) {
- LOG.error("Failed to OpenSession.", e);
- resp.setStatus(toTStatus(e));
+ } catch (Throwable t) {
+ LOG.error("Failed to OpenSession.", t);
+ resp.setStatus(toTStatus(t));
}
return resp;
}
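A note on the widened catch above: a `catch (Exception e)` block lets Error subclasses (for example a NoClassDefFoundError raised by a missing Hive class) escape and kill the Thrift worker thread, whereas catching Throwable converts them into an error status for the client. A minimal hedged sketch of the pattern — riskyOpenSession and toStatus are hypothetical stand-ins, not the endpoint's real methods:

    // Hedged sketch; names are stand-ins for the endpoint code above.
    public final class CatchThrowableSketch {
        static String toStatus(Throwable t) { // stand-in for toTStatus(t)
            return "ERROR: " + t;
        }

        static void riskyOpenSession() { // stand-in for the session-opening logic
            throw new NoClassDefFoundError("org/apache/hive/Missing");
        }

        public static void main(String[] args) {
            String status;
            try {
                riskyOpenSession();
                status = "OK";
            } catch (Throwable t) { // catch (Exception e) would miss the Error above
                status = toStatus(t);
            }
            System.out.println(status); // the client gets a status, the thread survives
        }
    }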
diff --git a/flink-connectors/flink-sql-connector-hive-2.3.9/pom.xml b/flink-connectors/flink-sql-connector-hive-2.3.9/pom.xml
index e22865a8f0c..a948ed33bcf 100644
--- a/flink-connectors/flink-sql-connector-hive-2.3.9/pom.xml
+++ b/flink-connectors/flink-sql-connector-hive-2.3.9/pom.xml
@@ -104,6 +104,15 @@ under the License.
<shadedPattern>org.apache.flink.hive.shaded.com.google</shadedPattern>
</relocation>
</relocations>
+ <!-- The table planner also contains a Calcite dependency. Exclude it here to prevent class conflicts. -->
+ <filters>
+ <filter>
+ <artifact>org.apache.hive:hive-exec</artifact>
+ <excludes>
+ <exclude>org/apache/calcite/**</exclude>
+ </excludes>
+ </filter>
+ </filters>
</configuration>
</execution>
</executions>
diff --git a/flink-dist/src/main/flink-bin/bin/flink-console.sh b/flink-dist/src/main/flink-bin/bin/flink-console.sh
index 05b9d42632e..68181d2023b 100755
--- a/flink-dist/src/main/flink-bin/bin/flink-console.sh
+++ b/flink-dist/src/main/flink-bin/bin/flink-console.sh
@@ -19,7 +19,7 @@
# Start a Flink service as a console application. Must be stopped with Ctrl-C
# or with SIGTERM by kill or the controlling process.
-USAGE="Usage: flink-console.sh (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob|kubernetes-session|kubernetes-application|kubernetes-taskmanager|sqlgateway) [args]"
+USAGE="Usage: flink-console.sh (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob|kubernetes-session|kubernetes-application|kubernetes-taskmanager|sql-gateway) [args]"
SERVICE=$1
ARGS=("${@:2}") # get remaining arguments as array
@@ -62,7 +62,7 @@ case $SERVICE in
CLASS_TO_RUN=org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner
;;
- (sqlgateway)
+ (sql-gateway)
CLASS_TO_RUN=org.apache.flink.table.gateway.SqlGateway
SQL_GATEWAY_CLASSPATH=`findSqlGatewayJar`
;;
diff --git a/flink-dist/src/main/flink-bin/bin/flink-daemon.sh b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
index 246b540a972..d6a27416594 100644
--- a/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
+++ b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
@@ -18,7 +18,7 @@
################################################################################
# Start/stop a Flink daemon.
-USAGE="Usage: flink-daemon.sh (start|stop|stop-all) (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob|sqlgateway) [args]"
+USAGE="Usage: flink-daemon.sh (start|stop|stop-all) (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob|sql-gateway) [args]"
STARTSTOP=$1
DAEMON=$2
@@ -50,7 +50,7 @@ case $DAEMON in
CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint
;;
- (sqlgateway)
+ (sql-gateway)
CLASS_TO_RUN=org.apache.flink.table.gateway.SqlGateway
SQL_GATEWAY_CLASSPATH=`findSqlGatewayJar`
;;
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
index 4d88ccca7fd..eee3c478e36 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
@@ -59,6 +59,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
+import static org.apache.flink.tests.util.TestUtils.readCsvResultFiles;
import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
import static org.junit.Assert.assertThat;
@@ -174,7 +175,7 @@ public class SQLClientKafkaITCase extends TestLogger {
}
private void executeSqlStatements(ClusterController clusterController, List<String> sqlLines)
- throws IOException {
+ throws Exception {
LOG.info("Executing Kafka {} end-to-end SQL statements.", kafkaSQLVersion);
clusterController.submitSQLJob(
new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
@@ -234,17 +235,4 @@ public class SQLClientKafkaITCase extends TestLogger {
}
Assert.assertTrue("Did not get expected results before timeout.", success);
}
-
- private static List<String> readCsvResultFiles(Path path) throws IOException {
- File filePath = path.toFile();
- // list all the non-hidden files
- File[] csvFiles = filePath.listFiles((dir, name) -> !name.startsWith("."));
- List<String> result = new ArrayList<>();
- if (csvFiles != null) {
- for (File file : csvFiles) {
- result.addAll(Files.readAllLines(file.toPath()));
- }
- }
- return result;
- }
}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/TestUtils.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/TestUtils.java
index 4fe8c7a5b66..a76f2b8e420 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/TestUtils.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/TestUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.tests.util;
import org.apache.flink.test.parameters.ParameterProperty;
+import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
@@ -31,6 +32,7 @@ import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.regex.Pattern;
@@ -132,4 +134,18 @@ public enum TestUtils {
return destination;
}
+
+ /** Reads all the non-hidden files under the given path. */
+ public static List<String> readCsvResultFiles(Path path) throws IOException {
+ File filePath = path.toFile();
+ // list all the non-hidden files
+ File[] csvFiles = filePath.listFiles((dir, name) -> !name.startsWith("."));
+ List<String> result = new ArrayList<>();
+ if (csvFiles != null) {
+ for (File file : csvFiles) {
+ result.addAll(Files.readAllLines(file.toPath()));
+ }
+ }
+ return result;
+ }
}
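For reference, a hedged usage sketch of the relocated helper; the result directory below is an assumption, and the flink-end-to-end-tests-common classes are expected on the classpath:

    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.List;

    import static org.apache.flink.tests.util.TestUtils.readCsvResultFiles;

    // Hedged sketch: read back everything a filesystem CSV sink wrote into a
    // result directory (hidden files such as in-progress parts are skipped).
    public class ReadCsvResultSketch {
        public static void main(String[] args) throws Exception {
            Path resultDir = Paths.get("/tmp/csv-results"); // assumed path
            List<String> lines = readCsvResultFiles(resultDir);
            lines.forEach(System.out::println);
        }
    }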
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/ClusterController.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/ClusterController.java
index f732e07ce37..3d7c7219b53 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/ClusterController.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/ClusterController.java
@@ -45,5 +45,5 @@ public interface ClusterController extends AutoCloseableAsync {
* @param timeout the maximum time to wait.
* @throws IOException if any IO error happens.
*/
- void submitSQLJob(SQLJobSubmission job, Duration timeout) throws IOException;
+ void submitSQLJob(SQLJobSubmission job, Duration timeout) throws Exception;
}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
index c699adaeb8f..86efed246f1 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
@@ -28,6 +28,7 @@ import org.apache.flink.test.util.SQLJobSubmission;
import org.apache.flink.tests.util.AutoClosableProcess;
import org.apache.flink.tests.util.TestUtils;
import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.function.FutureTaskWithException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -45,14 +46,19 @@ import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -71,6 +77,7 @@ final class FlinkDistribution {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final Pattern ROOT_LOGGER_PATTERN = Pattern.compile("(rootLogger.level =).*");
+ private static final String HIVE_DRIVER = "org.apache.hive.jdbc.HiveDriver";
private final Path opt;
private final Path lib;
@@ -106,13 +113,15 @@ final class FlinkDistribution {
bin.resolve("taskmanager.sh").toAbsolutePath().toString(), "start");
}
- public void startSQLGateway(String arg) throws IOException {
+ public void startSqlGateway() throws IOException {
LOG.info("Starting Flink SQL Gateway.");
- AutoClosableProcess.runBlocking(
- bin.resolve("sql-gateway.sh").toAbsolutePath().toString(), "start", arg);
+ AutoClosableProcess.create(
+ bin.resolve("sql-gateway.sh").toAbsolutePath().toString(), "start")
+ .setStdoutProcessor(LOG::info)
+ .runBlocking();
}
- public void stopSQLGateway() throws IOException {
+ public void stopSqlGateway() throws IOException {
LOG.info("Stopping Flink SQL Gateway.");
AutoClosableProcess.runBlocking(
bin.resolve("sql-gateway.sh").toAbsolutePath().toString(), "stop");
@@ -221,18 +230,56 @@ final class FlinkDistribution {
}
}
- public void submitSQLJob(SQLJobSubmission job, Duration timeout) throws IOException {
+ public void submitSQLJob(SQLJobSubmission job, Duration timeout) throws Exception {
final List<String> commands = new ArrayList<>();
- commands.add(bin.resolve("sql-client.sh").toAbsolutePath().toString());
- for (String jar : job.getJars()) {
- commands.add("--jar");
- commands.add(jar);
- }
- AutoClosableProcess.create(commands.toArray(new String[0]))
- .setStdInputs(job.getSqlLines().toArray(new String[0]))
- .setStdoutProcessor(LOG::info) // logging the SQL statements and error message
- .runBlocking(timeout);
+ if (job.getClientMode() == SQLJobSubmission.ClientMode.SQL_CLIENT) {
+ commands.add(bin.resolve("sql-client.sh").toAbsolutePath().toString());
+ for (String jar : job.getJars()) {
+ commands.add("--jar");
+ commands.add(jar);
+ }
+
+ AutoClosableProcess.create(commands.toArray(new String[0]))
+ .setStdInputs(job.getSqlLines().toArray(new String[0]))
+ .setStdoutProcessor(LOG::info) // logging the SQL statements and error message
+ .runBlocking(timeout);
+ } else if (job.getClientMode() == SQLJobSubmission.ClientMode.HIVE_JDBC) {
+ FutureTaskWithException<Void> future =
+ new FutureTaskWithException<>(
+ () -> {
+ // register HiveDriver with the DriverManager
+ Class.forName(HIVE_DRIVER);
+ Map<String, String> configMap =
+ GlobalConfiguration.loadConfiguration(
+ conf.toAbsolutePath().toString())
+ .toMap();
+ String host =
+ configMap.getOrDefault(
+ "sql-gateway.endpoint.hiveserver2.host",
+ InetAddress.getByName("localhost")
+ .getHostAddress());
+ String port =
+ configMap.getOrDefault(
+ "sql-gateway.endpoint.hiveserver2.thrift.port",
+ "10000");
+ try (Connection connection =
+ DriverManager.getConnection(
+ String.format(
+ "jdbc:hive2://%s:%s/default;auth=noSasl;",
+ host, port));
+ Statement statement = connection.createStatement()) {
+ for (String jar : job.getJars()) {
+ statement.execute(String.format("ADD JAR '%s'", jar));
+ }
+ for (String sql : job.getSqlLines()) {
+ statement.execute(sql);
+ }
+ }
+ });
+ new Thread(future).start();
+ future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+ }
}
public void performJarAddition(JarAddition addition) throws IOException {
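Outside the test harness, the new HIVE_JDBC branch boils down to plain Hive JDBC. A minimal hedged sketch, assuming a gateway listening on localhost:10000 with the HiveServer2 endpoint and the hive-jdbc driver on the classpath; the statement is illustrative only, while the URL shape matches the one built above:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    // Hedged sketch of the HIVE_JDBC submission path; host and port are assumptions.
    public class HiveJdbcSubmitSketch {
        public static void main(String[] args) throws Exception {
            Class.forName("org.apache.hive.jdbc.HiveDriver"); // register the driver
            String url = "jdbc:hive2://localhost:10000/default;auth=noSasl;";
            try (Connection connection = DriverManager.getConnection(url);
                    Statement statement = connection.createStatement()) {
                statement.execute("SHOW TABLES"); // any statement the gateway accepts
            }
        }
    }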
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResource.java
index 4bbf212235b..397d7428823 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResource.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResource.java
@@ -46,6 +46,15 @@ public interface FlinkResource extends ExternalResource {
*/
ClusterController startCluster(int numTaskManagers) throws IOException;
+ /**
+ * Starts a SQL gateway and returns the {@link GatewayController} which can be used to shut down
+ * the process.
+ *
+ * @return controller for interacting with the gateway
+ * @throws IOException
+ */
+ GatewayController startSqlGateway() throws IOException;
+
/**
* Searches the logs of all processes for the given pattern, and applies the given processor for
* every line for which {@link Matcher#matches()} returned true.
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/ClusterController.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/GatewayController.java
similarity index 72%
copy from flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/ClusterController.java
copy to flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/GatewayController.java
index f732e07ce37..2b49face588 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/ClusterController.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/GatewayController.java
@@ -18,25 +18,14 @@
package org.apache.flink.tests.util.flink;
-import org.apache.flink.test.util.JobSubmission;
import org.apache.flink.test.util.SQLJobSubmission;
import org.apache.flink.util.AutoCloseableAsync;
import java.io.IOException;
import java.time.Duration;
-/** Controller for interacting with a cluster. */
-public interface ClusterController extends AutoCloseableAsync {
-
- /**
- * Submits the given job to the cluster.
- *
- * @param job job to submit
- * @param timeout the maximum time to wait.
- * @return JobController for the submitted job
- * @throws IOException
- */
- JobController submitJob(JobSubmission job, Duration timeout) throws IOException;
+/** Controller for interacting with a SqlGateway. */
+public interface GatewayController extends AutoCloseableAsync {
/**
* Submits the given SQL job to the cluster.
@@ -45,5 +34,5 @@ public interface ClusterController extends AutoCloseableAsync {
* @param timeout the maximum time to wait.
* @throws IOException if any IO error happens.
*/
- void submitSQLJob(SQLJobSubmission job, Duration timeout) throws IOException;
+ void submitSQLJob(SQLJobSubmission job, Duration timeout) throws Exception;
}
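Since both controllers extend AutoCloseableAsync, a test can scope the gateway and the cluster with try-with-resources, as SqlGatewayE2ECase further down does. A hedged sketch; flinkResource and job are assumed to exist:

    import java.time.Duration;

    import org.apache.flink.test.util.SQLJobSubmission;
    import org.apache.flink.tests.util.flink.ClusterController;
    import org.apache.flink.tests.util.flink.FlinkResource;
    import org.apache.flink.tests.util.flink.GatewayController;

    // Hedged sketch: gateway and cluster lifetimes scoped with try-with-resources.
    public class GatewayScopeSketch {
        static void runJob(FlinkResource flinkResource, SQLJobSubmission job) throws Exception {
            try (GatewayController gateway = flinkResource.startSqlGateway();
                    ClusterController cluster = flinkResource.startCluster(1)) {
                gateway.submitSQLJob(job, Duration.ofSeconds(60));
            }
        }
    }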
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java
index dc767d382e0..7b5658867f4 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java
@@ -113,7 +113,7 @@ public class LocalStandaloneFlinkResource implements FlinkResource {
private void shutdownCluster() {
try {
distribution.stopFlinkCluster();
- distribution.stopSQLGateway();
+ distribution.stopSqlGateway();
} catch (IOException e) {
LOG.warn("Error while shutting down Flink cluster.", e);
}
@@ -184,8 +184,11 @@ public class LocalStandaloneFlinkResource implements FlinkResource {
throw new RuntimeException("Cluster did not start in expected time-frame.");
}
- public void startSQLGateway(String arg) throws IOException {
- distribution.startSQLGateway(arg);
+ @Override
+ public GatewayController startSqlGateway() throws IOException {
+ distribution.startSqlGateway();
+
+ return new GatewayClusterControllerImpl(distribution);
}
@Override
@@ -194,6 +197,30 @@ public class LocalStandaloneFlinkResource implements FlinkResource {
return distribution.searchAllLogs(pattern, matchProcessor);
}
+ private static class GatewayClusterControllerImpl implements GatewayController {
+
+ private final FlinkDistribution distribution;
+
+ public GatewayClusterControllerImpl(FlinkDistribution distribution) {
+ this.distribution = distribution;
+ }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ try {
+ distribution.stopSqlGateway();
+ return CompletableFuture.completedFuture(null);
+ } catch (IOException e) {
+ return FutureUtils.completedExceptionally(e);
+ }
+ }
+
+ @Override
+ public void submitSQLJob(SQLJobSubmission job, Duration timeout) throws Exception {
+ distribution.submitSQLJob(job, timeout);
+ }
+ }
+
private static class StandaloneClusterController implements ClusterController {
private final FlinkDistribution distribution;
@@ -210,7 +237,7 @@ public class LocalStandaloneFlinkResource implements FlinkResource {
}
@Override
- public void submitSQLJob(SQLJobSubmission job, Duration timeout) throws IOException {
+ public void submitSQLJob(SQLJobSubmission job, Duration timeout) throws Exception {
distribution.submitSQLJob(job, timeout);
}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-hbase/src/test/java/org/apache/flink/tests/util/hbase/SQLClientHBaseITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-hbase/src/test/java/org/apache/flink/tests/util/hbase/SQLClientHBaseITCase.java
index 165d0f7d832..0f15579d156 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-hbase/src/test/java/org/apache/flink/tests/util/hbase/SQLClientHBaseITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-hbase/src/test/java/org/apache/flink/tests/util/hbase/SQLClientHBaseITCase.java
@@ -228,7 +228,7 @@ public class SQLClientHBaseITCase extends TestLogger {
}
private void executeSqlStatements(ClusterController clusterController, List<String> sqlLines)
- throws IOException {
+ throws Exception {
LOG.info("Executing SQL: HBase source table -> HBase sink table");
clusterController.submitSQLJob(
new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/PlannerScalaFreeITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/PlannerScalaFreeITCase.java
index 79e6c59db08..683e72cb155 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/PlannerScalaFreeITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/PlannerScalaFreeITCase.java
@@ -131,7 +131,7 @@ public class PlannerScalaFreeITCase extends TestLogger {
}
private void executeSqlStatements(ClusterController clusterController, List<String> sqlLines)
- throws IOException {
+ throws Exception {
LOG.info("Executing end-to-end SQL statements {}.", sqlLines);
clusterController.submitSQLJob(
new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
diff --git a/flink-end-to-end-tests/flink-sql-gateway-test/pom.xml b/flink-end-to-end-tests/flink-sql-gateway-test/pom.xml
index 3b8cc597bb2..9c30c64b880 100644
--- a/flink-end-to-end-tests/flink-sql-gateway-test/pom.xml
+++ b/flink-end-to-end-tests/flink-sql-gateway-test/pom.xml
@@ -17,68 +17,167 @@ specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>flink-end-to-end-tests</artifactId>
- <groupId>org.apache.flink</groupId>
- <version>1.16-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>flink-end-to-end-tests</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>1.16-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
- <artifactId>flink-sql-gateway-test</artifactId>
- <name>Flink : E2E Tests : SQL Gateway</name>
- <packaging>jar</packaging>
+ <artifactId>flink-sql-gateway-test</artifactId>
+ <name>Flink : E2E Tests : SQL Gateway</name>
+ <packaging>jar</packaging>
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-end-to-end-tests-common</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-sql-connector-hive-${hive.version}_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-jdbc</artifactId>
- <version>${hive.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils-junit</artifactId>
- </dependency>
- </dependencies>
+ <properties>
+ <!-- The test container uses hive-2.1.0 -->
+ <hive.version>2.3.9</hive.version>
+ <flink.hadoop.version>2.8.5</flink.hadoop.version>
+ </properties>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- <executions>
- <execution>
- <id>copy</id>
- <phase>pre-integration-test</phase>
- <goals>
- <goal>copy</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <artifactItems>
- <artifactItem>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-sql-connector-hive-2.3.9_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <destFileName>flink-sql-connector-hive-2.3.9_${scala.binary.version}-${project.version}.jar</destFileName>
- <type>jar</type>
- <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
- </artifactItem>
- </artifactItems>
- </configuration>
- </plugin>
- </plugins>
- </build>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-end-to-end-tests-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-sql-connector-hive-${hive.version}_${scala.binary.version}
+ </artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <!-- It contains jackson-annotations, which conflicts with Testcontainers -->
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-jdbc</artifactId>
+ <version>${hive.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils-junit</artifactId>
+ </dependency>
+
+ <!-- hadoop dependencies for end-to-end test -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${flink.hadoop.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <!-- This dependency is no longer shipped with the JDK since Java 9. -->
+ <groupId>jdk.tools</groupId>
+ <artifactId>jdk.tools</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>dependency-convergence</id>
+ <goals>
+ <goal>enforce</goal>
+ </goals>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy</id>
+ <phase>pre-integration-test</phase>
+ <goals>
+ <goal>copy</goal>
+ </goals>
+ <configuration>
+ <artifactItems>
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>
+ flink-sql-connector-hive-${hive.version}_${scala.binary.version}
+ </artifactId>
+ <version>${project.version}</version>
+ <destFileName>
+ flink-sql-connector-hive-${hive.version}_${scala.binary.version}-${project.version}.jar
+ </destFileName>
+ <type>jar</type>
+ <outputDirectory>${project.build.directory}/dependencies
+ </outputDirectory>
+ </artifactItem>
+ </artifactItems>
+ </configuration>
+ </execution>
+ <execution>
+ <id>store-classpath-in-target-for-tests</id>
+ <phase>package</phase>
+ <goals>
+ <goal>build-classpath</goal>
+ </goals>
+ <configuration>
+ <outputFile>${project.build.directory}/hadoop.classpath</outputFile>
+ <excludeGroupIds>org.apache.flink</excludeGroupIds>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <profiles>
+ <profile>
+ <id>hive3</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
</project>
diff --git a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SQLGatewayITCase.java b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SQLGatewayITCase.java
deleted file mode 100644
index 5b037b1ccae..00000000000
--- a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SQLGatewayITCase.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.gateway;
-
-import org.apache.flink.tests.util.TestUtils;
-import org.apache.flink.tests.util.flink.ClusterController;
-import org.apache.flink.tests.util.flink.FlinkResource;
-import org.apache.flink.tests.util.flink.FlinkResourceSetup;
-import org.apache.flink.tests.util.flink.JarLocation;
-import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResource;
-import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Rule;
-import org.junit.Test;
-
-import java.nio.file.Path;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-public class SQLGatewayITCase extends TestLogger {
-
- private static String JDBC_URL = "jdbc:hive2://localhost:8084/default;auth=noSasl";
- private static String DRIVER_NAME = "org.apache.hive.jdbc.HiveDriver";
-
- private static final Path HIVE_SQL_CONNECOTR_JAR =
- TestUtils.getResource(".*dependencies/flink-sql-connector-hive-.*.jar");
-
- @Rule
- public final FlinkResource flink =
- new LocalStandaloneFlinkResourceFactory()
- .create(
- FlinkResourceSetup.builder()
- .addJar(HIVE_SQL_CONNECOTR_JAR, JarLocation.LIB)
- .build());
-
- @Test
- public void testGateway() throws Exception {
- try (ClusterController clusterController = flink.startCluster(1)) {
- ((LocalStandaloneFlinkResource) flink)
- .startSQLGateway("-Dsql-gateway.endpoint.type=hiveserver2");
- Thread.sleep(2000);
- Class.forName(DRIVER_NAME);
- try {
- DriverManager.getConnection(JDBC_URL);
- } catch (SQLException e) {
- assertThat(e.getMessage())
- .contains(
- "Embedded metastore is not allowed. Make sure you have set a valid value for hive.metastore.uris");
- }
- }
- }
-}
diff --git a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java
new file mode 100644
index 00000000000..b7a4e903ffa
--- /dev/null
+++ b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.endpoint.hive.HiveServer2Endpoint;
+import org.apache.flink.table.gateway.containers.HiveContainer;
+import org.apache.flink.test.util.SQLJobSubmission;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.ClusterController;
+import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceSetup;
+import org.apache.flink.tests.util.flink.GatewayController;
+import org.apache.flink.tests.util.flink.JarLocation;
+import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.catalog.hive.HiveCatalog.isEmbeddedMetastore;
+import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.CATALOG_HIVE_CONF_DIR;
+import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.THRIFT_PORT;
+import static org.apache.flink.tests.util.TestUtils.readCsvResultFiles;
+import static org.junit.Assert.assertEquals;
+
+/** E2E Tests for {@code SqlGateway} with {@link HiveServer2Endpoint}. */
+public class SqlGatewayE2ECase extends TestLogger {
+
+ private static final Path HIVE_SQL_CONNECTOR_JAR =
+ TestUtils.getResource(".*dependencies/flink-sql-connector-hive-.*.jar");
+ private static final Path HADOOP_CLASS_PATH = TestUtils.getResource(".*hadoop.classpath");
+ private static final String GATEWAY_E2E_SQL = "gateway_e2e.sql";
+ private static final Configuration ENDPOINT_CONFIG = new Configuration();
+ private static final String RESULT_KEY = "$RESULT";
+
+ @ClassRule public static final TemporaryFolder FOLDER = new TemporaryFolder();
+ @ClassRule public static final HiveContainer HIVE_CONTAINER = new HiveContainer();
+ @Rule public final FlinkResource flinkResource = buildFlinkResource();
+
+ private static NetUtils.Port port;
+
+ @BeforeClass
+ public static void beforeClass() {
+ ENDPOINT_CONFIG.setString(
+ getPrefixedConfigOptionName(CATALOG_HIVE_CONF_DIR), createHiveConf().getParent());
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ port.close();
+ }
+
+ @Test
+ public void testExecuteStatement() throws Exception {
+ URL url = SqlGatewayE2ECase.class.getClassLoader().getResource(GATEWAY_E2E_SQL);
+ if (url == null) {
+ throw new FileNotFoundException(GATEWAY_E2E_SQL);
+ }
+ File result = FOLDER.newFolder("csv");
+ String sql =
+ Files.readAllLines(new File(url.getFile()).toPath()).stream()
+ .filter(line -> !line.trim().startsWith("--"))
+ .collect(Collectors.joining());
+ List<String> lines =
+ Arrays.stream(sql.split(";"))
+ .map(line -> line.replace(RESULT_KEY, result.getAbsolutePath()))
+ .collect(Collectors.toList());
+
+ try (GatewayController gateway = flinkResource.startSqlGateway();
+ ClusterController ignore = flinkResource.startCluster(1)) {
+ gateway.submitSQLJob(
+ new SQLJobSubmission.SQLJobSubmissionBuilder(lines)
+ .setClientMode(SQLJobSubmission.ClientMode.HIVE_JDBC)
+ .build(),
+ Duration.ofSeconds(60));
+ }
+ assertEquals(Collections.singletonList("1"), readCsvResultFiles(result.toPath()));
+ }
+
+ private static File createHiveConf() {
+ HiveConf hiveConf = new HiveConf();
+ try (InputStream inputStream =
+ new FileInputStream(
+ new File(
+ Objects.requireNonNull(
+ SqlGatewayE2ECase.class
+ .getClassLoader()
+ .getResource(HiveCatalog.HIVE_SITE_FILE))
+ .toURI()))) {
+ hiveConf.addResource(inputStream, HiveCatalog.HIVE_SITE_FILE);
+ // trigger a read from the conf so that the input stream is read
+ isEmbeddedMetastore(hiveConf);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to load hive-site.xml from specified path", e);
+ }
+ hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, HIVE_CONTAINER.getHiveMetastoreURI());
+ try {
+ File site = FOLDER.newFile(HiveCatalog.HIVE_SITE_FILE);
+ try (OutputStream out = new FileOutputStream(site)) {
+ hiveConf.writeXml(out);
+ }
+ return site;
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to create hive conf.", e);
+ }
+ }
+
+ /**
+ * Builds the required environment. It mocks HADOOP_CLASSPATH by adding all hadoop jars listed
+ * in the generated hadoop.classpath file. It also moves flink-table-planner into lib and
+ * flink-table-planner-loader out of it so that the Hive SQL connector works.
+ */
+ private static FlinkResource buildFlinkResource() {
+ // add hive jar and planner jar
+ FlinkResourceSetup.FlinkResourceSetupBuilder builder =
+ FlinkResourceSetup.builder()
+ .addJar(HIVE_SQL_CONNECTOR_JAR, JarLocation.LIB)
+ .moveJar("flink-table-planner", JarLocation.OPT, JarLocation.LIB)
+ .moveJar("flink-table-planner-loader", JarLocation.LIB, JarLocation.OPT);
+ // add hadoop jars
+ File hadoopClasspathFile = new File(HADOOP_CLASS_PATH.toAbsolutePath().toString());
+ if (!hadoopClasspathFile.exists()) {
+ throw new RuntimeException(
+ "File that contains hadoop classpath "
+ + HADOOP_CLASS_PATH
+ + " does not exist.");
+ }
+ try {
+ String classPathContent = FileUtils.readFileUtf8(hadoopClasspathFile);
+ Arrays.stream(classPathContent.split(":"))
+ .map(Paths::get)
+ .forEach(jar -> builder.addJar(jar, JarLocation.LIB));
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to build the FlinkResource.", e);
+ }
+ // add hive server2 endpoint related configs
+ port = NetUtils.getAvailablePort();
+ Map<String, String> endpointConfig = new HashMap<>();
+ endpointConfig.put("sql-gateway.endpoint.type", "hiveserver2");
+ endpointConfig.put(
+ getPrefixedConfigOptionName(THRIFT_PORT), String.valueOf(port.getPort()));
+ ENDPOINT_CONFIG.addAll(Configuration.fromMap(endpointConfig));
+ builder.addConfiguration(ENDPOINT_CONFIG);
+
+ return new LocalStandaloneFlinkResourceFactory().create(builder.build());
+ }
+
+ private static String getPrefixedConfigOptionName(ConfigOption<?> option) {
+ String prefix = "sql-gateway.endpoint.hiveserver2.";
+ return prefix + option.key();
+ }
+}
diff --git a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/containers/HiveContainer.java b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/containers/HiveContainer.java
new file mode 100644
index 00000000000..35dc6325779
--- /dev/null
+++ b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/containers/HiveContainer.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.containers;
+
+import org.apache.flink.util.DockerImageVersions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerImageName;
+
+/** Test container for Hive. */
+public class HiveContainer extends GenericContainer<HiveContainer> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HiveContainer.class);
+
+ public static final String HOST_NAME = "hadoop-master";
+ public static final int HIVE_METASTORE_PORT = 9083;
+
+ public HiveContainer() {
+ super(DockerImageName.parse(DockerImageVersions.HIVE2));
+ withExtraHost(HOST_NAME, "127.0.0.1");
+ addExposedPort(HIVE_METASTORE_PORT);
+ }
+
+ @Override
+ protected void doStart() {
+ super.doStart();
+ if (LOG.isInfoEnabled()) {
+ followOutput(new Slf4jLogConsumer(LOG));
+ }
+ }
+
+ public String getHiveMetastoreURI() {
+ return String.format("thrift://%s:%s", getHost(), getMappedPort(HIVE_METASTORE_PORT));
+ }
+}
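A hedged usage sketch of the container: the @ClassRule starts it once per test class, and the metastore URI is resolved against the mapped port, so it is only valid after startup. The test body here is an illustrative assumption:

    import org.apache.flink.table.gateway.containers.HiveContainer;

    import org.junit.ClassRule;
    import org.junit.Test;

    // Hedged sketch: wire the container into a JUnit 4 test, as SqlGatewayE2ECase does.
    public class HiveContainerUsageSketch {
        @ClassRule public static final HiveContainer HIVE_CONTAINER = new HiveContainer();

        @Test
        public void printMetastoreUri() {
            // e.g. thrift://localhost:32774 (the mapped port varies per run)
            System.out.println(HIVE_CONTAINER.getHiveMetastoreURI());
        }
    }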
diff --git a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/resources/gateway_e2e.sql b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/resources/gateway_e2e.sql
new file mode 100644
index 00000000000..88fd47f445e
--- /dev/null
+++ b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/resources/gateway_e2e.sql
@@ -0,0 +1,30 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+SET table.sql-dialect=default;
+
+CREATE TABLE CsvTable (
+ val INT
+) WITH (
+ 'connector' = 'filesystem',
+ 'path' = '$RESULT',
+ 'sink.rolling-policy.rollover-interval' = '2s',
+ 'sink.rolling-policy.check-interval' = '2s',
+ 'format' = 'csv',
+ 'csv.disable-quote-character' = 'true'
+);
+
+INSERT INTO CsvTable SELECT 1;
diff --git a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/resources/hive-site.xml b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/resources/hive-site.xml
new file mode 100644
index 00000000000..9f74734e801
--- /dev/null
+++ b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/resources/hive-site.xml
@@ -0,0 +1,86 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<configuration>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+ <property>
+ <name>hive.metastore.uris</name>
+ <value>thrift://localhost:9083</value>
+ </property>
+
+ <property>
+ <name>javax.jdo.option.ConnectionURL</name>
+ <value>jdbc:mysql://localhost:3306/metastore?useSSL=false</value>
+ </property>
+
+ <property>
+ <name>javax.jdo.option.ConnectionDriverName</name>
+ <value>com.mysql.cj.jdbc.Driver</value>
+ </property>
+
+ <property>
+ <name>javax.jdo.option.ConnectionUserName</name>
+ <value>root</value>
+ </property>
+
+ <property>
+ <name>javax.jdo.option.ConnectionPassword</name>
+ <value>root</value>
+ </property>
+
+ <property>
+ <name>hive.metastore.connect.retries</name>
+ <value>15</value>
+ </property>
+
+ <property>
+ <name>hive.metastore.disallow.incompatible.col.type.changes</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <!-- https://community.hortonworks.com/content/supportkb/247055/errorjavalangunsupportedoperationexception-storage.html -->
+ <name>metastore.storage.schema.reader.impl</name>
+ <value>org.apache.hadoop.hive.metastore.SerDeStorageSchemaReader</value>
+ </property>
+
+ <property>
+ <name>hive.support.concurrency</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>hive.txn.manager</name>
+ <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
+ </property>
+
+ <property>
+ <name>hive.compactor.initiator.on</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>hive.compactor.worker.threads</name>
+ <value>1</value>
+ </property>
+
+ <property>
+ <name>hive.users.in.admin.role</name>
+ <value>hdfs,hive</value>
+ </property>
+
+</configuration>
diff --git a/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/resources/log4j2-test.properties
similarity index 56%
copy from tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist
copy to flink-end-to-end-tests/flink-sql-gateway-test/src/test/resources/log4j2-test.properties
index a1ac43e7cad..835c2ec9a3d 100644
--- a/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist
+++ b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/resources/log4j2-test.properties
@@ -16,27 +16,13 @@
# limitations under the License.
################################################################################
-# These modules are not deployed to maven central, despite their use of the shade plugin.
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
-flink-examples-streaming-gcp-pubsub
-flink-yarn-tests
-flink-docs
-flink-datastream-allround-test
-flink-queryable-state-test
-flink-confluent-schema-registry
-flink-stream-stateful-job-upgrade-test
-flink-elasticsearch7-test
-flink-stream-state-ttl-test
-flink-state-evolution-test
-flink-elasticsearch6-test
-flink-rocksdb-state-memory-control-test
-flink-python-test
-flink-streaming-kinesis-test
-flink-tpch-test
-flink-streaming-kafka-test-base
-flink-heavy-deployment-stress-test
-flink-high-parallelism-iterations-test
-flink-end-to-end-tests-common-kafka
-flink-end-to-end-tests-pulsar
-flink-end-to-end-tests-elasticsearch7
-flink-end-to-end-tests-elasticsearch6
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 31bd7facf9a..bc477e349cf 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -55,6 +55,7 @@ under the License.
<module>flink-confluent-schema-registry</module>
<module>flink-stream-state-ttl-test</module>
<module>flink-sql-client-test</module>
+ <module>flink-sql-gateway-test</module>
<module>flink-file-sink-test</module>
<module>flink-state-evolution-test</module>
<module>flink-rocksdb-state-memory-control-test</module>
@@ -83,7 +84,6 @@ under the License.
<module>flink-end-to-end-tests-elasticsearch7</module>
<module>flink-end-to-end-tests-common-elasticsearch</module>
<module>flink-end-to-end-tests-sql</module>
- <module>flink-sql-gateway-test</module>
</modules>
<dependencyManagement>
diff --git a/flink-table/flink-sql-gateway/bin/sql-gateway.sh b/flink-table/flink-sql-gateway/bin/sql-gateway.sh
index f925a25adcf..5b300fb1c48 100644
--- a/flink-table/flink-sql-gateway/bin/sql-gateway.sh
+++ b/flink-table/flink-sql-gateway/bin/sql-gateway.sh
@@ -17,34 +17,16 @@
# limitations under the License.
#
-# Start/stop a Flink SQL Gateway.
-
function usage() {
- echo "Usage: bin/sql-gateway.sh command"
+ echo "Usage: sql-gateway.sh [start|start-foreground|stop|stop-all] [args]"
echo " commands:"
- echo " start - Run a SQL Gateway as a daemon"
- echo " start-foreground - Run a SQL Gateway as a console application"
- echo " stop - Stop the SQL Gateway daemon"
- echo " stop-all - Stop all the SQL Gateway daemons"
- echo " -h | --help - Show this help message"
+ echo " start - Run a SQL Gateway as a daemon"
+ echo " start-foreground - Run a SQL Gateway as a console application"
+ echo " stop - Stop the SQL Gateway daemon"
+ echo " stop-all - Stop all the SQL Gateway daemons"
+ echo " -h | --help - Show this help message"
}
-if [[ "$*" = *--help ]] || [[ "$*" = *-h ]]; then
- usage
- exit 0
-fi
-
-STARTSTOP=$1
-
-if [ -z "$STARTSTOP" ]; then
- STARTSTOP="start"
-fi
-
-if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
- usage
- exit 1
-fi
-
################################################################################
# Adopted from "flink" bash script
################################################################################
@@ -75,7 +57,35 @@ if [ "$FLINK_IDENT_STRING" = "" ]; then
FLINK_IDENT_STRING="$USER"
fi
-ENTRYPOINT=sqlgateway
+################################################################################
+# SQL gateway specific logic
+################################################################################
+
+ENTRYPOINT=sql-gateway
+
+if [[ "$1" = *--help ]] || [[ "$1" = *-h ]]; then
+ usage
+ exit 0
+fi
+
+STARTSTOP=$1
+
+if [ -z "$STARTSTOP" ]; then
+ STARTSTOP="start"
+fi
+
+if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
+ usage
+ exit 1
+fi
+
+# For "./sql-gateway.sh start --help", print the help message to the console
+if [[ "$STARTSTOP" = start* ]] && ( [[ "$*" = *--help* ]] || [[ "$*" = *-h* ]] ); then
+ FLINK_TM_CLASSPATH=`constructFlinkClassPath`
+ SQL_GATEWAY_CLASSPATH=`findSqlGatewayJar`
+ "$JAVA_RUN" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$SQL_GATEWAY_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.table.gateway.SqlGateway "${@:2}"
+ exit 0
+fi
if [[ $STARTSTOP == "start-foreground" ]]; then
exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${@:2}"
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/SqlGateway.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/SqlGateway.java
index f2b2fa384f6..9f6f409063b 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/SqlGateway.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/SqlGateway.java
@@ -67,18 +67,16 @@ public class SqlGateway {
sessionManager.start();
SqlGatewayService sqlGatewayService = new SqlGatewayServiceImpl(sessionManager);
- endpoints.addAll(
- SqlGatewayEndpointFactoryUtils.createSqlGatewayEndpoint(
- sqlGatewayService, context.getFlinkConfig()));
-
- for (SqlGatewayEndpoint endpoint : endpoints) {
- try {
+ try {
+ endpoints.addAll(
+ SqlGatewayEndpointFactoryUtils.createSqlGatewayEndpoint(
+ sqlGatewayService, context.getFlinkConfig()));
+ for (SqlGatewayEndpoint endpoint : endpoints) {
endpoint.start();
- } catch (Throwable t) {
- LOG.error("Failed to start the endpoint.", t);
- stop();
- throw new SqlGatewayException("Failed to start the endpoint.", t);
}
+ } catch (Throwable t) {
+ LOG.error("Failed to start the endpoints.", t);
+ throw new SqlGatewayException("Failed to start the endpoints.", t);
}
}
@@ -97,11 +95,6 @@ public class SqlGateway {
}
public static void main(String[] args) {
- // startup checks and logging
- EnvironmentInformation.logEnvironmentInfo(LOG, "SqlGateway", args);
- SignalHandler.register(LOG);
- JvmShutdownSafeguard.installAsShutdownHook(LOG);
-
startSqlGateway(System.out, args);
}
@@ -114,6 +107,11 @@ public class SqlGateway {
return;
}
+ // startup checks and logging
+ EnvironmentInformation.logEnvironmentInfo(LOG, "SqlGateway", args);
+ SignalHandler.register(LOG);
+ JvmShutdownSafeguard.installAsShutdownHook(LOG);
+
SqlGateway gateway = new SqlGateway(cliOptions.getDynamicConfigs());
try {
Runtime.getRuntime().addShutdownHook(new ShutdownThread(gateway));
@@ -128,11 +126,19 @@ public class SqlGateway {
// make space in terminal
stream.println();
stream.println();
- LOG.error(
- "SqlGateway must stop. Unexpected exception. This is a bug. Please consider filing an issue.",
- t);
- throw new SqlGatewayException(
- "Unexpected exception. This is a bug. Please consider filing an issue.", t);
+
+ if (t instanceof SqlGatewayException) {
+ // Exception that the gateway cannot handle.
+ throw (SqlGatewayException) t;
+ } else {
+ LOG.error(
+ "SqlGateway must stop. Unexpected exception. This is a bug. Please consider filing an issue.",
+ t);
+ throw new SqlGatewayException(
+ "Unexpected exception. This is a bug. Please consider filing an issue.", t);
+ }
+ } finally {
+ gateway.stop();
}
}
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/SqlGatewayTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/SqlGatewayTest.java
index 10095dd3d76..57d6bcac381 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/SqlGatewayTest.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/SqlGatewayTest.java
@@ -38,6 +38,7 @@ import java.util.UUID;
import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
/** Tests for the {@link SqlGateway}. */
public class SqlGatewayTest {
@@ -95,26 +96,36 @@ public class SqlGatewayTest {
"-Dsql-gateway.endpoint.mocked.host=localhost",
"-Dsql-gateway.endpoint.mocked.port=9999"
};
- PrintStream stream = new PrintStream(output);
- Thread thread =
- new ExecutorThreadFactory(
- "SqlGateway-thread-pool",
- (t, exception) -> exception.printStackTrace(stream))
- .newThread(() -> SqlGateway.startSqlGateway(stream, args));
- thread.start();
-
- CommonTestUtils.waitUtil(
- () -> MockedSqlGatewayEndpoint.isRunning(id),
- Duration.ofSeconds(10),
- "Failed to get the endpoint starts.");
-
- thread.interrupt();
- CommonTestUtils.waitUtil(
- () -> !thread.isAlive(),
- Duration.ofSeconds(10),
- "Failed to get the endpoint starts.");
- assertThat(output.toString())
- .doesNotContain(
- "Unexpected exception. This is a bug. Please consider filing an issue.");
+ try (PrintStream stream = new PrintStream(output)) {
+ Thread thread =
+ new ExecutorThreadFactory(
+ "SqlGateway-thread-pool",
+ (t, exception) -> exception.printStackTrace(stream))
+ .newThread(() -> SqlGateway.startSqlGateway(stream, args));
+ thread.start();
+
+ CommonTestUtils.waitUtil(
+ () -> MockedSqlGatewayEndpoint.isRunning(id),
+ Duration.ofSeconds(10),
+ "Failed to get the endpoint starts.");
+
+ thread.interrupt();
+ CommonTestUtils.waitUtil(
+ () -> !thread.isAlive(),
+ Duration.ofSeconds(10),
+ "Failed to get the endpoint starts.");
+ assertThat(output.toString())
+ .doesNotContain(
+ "Unexpected exception. This is a bug. Please consider filing an issue.");
+ }
+ }
+
+ @Test
+ public void testFailedToStartSqlGateway() {
+ try (PrintStream stream = new PrintStream(output)) {
+ assertThatThrownBy(() -> SqlGateway.startSqlGateway(stream, new String[0]))
+ .doesNotHaveToString(
+ "Unexpected exception. This is a bug. Please consider filing an issue.");
+ }
}
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index 8d00017f46e..e2ef76b26eb 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -36,6 +36,7 @@ import org.apache.flink.table.planner.calcite._
import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema
import org.apache.flink.table.planner.connectors.DynamicSinkUtils
import org.apache.flink.table.planner.connectors.DynamicSinkUtils.validateSchemaAndApplyImplicitCast
+import org.apache.flink.table.planner.delegation.DialectFactory.DefaultParserContext
import org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl
import org.apache.flink.table.planner.hint.FlinkHints
import org.apache.flink.table.planner.operations.PlannerQueryOperation
@@ -55,7 +56,6 @@ import org.apache.flink.table.sinks.TableSink
import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter
import _root_.scala.collection.JavaConversions._
-import DialectFactory.DefaultParserContext
import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
import org.apache.calcite.plan.{RelTrait, RelTraitDef}
import org.apache.calcite.rel.RelNode
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java
index e59e939500a..f0f367f7bf9 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java
@@ -54,4 +54,6 @@ public class DockerImageVersions {
public static final String GOOGLE_CLOUD_PUBSUB_EMULATOR =
"gcr.io/google.com/cloudsdktool/cloud-sdk:379.0.0";
+
+ public static final String HIVE2 = "prestodb/hdp2.6-hive:10";
}
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SQLJobSubmission.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SQLJobSubmission.java
index cff93ccaf0d..5ec15f8dfe2 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SQLJobSubmission.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SQLJobSubmission.java
@@ -27,14 +27,20 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
/** Programmatic definition of a SQL job-submission. */
public class SQLJobSubmission {
+ private final ClientMode clientMode;
private final List<String> sqlLines;
private final List<String> jars;
- private SQLJobSubmission(List<String> sqlLines, List<String> jars) {
+ private SQLJobSubmission(ClientMode clientMode, List<String> sqlLines, List<String> jars) {
+ this.clientMode = clientMode;
this.sqlLines = checkNotNull(sqlLines);
this.jars = checkNotNull(jars);
}
+ public ClientMode getClientMode() {
+ return clientMode;
+ }
+
public List<String> getJars() {
return this.jars;
}
@@ -45,6 +51,7 @@ public class SQLJobSubmission {
/** Builder for the {@link SQLJobSubmission}. */
public static class SQLJobSubmissionBuilder {
+ private ClientMode clientMode = ClientMode.SQL_CLIENT;
private final List<String> sqlLines;
private final List<String> jars = new ArrayList<>();
@@ -52,6 +59,11 @@ public class SQLJobSubmission {
this.sqlLines = sqlLines;
}
+ public SQLJobSubmissionBuilder setClientMode(ClientMode clientMode) {
+ this.clientMode = clientMode;
+ return this;
+ }
+
public SQLJobSubmissionBuilder addJar(Path jarFile) {
this.jars.add(jarFile.toAbsolutePath().toString());
return this;
@@ -70,7 +82,14 @@ public class SQLJobSubmission {
}
public SQLJobSubmission build() {
- return new SQLJobSubmission(sqlLines, jars);
+ return new SQLJobSubmission(clientMode, sqlLines, jars);
}
}
+
+ /** The client used to submit the job. */
+ public enum ClientMode {
+ SQL_CLIENT,
+
+ HIVE_JDBC
+ }
}
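A hedged sketch of the extended builder: the mode defaults to SQL_CLIENT, so existing tests keep their behavior, and gateway tests opt in to HIVE_JDBC explicitly. The SQL line and the jar path below are illustrative assumptions:

    import java.nio.file.Paths;
    import java.util.Arrays;

    import org.apache.flink.test.util.SQLJobSubmission;

    // Hedged sketch: build a submission for the Hive JDBC path.
    public class SqlJobSubmissionSketch {
        public static void main(String[] args) {
            SQLJobSubmission job =
                    new SQLJobSubmission.SQLJobSubmissionBuilder(
                                    Arrays.asList("INSERT INTO CsvTable SELECT 1"))
                            .setClientMode(SQLJobSubmission.ClientMode.HIVE_JDBC)
                            .addJar(Paths.get("/path/to/flink-sql-connector-hive.jar")) // assumed
                            .build();
            System.out.println(job.getClientMode()); // HIVE_JDBC
        }
    }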
diff --git a/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist b/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist
index a1ac43e7cad..7b0be584fee 100644
--- a/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist
+++ b/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist
@@ -40,3 +40,4 @@ flink-end-to-end-tests-common-kafka
flink-end-to-end-tests-pulsar
flink-end-to-end-tests-elasticsearch7
flink-end-to-end-tests-elasticsearch6
+flink-sql-gateway-test