You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2022/08/03 07:08:40 UTC
[flink] 01/02: [FLINK-27770][sql-gateway] Introduce the script to start/stop/stop-all gateway
This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit c90d99fe2dd2a78214c1f16f6052ee17e30afd84
Author: Luning Wang <wa...@gmail.com>
AuthorDate: Mon Jul 4 10:05:47 2022 +0800
[FLINK-27770][sql-gateway] Introduce the script to start/stop/stop-all gateway
---
flink-dist/src/main/assemblies/bin.xml | 7 ++
flink-dist/src/main/assemblies/opt.xml | 6 ++
flink-dist/src/main/flink-bin/bin/config.sh | 19 +++++
flink-dist/src/main/flink-bin/bin/flink-console.sh | 9 ++-
flink-dist/src/main/flink-bin/bin/flink-daemon.sh | 9 ++-
.../flink/tests/util/flink/FlinkDistribution.java | 12 ++++
.../util/flink/LocalStandaloneFlinkResource.java | 5 ++
.../flink-sql-gateway-test/pom.xml | 84 ++++++++++++++++++++++
.../flink/table/gateway/SQLGatewayITCase.java | 71 ++++++++++++++++++
flink-end-to-end-tests/pom.xml | 3 +-
flink-table/flink-sql-gateway/bin/sql-gateway.sh | 84 ++++++++++++++++++++++
11 files changed, 304 insertions(+), 5 deletions(-)
diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml
index dcbac513857..52172fe50a7 100644
--- a/flink-dist/src/main/assemblies/bin.xml
+++ b/flink-dist/src/main/assemblies/bin.xml
@@ -172,6 +172,13 @@ under the License.
<fileMode>0755</fileMode>
</fileSet>
+ <!-- copy SQL gateway -->
+ <fileSet>
+ <directory>../flink-table/flink-sql-gateway/bin/</directory>
+ <outputDirectory>bin</outputDirectory>
+ <fileMode>0755</fileMode>
+ </fileSet>
+
<!-- copy yarn start scripts -->
<fileSet>
<directory>src/main/flink-bin/yarn-bin</directory>
diff --git a/flink-dist/src/main/assemblies/opt.xml b/flink-dist/src/main/assemblies/opt.xml
index c56ac259380..cdb57563ecc 100644
--- a/flink-dist/src/main/assemblies/opt.xml
+++ b/flink-dist/src/main/assemblies/opt.xml
@@ -65,6 +65,12 @@
<destName>flink-sql-client-${project.version}.jar</destName>
<fileMode>0644</fileMode>
</file>
+ <file>
+ <source>../flink-table/flink-sql-gateway/target/flink-sql-gateway-${project.version}.jar</source>
+ <outputDirectory>opt/</outputDirectory>
+ <destName>flink-sql-gateway-${project.version}.jar</destName>
+ <fileMode>0644</fileMode>
+ </file>
<!-- State Processor API -->
<file>
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index e8b1e2a818a..f136f963c9b 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -66,6 +66,25 @@ findFlinkDistJar() {
echo "$FLINK_DIST"
}
+# Locate the flink-sql-gateway*.jar below $FLINK_OPT_DIR and print its path on stdout.
+# Callers capture stdout as a classpath entry, so all diagnostics go to stderr.
+# When zero or more than one jar matches, an error is printed and "exit 1" is called;
+# under command substitution that ends only the substitution, leaving the captured value empty.
+findSqlGatewayJar() {
+ local SQL_GATEWAY
+ SQL_GATEWAY="$(find "$FLINK_OPT_DIR" -name 'flink-sql-gateway*.jar')"
+ local SQL_GATEWAY_COUNT
+ # One path per line in the find output, so the line count is the number of matches.
+ SQL_GATEWAY_COUNT="$(echo "$SQL_GATEWAY" | wc -l)"
+
+ # If flink-sql-gateway*.jar cannot be resolved write error messages to stderr since stdout is stored
+ # as the classpath and exit function with empty classpath to force process failure
+ if [[ "$SQL_GATEWAY" == "" ]]; then
+ # Name the artifact that is actually missing (this is not the flink-dist lookup).
+ (>&2 echo "[ERROR] flink-sql-gateway*.jar not found in $FLINK_OPT_DIR.")
+ exit 1
+ elif [[ "$SQL_GATEWAY_COUNT" -gt 1 ]]; then
+ (>&2 echo "[ERROR] Multiple flink-sql-gateway*.jar found in $FLINK_OPT_DIR. Please resolve.")
+ exit 1
+ fi
+
+ echo "$SQL_GATEWAY"
+}
+
# These are used to mangle paths that are passed to java when using
# cygwin. Cygwin paths are like linux paths, i.e. /path/to/somewhere
# but the windows java version expects them in Windows Format, i.e. C:\bla\blub.
diff --git a/flink-dist/src/main/flink-bin/bin/flink-console.sh b/flink-dist/src/main/flink-bin/bin/flink-console.sh
index 2c84e60cc19..05b9d42632e 100755
--- a/flink-dist/src/main/flink-bin/bin/flink-console.sh
+++ b/flink-dist/src/main/flink-bin/bin/flink-console.sh
@@ -19,7 +19,7 @@
# Start a Flink service as a console application. Must be stopped with Ctrl-C
# or with SIGTERM by kill or the controlling process.
-USAGE="Usage: flink-console.sh (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob|kubernetes-session|kubernetes-application|kubernetes-taskmanager) [args]"
+USAGE="Usage: flink-console.sh (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob|kubernetes-session|kubernetes-application|kubernetes-taskmanager|sqlgateway) [args]"
SERVICE=$1
ARGS=("${@:2}") # get remaining arguments as array
@@ -62,6 +62,11 @@ case $SERVICE in
CLASS_TO_RUN=org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner
;;
+ (sqlgateway)
+ CLASS_TO_RUN=org.apache.flink.table.gateway.SqlGateway
+ SQL_GATEWAY_CLASSPATH=`findSqlGatewayJar`
+ ;;
+
(*)
echo "Unknown service '${SERVICE}'. $USAGE."
exit 1
@@ -111,4 +116,4 @@ echo $$ >> "$pid" 2>/dev/null
# Evaluate user options for local variable expansion
FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS})
-exec "$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}"
+exec "$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$SQL_GATEWAY_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}"
diff --git a/flink-dist/src/main/flink-bin/bin/flink-daemon.sh b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
index c6d54420b1d..246b540a972 100644
--- a/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
+++ b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
@@ -18,7 +18,7 @@
################################################################################
# Start/stop a Flink daemon.
-USAGE="Usage: flink-daemon.sh (start|stop|stop-all) (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob) [args]"
+USAGE="Usage: flink-daemon.sh (start|stop|stop-all) (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob|sqlgateway) [args]"
STARTSTOP=$1
DAEMON=$2
@@ -50,6 +50,11 @@ case $DAEMON in
CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint
;;
+ (sqlgateway)
+ CLASS_TO_RUN=org.apache.flink.table.gateway.SqlGateway
+ SQL_GATEWAY_CLASSPATH=`findSqlGatewayJar`
+ ;;
+
(*)
echo "Unknown daemon '${DAEMON}'. $USAGE."
exit 1
@@ -131,7 +136,7 @@ case $STARTSTOP in
FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS})
echo "Starting $DAEMON daemon on host $HOSTNAME."
- "$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 200<&- 2>&1 < /dev/null &
+ "$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$SQL_GATEWAY_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 200<&- 2>&1 < /dev/null &
mypid=$!
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
index 93c34a22993..c699adaeb8f 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
@@ -106,6 +106,18 @@ final class FlinkDistribution {
bin.resolve("taskmanager.sh").toAbsolutePath().toString(), "start");
}
+ /**
+ * Runs the {@code sql-gateway.sh} script resolved against this distribution's {@code bin}
+ * path with the command {@code start} followed by {@code arg}, using
+ * {@code AutoClosableProcess.runBlocking}.
+ *
+ * @param arg single extra argument passed to the script after {@code start}
+ * @throws IOException declared by the underlying process invocation
+ */
+ public void startSQLGateway(String arg) throws IOException {
+ LOG.info("Starting Flink SQL Gateway.");
+ AutoClosableProcess.runBlocking(
+ bin.resolve("sql-gateway.sh").toAbsolutePath().toString(), "start", arg);
+ }
+
+ /**
+ * Runs the {@code sql-gateway.sh} script resolved against this distribution's {@code bin}
+ * path with the command {@code stop}, using {@code AutoClosableProcess.runBlocking}.
+ *
+ * @throws IOException declared by the underlying process invocation
+ */
+ public void stopSQLGateway() throws IOException {
+ LOG.info("Stopping Flink SQL Gateway.");
+ AutoClosableProcess.runBlocking(
+ bin.resolve("sql-gateway.sh").toAbsolutePath().toString(), "stop");
+ }
+
public void setRootLogLevel(Level logLevel) throws IOException {
FileUtils.replace(
conf.resolve("log4j.properties"),
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java
index a7fc0f22555..dc767d382e0 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java
@@ -113,6 +113,7 @@ public class LocalStandaloneFlinkResource implements FlinkResource {
private void shutdownCluster() {
try {
distribution.stopFlinkCluster();
+ distribution.stopSQLGateway();
} catch (IOException e) {
LOG.warn("Error while shutting down Flink cluster.", e);
}
@@ -183,6 +184,10 @@ public class LocalStandaloneFlinkResource implements FlinkResource {
throw new RuntimeException("Cluster did not start in expected time-frame.");
}
+ /**
+ * Starts the SQL Gateway by delegating to {@code distribution.startSQLGateway(arg)}.
+ *
+ * @param arg argument forwarded unchanged to the distribution
+ * @throws IOException propagated from the distribution call
+ */
+ public void startSQLGateway(String arg) throws IOException {
+ distribution.startSQLGateway(arg);
+ }
+
@Override
public Stream<String> searchAllLogs(Pattern pattern, Function<Matcher, String> matchProcessor)
throws IOException {
diff --git a/flink-end-to-end-tests/flink-sql-gateway-test/pom.xml b/flink-end-to-end-tests/flink-sql-gateway-test/pom.xml
new file mode 100644
index 00000000000..3b8cc597bb2
--- /dev/null
+++ b/flink-end-to-end-tests/flink-sql-gateway-test/pom.xml
@@ -0,0 +1,84 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>flink-end-to-end-tests</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>1.16-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>flink-sql-gateway-test</artifactId>
+ <name>Flink : E2E Tests : SQL Gateway</name>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-end-to-end-tests-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-sql-connector-hive-${hive.version}_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-jdbc</artifactId>
+ <version>${hive.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils-junit</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- Copy the Hive SQL connector jar into target/dependencies before the integration
+ tests run; the test locates it there by file-name pattern. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy</id>
+ <phase>pre-integration-test</phase>
+ <goals>
+ <goal>copy</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <artifactItems>
+ <!-- Use ${hive.version} here as well so the copied jar always matches the
+ connector declared in <dependencies> when the Hive version is overridden. -->
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-sql-connector-hive-${hive.version}_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <destFileName>flink-sql-connector-hive-${hive.version}_${scala.binary.version}-${project.version}.jar</destFileName>
+ <type>jar</type>
+ <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+ </artifactItem>
+ </artifactItems>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SQLGatewayITCase.java b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SQLGatewayITCase.java
new file mode 100644
index 00000000000..5b037b1ccae
--- /dev/null
+++ b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SQLGatewayITCase.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway;
+
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.ClusterController;
+import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceSetup;
+import org.apache.flink.tests.util.flink.JarLocation;
+import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResource;
+import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.nio.file.Path;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * End-to-end test that starts a local standalone cluster, launches the SQL Gateway through
+ * {@code sql-gateway.sh} with the HiveServer2 endpoint type, and connects to it with the Hive
+ * JDBC driver.
+ */
+public class SQLGatewayITCase extends TestLogger {
+
+ private static final String JDBC_URL = "jdbc:hive2://localhost:8084/default;auth=noSasl";
+ private static final String DRIVER_NAME = "org.apache.hive.jdbc.HiveDriver";
+
+ // Connector jar copied to target/dependencies by the maven-dependency-plugin.
+ private static final Path HIVE_SQL_CONNECTOR_JAR =
+ TestUtils.getResource(".*dependencies/flink-sql-connector-hive-.*.jar");
+
+ @Rule
+ public final FlinkResource flink =
+ new LocalStandaloneFlinkResourceFactory()
+ .create(
+ FlinkResourceSetup.builder()
+ .addJar(HIVE_SQL_CONNECTOR_JAR, JarLocation.LIB)
+ .build());
+
+ @Test
+ public void testGateway() throws Exception {
+ try (ClusterController clusterController = flink.startCluster(1)) {
+ ((LocalStandaloneFlinkResource) flink)
+ .startSQLGateway("-Dsql-gateway.endpoint.type=hiveserver2");
+ // Fixed grace period for the gateway daemon to come up before connecting.
+ Thread.sleep(2000);
+ Class.forName(DRIVER_NAME);
+ try {
+ // Close the connection immediately if one is returned so it is not leaked.
+ // NOTE(review): a successful connection still lets the test pass, as before —
+ // confirm whether the SQLException below should be mandatory.
+ DriverManager.getConnection(JDBC_URL).close();
+ } catch (SQLException e) {
+ assertThat(e.getMessage())
+ .contains(
+ "Embedded metastore is not allowed. Make sure you have set a valid value for hive.metastore.uris");
+ }
+ }
+ }
+}
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 5fbcaf181f3..31bd7facf9a 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -83,7 +83,8 @@ under the License.
<module>flink-end-to-end-tests-elasticsearch7</module>
<module>flink-end-to-end-tests-common-elasticsearch</module>
<module>flink-end-to-end-tests-sql</module>
- </modules>
+ <module>flink-sql-gateway-test</module>
+ </modules>
<dependencyManagement>
<dependencies>
diff --git a/flink-table/flink-sql-gateway/bin/sql-gateway.sh b/flink-table/flink-sql-gateway/bin/sql-gateway.sh
new file mode 100644
index 00000000000..f925a25adcf
--- /dev/null
+++ b/flink-table/flink-sql-gateway/bin/sql-gateway.sh
@@ -0,0 +1,84 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Start/stop a Flink SQL Gateway.
+
+function usage() {
+ echo "Usage: bin/sql-gateway.sh command"
+ echo " commands:"
+ echo " start - Run a SQL Gateway as a daemon"
+ echo " start-foreground - Run a SQL Gateway as a console application"
+ echo " stop - Stop the SQL Gateway daemon"
+ echo " stop-all - Stop all the SQL Gateway daemons"
+ echo " -h | --help - Show this help message"
+}
+
+if [[ "$*" = *--help ]] || [[ "$*" = *-h ]]; then
+ usage
+ exit 0
+fi
+
+STARTSTOP=$1
+
+if [ -z "$STARTSTOP" ]; then
+ STARTSTOP="start"
+fi
+
+if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
+ usage
+ exit 1
+fi
+
+################################################################################
+# Adopted from "flink" bash script
+################################################################################
+
+# Start from the path this script was invoked with.
+target="$0"
+# In case the executable has been symlinked directly, figure out
+# the correct bin path by following its symlink up to an upper bound.
+# Note: we can't use the readlink utility here if we want to be POSIX
+# compatible.
+iteration=0
+while [ -L "$target" ]; do
+ # Give up after more than 100 hops; the loop is left with $target still a symlink.
+ if [ "$iteration" -gt 100 ]; then
+ echo "Cannot resolve path: You have a cyclic symlink in $target."
+ break
+ fi
+ # Extract the link target from the "name -> target" part of the long listing.
+ ls=`ls -ld -- "$target"`
+ target=`expr "$ls" : '.* -> \(.*\)$'`
+ iteration=$((iteration + 1))
+done
+
+# Directory containing the resolved script.
+# NOTE(review): dirname alone does not turn a relative path into an absolute one —
+# confirm whether config.sh normalizes it.
+bin=`dirname "$target"`
+
+# Source the shared Flink shell configuration from the same directory
+# (FLINK_BIN_DIR used below is not set in this script).
+. "$bin"/config.sh
+
+# Fall back to the current user name when FLINK_IDENT_STRING is empty or unset.
+if [ "$FLINK_IDENT_STRING" = "" ]; then
+ FLINK_IDENT_STRING="$USER"
+fi
+
+# Service name passed to flink-console.sh / flink-daemon.sh.
+ENTRYPOINT=sqlgateway
+
+# start-foreground replaces this shell with flink-console.sh; every other command is
+# handed to flink-daemon.sh. Arguments after the first are forwarded unchanged.
+if [[ $STARTSTOP == "start-foreground" ]]; then
+ exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${@:2}"
+else
+ "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${@:2}"
+fi