You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2022/08/03 07:08:40 UTC

[flink] 01/02: [FLINK-27770][sql-gateway] Introduce the script to start/stop/stop-all gateway

This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c90d99fe2dd2a78214c1f16f6052ee17e30afd84
Author: Luning Wang <wa...@gmail.com>
AuthorDate: Mon Jul 4 10:05:47 2022 +0800

    [FLINK-27770][sql-gateway] Introduce the script to start/stop/stop-all gateway
---
 flink-dist/src/main/assemblies/bin.xml             |  7 ++
 flink-dist/src/main/assemblies/opt.xml             |  6 ++
 flink-dist/src/main/flink-bin/bin/config.sh        | 19 +++++
 flink-dist/src/main/flink-bin/bin/flink-console.sh |  9 ++-
 flink-dist/src/main/flink-bin/bin/flink-daemon.sh  |  9 ++-
 .../flink/tests/util/flink/FlinkDistribution.java  | 12 ++++
 .../util/flink/LocalStandaloneFlinkResource.java   |  5 ++
 .../flink-sql-gateway-test/pom.xml                 | 84 ++++++++++++++++++++++
 .../flink/table/gateway/SQLGatewayITCase.java      | 71 ++++++++++++++++++
 flink-end-to-end-tests/pom.xml                     |  3 +-
 flink-table/flink-sql-gateway/bin/sql-gateway.sh   | 84 ++++++++++++++++++++++
 11 files changed, 304 insertions(+), 5 deletions(-)

diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml
index dcbac513857..52172fe50a7 100644
--- a/flink-dist/src/main/assemblies/bin.xml
+++ b/flink-dist/src/main/assemblies/bin.xml
@@ -172,6 +172,13 @@ under the License.
 			<fileMode>0755</fileMode>
 		</fileSet>
 
+		<!-- copy SQL gateway -->
+		<fileSet>
+			<directory>../flink-table/flink-sql-gateway/bin/</directory>
+			<outputDirectory>bin</outputDirectory>
+			<fileMode>0755</fileMode>
+		</fileSet>
+
 		<!-- copy yarn start scripts -->
 		<fileSet>
 			<directory>src/main/flink-bin/yarn-bin</directory>
diff --git a/flink-dist/src/main/assemblies/opt.xml b/flink-dist/src/main/assemblies/opt.xml
index c56ac259380..cdb57563ecc 100644
--- a/flink-dist/src/main/assemblies/opt.xml
+++ b/flink-dist/src/main/assemblies/opt.xml
@@ -65,6 +65,12 @@
 			<destName>flink-sql-client-${project.version}.jar</destName>
 			<fileMode>0644</fileMode>
 		</file>
+		<file>
+			<source>../flink-table/flink-sql-gateway/target/flink-sql-gateway-${project.version}.jar</source>
+			<outputDirectory>opt/</outputDirectory>
+			<destName>flink-sql-gateway-${project.version}.jar</destName>
+			<fileMode>0644</fileMode>
+		</file>
 
 		<!-- State Processor API -->
 		<file>
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index e8b1e2a818a..f136f963c9b 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -66,6 +66,25 @@ findFlinkDistJar() {
     echo "$FLINK_DIST"
 }
 
+findSqlGatewayJar() {
+    local SQL_GATEWAY
+    SQL_GATEWAY="$(find "$FLINK_OPT_DIR" -name 'flink-sql-gateway*.jar')"
+    local SQL_GATEWAY_COUNT
+    SQL_GATEWAY_COUNT="$(echo "$SQL_GATEWAY" | wc -l)"
+
+    # If flink-sql-gateway*.jar cannot be resolved write error messages to stderr since stdout is stored
+    # as the classpath and exit function with empty classpath to force process failure
+    if [[ "$SQL_GATEWAY" == "" ]]; then
+        (>&2 echo "[ERROR] Flink distribution jar not found in $FLINK_OPT_DIR.")
+        exit 1
+    elif [[ "$SQL_GATEWAY_COUNT" -gt 1 ]]; then
+        (>&2 echo "[ERROR] Multiple flink-sql-gateway*.jar found in $FLINK_OPT_DIR. Please resolve.")
+        exit 1
+    fi
+
+    echo "$SQL_GATEWAY"
+}
+
 # These are used to mangle paths that are passed to java when using
 # cygwin. Cygwin paths are like linux paths, i.e. /path/to/somewhere
 # but the windows java version expects them in Windows Format, i.e. C:\bla\blub.
diff --git a/flink-dist/src/main/flink-bin/bin/flink-console.sh b/flink-dist/src/main/flink-bin/bin/flink-console.sh
index 2c84e60cc19..05b9d42632e 100755
--- a/flink-dist/src/main/flink-bin/bin/flink-console.sh
+++ b/flink-dist/src/main/flink-bin/bin/flink-console.sh
@@ -19,7 +19,7 @@
 
 # Start a Flink service as a console application. Must be stopped with Ctrl-C
 # or with SIGTERM by kill or the controlling process.
