Posted to commits@flink.apache.org by gy...@apache.org on 2022/07/06 12:15:53 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-27009] Add example for running SQL scripts using the operator

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 9bb83c0  [FLINK-27009] Add example for running SQL scripts using the operator
9bb83c0 is described below

commit 9bb83c0c9180b77c1ce6b3d96c0c3bb1f93d51de
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Wed Jul 6 14:15:47 2022 +0200

    [FLINK-27009] Add example for running SQL scripts using the operator
---
 examples/flink-sql-runner-example/Dockerfile       |  23 ++++
 examples/flink-sql-runner-example/README.md        |  80 ++++++++++++
 examples/flink-sql-runner-example/pom.xml          | 134 +++++++++++++++++++++
 examples/flink-sql-runner-example/sql-example.yaml |  41 +++++++
 .../sql-scripts/simple.sql                         |  27 +++++
 .../sql-scripts/statement-set.sql                  |  38 ++++++
 .../java/org/apache/flink/examples/SqlRunner.java  |  96 +++++++++++++++
 .../src/main/resources/log4j2.properties           |  25 ++++
 pom.xml                                            |   1 +
 9 files changed, 465 insertions(+)

diff --git a/examples/flink-sql-runner-example/Dockerfile b/examples/flink-sql-runner-example/Dockerfile
new file mode 100644
index 0000000..67bb563
--- /dev/null
+++ b/examples/flink-sql-runner-example/Dockerfile
@@ -0,0 +1,23 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+FROM flink:1.15.0
+
+RUN mkdir /opt/flink/usrlib
+ADD target/flink-sql-runner-example-*.jar /opt/flink/usrlib/sql-runner.jar
+ADD sql-scripts /opt/flink/usrlib/sql-scripts
diff --git a/examples/flink-sql-runner-example/README.md b/examples/flink-sql-runner-example/README.md
new file mode 100644
index 0000000..942cbb4
--- /dev/null
+++ b/examples/flink-sql-runner-example/README.md
@@ -0,0 +1,80 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Flink Kubernetes Operator SQL Example
+
+## Overview
+
+This is an end-to-end example of running Flink SQL scripts using the Flink Kubernetes Operator.
+
+It is only intended to showcase how Flink SQL can be executed using the operator; users are expected to extend the implementation and dependencies based on their production needs.
+
+Currently, there are no plans to add direct API support for SQL submission to the Kubernetes operator, due to the complexity of image and dependency management, which is specific to each use case.
+At the same time, we are confident that, using these examples as a starting point, the operator can cover all user needs. If Apache Flink itself extends SQL support for Application mode in the future, the operator will aim to support that.
+
+*What's in this example?*
+
+ 1. A Flink Java application (the SQL Runner) that executes SQL scripts
+ 2. A Dockerfile to build a custom image containing the SQL Runner and your SQL scripts
+ 3. An example YAML for submitting scripts using the operator
+
+## How does it work?
+
+As Flink doesn't support submitting SQL scripts directly as jobs, we have created a simple Flink Java application that takes the user script and executes it using the `TableEnvironment#executeSql` method.
+
+The SQL Runner allows us to execute SQL scripts as if they were simple Flink application jars, something that already works well with the operator. We package the included SQL Runner implementation together with the SQL scripts under `sql-scripts` into a Docker image and reference that image in our `FlinkDeployment` yaml file.
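+
+In essence, the runner reads the script file and executes it statement by statement, roughly like the sketch below (class name illustrative; the included `SqlRunner.java` additionally strips comments and keeps `EXECUTE STATEMENT SET` blocks together):
+
+```java
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.util.FileUtils;
+
+import java.io.File;
+
+public class MinimalSqlRunner {
+    public static void main(String[] args) throws Exception {
+        // Read the SQL script passed as the single program argument.
+        String script = FileUtils.readFileUtf8(new File(args[0]));
+        TableEnvironment tableEnv = TableEnvironment.create(new Configuration());
+        // Naive split on ';' -- sufficient for simple scripts only.
+        for (String statement : script.split(";")) {
+            if (!statement.isBlank()) {
+                tableEnv.executeSql(statement);
+            }
+        }
+    }
+}
+```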
+
+***Note:*** *While the included SqlRunner should work for most simple cases, it is not expected to be very robust or battle-tested. If you find any bugs or limitations, feel free to open Jira tickets and bugfix PRs.*
+
+## Usage
+
+The following steps assume that you have the Flink Kubernetes Operator installed and running in your environment.
+
+**Step 1**: Build the SQL Runner Maven project
+```bash
+cd examples/flink-sql-runner-example
+mvn clean package
+```
+
+**Step 2**: Add your SQL script files under the `sql-scripts` directory
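+
+For reference, the included `sql-scripts/simple.sql` creates a blackhole table and inserts a couple of rows:
+
+```sql
+CREATE TABLE blackhole_table (
+  name STRING,
+  age INT
+) WITH (
+  'connector' = 'blackhole'
+);
+
+INSERT INTO blackhole_table
+  VALUES ('fred flintstone', 35), ('barney rubble', 32);
+```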
+
+**Step 3**: Build the Docker image
+```bash
+# Uncomment when building for local minikube env:
+# eval $(minikube docker-env)
+
+DOCKER_BUILDKIT=1 docker build . -t flink-sql-runner-example:latest
+```
+This step creates an image based on the official Flink base image, including the SQL Runner jar and your user scripts.
+
+**Step 4**: Create the FlinkDeployment YAML and submit it
+
+Edit the included `sql-example.yaml` so that the `job.args` section points to the SQL script that you wish to execute.
+
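+The relevant part of the spec looks like this (the paths match the Dockerfile above):
+
+```yaml
+  job:
+    jarURI: local:///opt/flink/usrlib/sql-runner.jar
+    args: ["/opt/flink/usrlib/sql-scripts/simple.sql"]
+    parallelism: 1
+    upgradeMode: stateless
+```
+
+Then submit it:
+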
+```bash
+kubectl apply -f sql-example.yaml
+```
+
+## Connectors and other extensions
+
+This example only works with the very basic table and connector types out of the box; however, enabling new ones is easy.
+
+Simply find your [required connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/overview/) and add it as a compile dependency to the `flink-sql-runner-example` project's `pom.xml`. This ensures that it is packaged into the sql-runner fat jar and is available for you on the cluster.
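+
+For example, to read from Kafka you would add the Kafka connector (this mirrors the commented-out example in the project's `pom.xml`):
+
+```xml
+<dependency>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-connector-kafka</artifactId>
+    <version>${flink.version}</version>
+</dependency>
+```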
+
+Once you dive deeper, you will quickly find that the SqlRunner implementation is very basic and might not cover your more advanced needs. Feel free to extend or customise the code as necessary for your requirements.
diff --git a/examples/flink-sql-runner-example/pom.xml b/examples/flink-sql-runner-example/pom.xml
new file mode 100644
index 0000000..4c53652
--- /dev/null
+++ b/examples/flink-sql-runner-example/pom.xml
@@ -0,0 +1,134 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-kubernetes-operator-parent</artifactId>
+			<version>1.1-SNAPSHOT</version>
+			<relativePath>../..</relativePath>
+	</parent>
+
+	<artifactId>flink-sql-runner-example</artifactId>
+	<name>Flink SQL Runner Example</name>
+
+	<dependencies>
+		<!-- Apache Flink dependencies -->
+		<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${flink.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-api-java</artifactId>
+			<version>${flink.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<!-- Add connector dependencies here. They must be in the default scope (compile). -->
+
+		<!-- Example:
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka</artifactId>
+			<version>${flink.version}</version>
+		</dependency>
+		-->
+
+		<!-- Add logging framework, to produce console output when running in the IDE. -->
+		<!-- These dependencies are excluded from the application JAR by default. -->
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-api</artifactId>
+			<version>${slf4j.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.logging.log4j</groupId>
+			<artifactId>log4j-slf4j-impl</artifactId>
+			<version>${log4j.version}</version>
+			<scope>runtime</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.logging.log4j</groupId>
+			<artifactId>log4j-api</artifactId>
+			<version>${log4j.version}</version>
+			<scope>runtime</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.logging.log4j</groupId>
+			<artifactId>log4j-core</artifactId>
+			<version>${log4j.version}</version>
+			<scope>runtime</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<version>3.1.1</version>
+				<executions>
+					<!-- Run shade goal on package phase -->
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<artifactSet>
+								<excludes>
+									<exclude>org.apache.flink:flink-shaded-force-shading</exclude>
+									<exclude>com.google.code.findbugs:jsr305</exclude>
+									<exclude>org.slf4j:*</exclude>
+									<exclude>org.apache.logging.log4j:*</exclude>
+								</excludes>
+							</artifactSet>
+							<filters>
+								<filter>
+									<!-- Do not copy the signatures in the META-INF folder.
+									Otherwise, this might cause SecurityExceptions when using the JAR. -->
+									<artifact>*:*</artifact>
+									<excludes>
+										<exclude>META-INF/*.SF</exclude>
+										<exclude>META-INF/*.DSA</exclude>
+										<exclude>META-INF/*.RSA</exclude>
+									</excludes>
+								</filter>
+							</filters>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.apache.flink.examples.SqlRunner</mainClass>
+								</transformer>
+							</transformers>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>
diff --git a/examples/flink-sql-runner-example/sql-example.yaml b/examples/flink-sql-runner-example/sql-example.yaml
new file mode 100644
index 0000000..14865e0
--- /dev/null
+++ b/examples/flink-sql-runner-example/sql-example.yaml
@@ -0,0 +1,41 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkDeployment
+metadata:
+  name: sql-example
+spec:
+  image: flink-sql-runner-example:latest
+  flinkVersion: v1_15
+  flinkConfiguration:
+    taskmanager.numberOfTaskSlots: "1"
+  serviceAccount: flink
+  jobManager:
+    resource:
+      memory: "2048m"
+      cpu: 1
+  taskManager:
+    resource:
+      memory: "2048m"
+      cpu: 1
+  job:
+    jarURI: local:///opt/flink/usrlib/sql-runner.jar
+    args: ["/opt/flink/usrlib/sql-scripts/simple.sql"]
+    parallelism: 1
+    upgradeMode: stateless
diff --git a/examples/flink-sql-runner-example/sql-scripts/simple.sql b/examples/flink-sql-runner-example/sql-scripts/simple.sql
new file mode 100644
index 0000000..8b8634e
--- /dev/null
+++ b/examples/flink-sql-runner-example/sql-scripts/simple.sql
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE blackhole_table (
+  name STRING,
+  age INT
+) WITH (
+  'connector' = 'blackhole'
+);
+
+INSERT INTO blackhole_table
+  VALUES ('fred flintstone', 35), ('barney rubble', 32);
diff --git a/examples/flink-sql-runner-example/sql-scripts/statement-set.sql b/examples/flink-sql-runner-example/sql-scripts/statement-set.sql
new file mode 100644
index 0000000..ec45fd2
--- /dev/null
+++ b/examples/flink-sql-runner-example/sql-scripts/statement-set.sql
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE blackhole_table (
+  name STRING,
+  age INT
+) WITH (
+  'connector' = 'blackhole'
+);
+CREATE TABLE blackhole_table2 (
+  name STRING,
+  age INT
+) WITH (
+  'connector' = 'blackhole'
+);
+
+EXECUTE STATEMENT SET
+BEGIN
+INSERT INTO blackhole_table
+  VALUES ('fred flintstone', 35), ('barney rubble', 32);
+INSERT INTO blackhole_table2
+  VALUES ('fred flintstone', 35), ('barney rubble', 32);
+END;
diff --git a/examples/flink-sql-runner-example/src/main/java/org/apache/flink/examples/SqlRunner.java b/examples/flink-sql-runner-example/src/main/java/org/apache/flink/examples/SqlRunner.java
new file mode 100644
index 0000000..0717990
--- /dev/null
+++ b/examples/flink-sql-runner-example/src/main/java/org/apache/flink/examples/SqlRunner.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.util.FileUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Main class for executing SQL scripts. */
+public class SqlRunner {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SqlRunner.class);
+
+    private static final String STATEMENT_DELIMITER = ";"; // a statement should end with `;`
+    private static final String LINE_DELIMITER = "\n";
+
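+    // Matches single-line (--) and block (/* ... */) SQL comments so they can be stripped before parsing.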
+    private static final String COMMENT_PATTERN = "(--.*)|(((\\/\\*)+?[\\w\\W]+?(\\*\\/)+))";
+
+    public static void main(String[] args) throws Exception {
+        if (args.length != 1) {
+            throw new Exception("Exactly one argument is expected.");
+        }
+        var script = FileUtils.readFileUtf8(new File(args[0]));
+        var statements = parseStatements(script);
+
+        var tableEnv = TableEnvironment.create(new Configuration());
+
+        for (String statement : statements) {
+            LOG.info("Executing:\n{}", statement);
+            tableEnv.executeSql(statement);
+        }
+    }
+
+    public static List<String> parseStatements(String script) {
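+        // Normalize the script (ensure a trailing ';'), strip SQL comments, then split it into individual statements.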
+        var formatted = formatSqlFile(script).replaceAll(COMMENT_PATTERN, "");
+        var statements = new ArrayList<String>();
+
+        StringBuilder current = null;
+        boolean statementSet = false;
+        for (String line : formatted.split("\n")) {
+            var trimmed = line.trim();
+            if (trimmed.isBlank()) {
+                continue;
+            }
+            if (current == null) {
+                current = new StringBuilder();
+            }
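+            // Lines inside an EXECUTE STATEMENT SET block are buffered together as one statement.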
+            if (trimmed.startsWith("EXECUTE STATEMENT SET")) {
+                statementSet = true;
+            }
+            current.append(trimmed);
+            current.append("\n");
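+            // A ';' terminates the current statement, unless we are inside a statement set, which only ends at "END;".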
+            if (trimmed.endsWith(STATEMENT_DELIMITER)) {
+                if (!statementSet || trimmed.equals("END;")) {
+                    statements.add(current.toString());
+                    current = null;
+                    statementSet = false;
+                }
+            }
+        }
+        return statements;
+    }
+
+    public static String formatSqlFile(String content) {
+        String trimmed = content.trim();
+        StringBuilder formatted = new StringBuilder();
+        formatted.append(trimmed);
+        if (!trimmed.endsWith(STATEMENT_DELIMITER)) {
+            formatted.append(STATEMENT_DELIMITER);
+        }
+        formatted.append(LINE_DELIMITER);
+        return formatted.toString();
+    }
+}
diff --git a/examples/flink-sql-runner-example/src/main/resources/log4j2.properties b/examples/flink-sql-runner-example/src/main/resources/log4j2.properties
new file mode 100644
index 0000000..cea22d8
--- /dev/null
+++ b/examples/flink-sql-runner-example/src/main/resources/log4j2.properties
@@ -0,0 +1,25 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+rootLogger.level=INFO
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/pom.xml b/pom.xml
index 981815f..68216c2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,6 +55,7 @@ under the License.
         <module>flink-kubernetes-operator</module>
         <module>flink-kubernetes-webhook</module>
         <module>flink-kubernetes-docs</module>
+        <module>examples/flink-sql-runner-example</module>
     </modules>
 
     <properties>