-USAGE="Usage: flink-console.sh (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob|kubernetes-session|kubernetes-application|kubernetes-taskmanager) [args]"
+USAGE="Usage: flink-console.sh (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob|kubernetes-session|kubernetes-application|kubernetes-taskmanager|sqlgateway) [args]"
 
 SERVICE=$1
 ARGS=("${@:2}") # get remaining arguments as array
@@ -62,6 +62,11 @@ case $SERVICE in
         CLASS_TO_RUN=org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner
     ;;
 
+    (sqlgateway)
+        CLASS_TO_RUN=org.apache.flink.table.gateway.SqlGateway
+        SQL_GATEWAY_CLASSPATH=`findSqlGatewayJar`
+    ;;
+
     (*)
         echo "Unknown service '${SERVICE}'. $USAGE."
         exit 1
@@ -111,4 +116,4 @@ echo $$ >> "$pid" 2>/dev/null
 # Evaluate user options for local variable expansion
 FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS})
 
-exec "$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}"
+exec "$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$SQL_GATEWAY_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}"
diff --git a/flink-dist/src/main/flink-bin/bin/flink-daemon.sh b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
index c6d54420b1d..246b540a972 100644
--- a/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
+++ b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
@@ -18,7 +18,7 @@
 ################################################################################
 
 # Start/stop a Flink daemon.
-USAGE="Usage: flink-daemon.sh (start|stop|stop-all) (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob) [args]"
+USAGE="Usage: flink-daemon.sh (start|stop|stop-all) (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob|sqlgateway) [args]"
 
 STARTSTOP=$1
 DAEMON=$2
@@ -50,6 +50,11 @@ case $DAEMON in
         CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint
     ;;
 
+    (sqlgateway)
+        CLASS_TO_RUN=org.apache.flink.table.gateway.SqlGateway
+        SQL_GATEWAY_CLASSPATH=`findSqlGatewayJar`
+    ;;
+
     (*)
         echo "Unknown daemon '${DAEMON}'. $USAGE."
         exit 1
@@ -131,7 +136,7 @@ case $STARTSTOP in
         FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS})
 
         echo "Starting $DAEMON daemon on host $HOSTNAME."
-        "$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 200<&- 2>&1 < /dev/null &
+        "$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$SQL_GATEWAY_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 200<&- 2>&1 < /dev/null &
 
         mypid=$!
 
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
index 93c34a22993..c699adaeb8f 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
@@ -106,6 +106,18 @@ final class FlinkDistribution {
                 bin.resolve("taskmanager.sh").toAbsolutePath().toString(), "start");
     }
 
+    public void startSQLGateway(String arg) throws IOException {
+        LOG.info("Starting Flink SQL Gateway.");
+        AutoClosableProcess.runBlocking(
+                bin.resolve("sql-gateway.sh").toAbsolutePath().toString(), "start", arg);
+    }
+
+    public void stopSQLGateway() throws IOException {
+        LOG.info("Stopping Flink SQL Gateway.");
+        AutoClosableProcess.runBlocking(
+                bin.resolve("sql-gateway.sh").toAbsolutePath().toString(), "stop");
+    }
+
     public void setRootLogLevel(Level logLevel) throws IOException {
         FileUtils.replace(
                 conf.resolve("log4j.properties"),
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java
index a7fc0f22555..dc767d382e0 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java
@@ -113,6 +113,7 @@ public class LocalStandaloneFlinkResource implements FlinkResource {
     private void shutdownCluster() {
         try {
             distribution.stopFlinkCluster();
+            distribution.stopSQLGateway();
         } catch (IOException e) {
             LOG.warn("Error while shutting down Flink cluster.", e);
         }
@@ -183,6 +184,10 @@ public class LocalStandaloneFlinkResource implements FlinkResource {
         throw new RuntimeException("Cluster did not start in expected time-frame.");
     }
 
+    public void startSQLGateway(String arg) throws IOException {
+        distribution.startSQLGateway(arg);
+    }
+
     @Override
     public Stream<String> searchAllLogs(Pattern pattern, Function<Matcher, String> matchProcessor)
             throws IOException {
diff --git a/flink-end-to-end-tests/flink-sql-gateway-test/pom.xml b/flink-end-to-end-tests/flink-sql-gateway-test/pom.xml
new file mode 100644
index 00000000000..3b8cc597bb2
--- /dev/null
+++ b/flink-end-to-end-tests/flink-sql-gateway-test/pom.xml
@@ -0,0 +1,84 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.16-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-sql-gateway-test</artifactId>
+	<name>Flink : E2E Tests : SQL Gateway</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-end-to-end-tests-common</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-sql-connector-hive-${hive.version}_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hive</groupId>
+			<artifactId>hive-jdbc</artifactId>
+			<version>${hive.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils-junit</artifactId>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-dependency-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>copy</id>
+						<phase>pre-integration-test</phase>
+						<goals>
+							<goal>copy</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<artifactItems>
+						<artifactItem>
+							<groupId>org.apache.flink</groupId>
+							<artifactId>flink-sql-connector-hive-2.3.9_${scala.binary.version}</artifactId>
+							<version>${project.version}</version>
+							<destFileName>flink-sql-connector-hive-2.3.9_${scala.binary.version}-${project.version}.jar</destFileName>
+							<type>jar</type>
+							<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+						</artifactItem>
+					</artifactItems>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+</project>
diff --git a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SQLGatewayITCase.java b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SQLGatewayITCase.java
new file mode 100644
index 00000000000..5b037b1ccae
--- /dev/null
+++ b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SQLGatewayITCase.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway;
+
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.ClusterController;
+import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceSetup;
+import org.apache.flink.tests.util.flink.JarLocation;
+import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResource;
+import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.nio.file.Path;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class SQLGatewayITCase extends TestLogger {
+
+    private static String JDBC_URL = "jdbc:hive2://localhost:8084/default;auth=noSasl";
+    private static String DRIVER_NAME = "org.apache.hive.jdbc.HiveDriver";
+
+    private static final Path HIVE_SQL_CONNECOTR_JAR =
+            TestUtils.getResource(".*dependencies/flink-sql-connector-hive-.*.jar");
+
+    @Rule
+    public final FlinkResource flink =
+            new LocalStandaloneFlinkResourceFactory()
+                    .create(
+                            FlinkResourceSetup.builder()
+                                    .addJar(HIVE_SQL_CONNECOTR_JAR, JarLocation.LIB)
+                                    .build());
+
+    @Test
+    public void testGateway() throws Exception {
+        try (ClusterController clusterController = flink.startCluster(1)) {
+            ((LocalStandaloneFlinkResource) flink)
+                    .startSQLGateway("-Dsql-gateway.endpoint.type=hiveserver2");
+            Thread.sleep(2000);
+            Class.forName(DRIVER_NAME);
+            try {
+                DriverManager.getConnection(JDBC_URL);
+            } catch (SQLException e) {
+                assertThat(e.getMessage())
+                        .contains(
+                                "Embedded metastore is not allowed. Make sure you have set a valid value for hive.metastore.uris");
+            }
+        }
+    }
+}
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 5fbcaf181f3..31bd7facf9a 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -83,7 +83,8 @@ under the License.
 		<module>flink-end-to-end-tests-elasticsearch7</module>
 		<module>flink-end-to-end-tests-common-elasticsearch</module>
 		<module>flink-end-to-end-tests-sql</module>
-	</modules>
+        <module>flink-sql-gateway-test</module>
+    </modules>
 
 	<dependencyManagement>
 		<dependencies>
diff --git a/flink-table/flink-sql-gateway/bin/sql-gateway.sh b/flink-table/flink-sql-gateway/bin/sql-gateway.sh
new file mode 100644
index 00000000000..f925a25adcf
--- /dev/null
+++ b/flink-table/flink-sql-gateway/bin/sql-gateway.sh
@@ -0,0 +1,84 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Start/stop a Flink SQL Gateway.
+
+function usage() {
+  echo "Usage: bin/sql-gateway.sh command"
+  echo "  commands:"
+  echo "    start            - Run a SQL Gateway as a daemon"
+  echo "    start-foreground - Run a SQL Gateway as a console application"
+  echo "    stop             - Stop the SQL Gateway daemon"
+  echo "    stop-all         - Stop all the SQL Gateway daemons"
+  echo "    -h | --help      - Show this help message"
+}
+
+if [[ "$*" = *--help ]] || [[ "$*" = *-h ]]; then
+  usage
+  exit 0
+fi
+
+STARTSTOP=$1
+
+if [ -z "$STARTSTOP" ]; then
+  STARTSTOP="start"
+fi
+
+if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
+  usage
+  exit 1
+fi
+
+################################################################################
+# Adopted from "flink" bash script
+################################################################################
+
+target="$0"
+# For the case, the executable has been directly symlinked, figure out
+# the correct bin path by following its symlink up to an upper bound.
+# Note: we can't use the readlink utility here if we want to be POSIX
+# compatible.
+iteration=0
+while [ -L "$target" ]; do
+    if [ "$iteration" -gt 100 ]; then
+        echo "Cannot resolve path: You have a cyclic symlink in $target."
+        break
+    fi
+    ls=`ls -ld -- "$target"`
+    target=`expr "$ls" : '.* -> \(.*\)$'`
+    iteration=$((iteration + 1))
+done
+
+# Convert relative path to absolute path
+bin=`dirname "$target"`
+
+# get flink config
+. "$bin"/config.sh
+
+if [ "$FLINK_IDENT_STRING" = "" ]; then
+        FLINK_IDENT_STRING="$USER"
+fi
+
+ENTRYPOINT=sqlgateway
+
+if [[ $STARTSTOP == "start-foreground" ]]; then
+    exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${@:2}"
+else
+    "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${@:2}"
+fi