You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/08/02 14:58:13 UTC
[flink] branch master updated: [FLINK-9833] [e2e] Add a SQL Client
end-to-end test with unified source/sink/format
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 404eda8 [FLINK-9833] [e2e] Add a SQL Client end-to-end test with unified source/sink/format
404eda8 is described below
commit 404eda81753788cda619abf3f42ded6ea5cf4f66
Author: Timo Walther <tw...@apache.org>
AuthorDate: Fri Jul 20 16:55:48 2018 +0200
[FLINK-9833] [e2e] Add a SQL Client end-to-end test with unified source/sink/format
Adds a SQL Client end-to-end test with Kafka/Filesystem and Avro/JSON/CSV components.
It reads JSON from Kafka, uses a UDF for transformation, writes to Kafka Avro, reads
the Avro data back from Kafka, and finally writes it to a Filesystem CSV sink. It also
verifies that the available SQL jars are properly shaded and contain a table factory.
This closes #6422.
---
.../flink-sql-client-test/pom.xml | 124 +++++++++
.../table/toolbox/StringRegexReplaceFunction.java | 31 +++
flink-end-to-end-tests/pom.xml | 1 +
flink-end-to-end-tests/run-nightly-tests.sh | 2 +
.../test-scripts/test_sql_client.sh | 288 +++++++++++++++++++++
5 files changed, 446 insertions(+)
diff --git a/flink-end-to-end-tests/flink-sql-client-test/pom.xml b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
new file mode 100644
index 0000000..dc8be37
--- /dev/null
+++ b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
@@ -0,0 +1,124 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>flink-end-to-end-tests</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>1.7-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>flink-sql-client-test</artifactId>
+ <name>flink-sql-client-test</name>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-compiler</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- Build toolbox jar: the shade goal runs in the package phase and names its output "SqlToolbox". -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.1.1</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <finalName>SqlToolbox</finalName>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- Copy SQL jars into dedicated "sql-jars" directory under the build directory during the package phase. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy</id>
+ <phase>package</phase>
+ <goals>
+ <goal>copy</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${project.build.directory}/sql-jars</outputDirectory>
+ <!-- List of currently provided SQL jars; each one is the artifact variant with the "sql-jar" classifier. -->
+ <artifactItems>
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-avro</artifactId>
+ <version>${project.version}</version>
+ <classifier>sql-jar</classifier>
+ <type>jar</type>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-json</artifactId>
+ <version>${project.version}</version>
+ <classifier>sql-jar</classifier>
+ <type>jar</type>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kafka-0.9_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <classifier>sql-jar</classifier>
+ <type>jar</type>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <classifier>sql-jar</classifier>
+ <type>jar</type>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <classifier>sql-jar</classifier>
+ <type>jar</type>
+ </artifactItem>
+ </artifactItems>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/flink-end-to-end-tests/flink-sql-client-test/src/main/java/org/apache/flink/table/toolbox/StringRegexReplaceFunction.java b/flink-end-to-end-tests/flink-sql-client-test/src/main/java/org/apache/flink/table/toolbox/StringRegexReplaceFunction.java
new file mode 100644
index 0000000..c74e46d
--- /dev/null
+++ b/flink-end-to-end-tests/flink-sql-client-test/src/main/java/org/apache/flink/table/toolbox/StringRegexReplaceFunction.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.toolbox;
+
+import org.apache.flink.table.functions.ScalarFunction;
+
+/**
+ * Scalar function that replaces every match of a regular expression in a string; a null argument yields null.
+ */
+public class StringRegexReplaceFunction extends ScalarFunction {
+	/** Returns {@code input} with all {@code regex} matches replaced by {@code replacement}, or null if any argument is null. */
+	public String eval(String input, String regex, String replacement) {
+		return (input == null || regex == null || replacement == null) ? null : input.replaceAll(regex, replacement);
+	}
+}
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 5c204d7..fea91d5 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -52,6 +52,7 @@ under the License.
<module>flink-quickstart-test</module>
<module>flink-confluent-schema-registry</module>
<module>flink-stream-state-ttl-test</module>
+ <module>flink-sql-client-test</module>
</modules>
<build>
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index b4f3789..8439bad 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -113,5 +113,7 @@ run_test "State TTL RocksDb backend end-to-end test" "$END_TO_END_DIR/test-scrip
run_test "Running Kerberized YARN on Docker test " "$END_TO_END_DIR/test-scripts/test_yarn_kerberos_docker.sh"
+run_test "SQL Client end-to-end test" "$END_TO_END_DIR/test-scripts/test_sql_client.sh"
+
printf "\n[PASS] All tests passed\n"
exit 0
diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client.sh b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
new file mode 100755
index 0000000..934f7d4
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
@@ -0,0 +1,288 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/kafka-common.sh
+
+SQL_TOOLBOX_JAR=$END_TO_END_DIR/flink-sql-client-test/target/SqlToolbox.jar
+SQL_JARS_DIR=$END_TO_END_DIR/flink-sql-client-test/target/sql-jars
+
+################################################################################
+# Verify existing SQL jars
+################################################################################
+
+EXTRACTED_JAR=$TEST_DATA_DIR/extracted
+
+mkdir -p "$EXTRACTED_JAR"
+
+for SQL_JAR in "$SQL_JARS_DIR"/*.jar; do
+  echo "Checking SQL JAR: $SQL_JAR"
+  unzip "$SQL_JAR" -d "$EXTRACTED_JAR" > /dev/null
+
+  # check for proper shading: only org/apache/flink, META-INF, LICENSE* and NOTICE* entries are allowed
+  # (NUL-delimited find output keeps file names containing whitespace in one piece)
+  while IFS= read -r -d '' EXTRACTED_FILE; do
+    if ! [[ $EXTRACTED_FILE = "$EXTRACTED_JAR/org/apache/flink"* ]] && \
+        ! [[ $EXTRACTED_FILE = "$EXTRACTED_JAR/META-INF"* ]] && \
+        ! [[ $EXTRACTED_FILE = "$EXTRACTED_JAR/LICENSE"* ]] && \
+        ! [[ $EXTRACTED_FILE = "$EXTRACTED_JAR/NOTICE"* ]] ; then
+      echo "Bad file in JAR: $EXTRACTED_FILE"
+      exit 1
+    fi
+  done < <(find "$EXTRACTED_JAR" -type f -print0)
+
+  # check for proper factory: the JAR must ship a TableFactory service file
+  if [ ! -f "$EXTRACTED_JAR/META-INF/services/org.apache.flink.table.factories.TableFactory" ]; then
+    echo "No table factory found in JAR: $SQL_JAR"
+    exit 1
+  fi
+
+  # clean up before the next JAR; ":?" aborts rather than expanding to "/*" if the variable is empty
+  rm -r "${EXTRACTED_JAR:?}"/*
+done
+
+################################################################################
+# Run a SQL statement
+################################################################################
+
+echo "Testing SQL statement..."
+
+# Stops the Kafka cluster on interruption or exit without re-triggering itself while doing so.
+function sql_cleanup() {
+  # a regular exit after this point must not invoke the handler a second time
+  trap '' EXIT
+  # a further interrupt during cleanup exits right away instead of re-entering the handler
+  trap 'exit -1' INT
+  stop_kafka_cluster
+}
+# one registration covers both the interrupt signal and normal termination
+trap sql_cleanup INT EXIT
+
+# prepare Kafka
+echo "Preparing Kafka..."
+
+setup_kafka_dist
+
+start_kafka_cluster
+
+create_kafka_topic 1 1 test-json
+create_kafka_topic 1 1 test-avro
+
+# put JSON data into Kafka
+echo "Sending messages to Kafka..."
+
+send_messages_to_kafka '{"timestamp": "2018-03-12 08:00:00", "user": "Alice", "event": { "type": "WARNING", "message": "This is a warning."}}' test-json
+# duplicate
+send_messages_to_kafka '{"timestamp": "2018-03-12 08:10:00", "user": "Alice", "event": { "type": "WARNING", "message": "This is a warning."}}' test-json
+send_messages_to_kafka '{"timestamp": "2018-03-12 09:00:00", "user": "Bob", "event": { "type": "WARNING", "message": "This is another warning."}}' test-json
+send_messages_to_kafka '{"timestamp": "2018-03-12 09:10:00", "user": "Alice", "event": { "type": "INFO", "message": "This is a info."}}' test-json
+send_messages_to_kafka '{"timestamp": "2018-03-12 09:20:00", "user": "Steve", "event": { "type": "INFO", "message": "This is another info."}}' test-json
+# duplicate
+send_messages_to_kafka '{"timestamp": "2018-03-12 09:30:00", "user": "Steve", "event": { "type": "INFO", "message": "This is another info."}}' test-json
+# filtered in results
+send_messages_to_kafka '{"timestamp": "2018-03-12 09:30:00", "user": null, "event": { "type": "WARNING", "message": "This is a bad message because the user is missing."}}' test-json
+# pending in results
+send_messages_to_kafka '{"timestamp": "2018-03-12 10:40:00", "user": "Bob", "event": { "type": "ERROR", "message": "This is an error."}}' test-json
+
+# prepare Flink
+echo "Preparing Flink..."
+
+start_cluster
+start_taskmanagers 1
+
+# create session environment file
+RESULT=$TEST_DATA_DIR/result
+SQL_CONF=$TEST_DATA_DIR/sql-client-session.conf
+
+cat > $SQL_CONF << EOF
+tables:
+ - name: JsonSourceTable
+ type: source
+ update-mode: append
+ schema:
+ - name: rowtime
+ type: TIMESTAMP
+ rowtime:
+ timestamps:
+ type: from-field
+ from: timestamp
+ watermarks:
+ type: periodic-bounded
+ delay: 2000
+ - name: user
+ type: VARCHAR
+ - name: event
+ type: ROW(type VARCHAR, message VARCHAR)
+ connector:
+ type: kafka
+ version: "0.10"
+ topic: test-json
+ startup-mode: earliest-offset
+ properties:
+ - key: zookeeper.connect
+ value: localhost:2181
+ - key: bootstrap.servers
+ value: localhost:9092
+ format:
+ type: json
+ json-schema: >
+ {
+ "type": "object",
+ "properties": {
+ "timestamp": {
+ "type": "string"
+ },
+ "user": {
+ "type": ["string", "null"]
+ },
+ "event": {
+ "type": "object",
+ "properties": {
+ "type": {
+ "type": "string"
+ },
+ "message": {
+ "type": "string"
+ }
+ }
+ }
+ }
+ }
+ - name: AvroBothTable
+ type: both
+ update-mode: append
+ schema:
+ - name: event_timestamp
+ type: VARCHAR
+ - name: user
+ type: VARCHAR
+ - name: message
+ type: VARCHAR
+ - name: duplicate_count
+ type: BIGINT
+ connector:
+ type: kafka
+ version: "0.10"
+ topic: test-avro
+ startup-mode: earliest-offset
+ properties:
+ - key: zookeeper.connect
+ value: localhost:2181
+ - key: bootstrap.servers
+ value: localhost:9092
+ format:
+ type: avro
+ avro-schema: >
+ {
+ "namespace": "org.apache.flink.table.tests",
+ "type": "record",
+ "name": "NormalizedEvent",
+ "fields": [
+ {"name": "event_timestamp", "type": "string"},
+ {"name": "user", "type": ["string", "null"]},
+ {"name": "message", "type": "string"},
+ {"name": "duplicate_count", "type": "long"}
+ ]
+ }
+ - name: CsvSinkTable
+ type: sink
+ update-mode: append
+ schema:
+ - name: event_timestamp
+ type: VARCHAR
+ - name: user
+ type: VARCHAR
+ - name: message
+ type: VARCHAR
+ - name: duplicate_count
+ type: BIGINT
+ connector:
+ type: filesystem
+ path: $RESULT
+ format:
+ type: csv
+ fields:
+ - name: event_timestamp
+ type: VARCHAR
+ - name: user
+ type: VARCHAR
+ - name: message
+ type: VARCHAR
+ - name: duplicate_count
+ type: BIGINT
+
+functions:
+ - name: RegReplace
+ from: class
+ class: org.apache.flink.table.toolbox.StringRegexReplaceFunction
+EOF
+
+# submit SQL statements
+
+read -r -d '' SQL_STATEMENT_1 << EOF
+INSERT INTO AvroBothTable
+ SELECT
+ CAST(TUMBLE_START(rowtime, INTERVAL '1' HOUR) AS VARCHAR) AS event_timestamp,
+ user,
+ RegReplace(event.message, ' is ', ' was ') AS message,
+ COUNT(*) AS duplicate_count
+ FROM JsonSourceTable
+ WHERE user IS NOT NULL
+ GROUP BY
+ user,
+ event.message,
+ TUMBLE(rowtime, INTERVAL '1' HOUR)
+EOF
+
+echo "Executing SQL: Kafka JSON -> Kafka Avro"
+echo "$SQL_STATEMENT_1"
+
+$FLINK_DIR/bin/sql-client.sh embedded \
+ --library $SQL_JARS_DIR \
+ --jar $SQL_TOOLBOX_JAR \
+ --environment $SQL_CONF \
+ --update "$SQL_STATEMENT_1"
+
+read -r -d '' SQL_STATEMENT_2 << EOF
+INSERT INTO CsvSinkTable
+ SELECT *
+ FROM AvroBothTable
+EOF
+
+echo "Executing SQL: Kafka Avro -> Filesystem CSV"
+echo "$SQL_STATEMENT_2"
+
+$FLINK_DIR/bin/sql-client.sh embedded \
+ --library $SQL_JARS_DIR \
+ --jar $SQL_TOOLBOX_JAR \
+ --environment $SQL_CONF \
+ --update "$SQL_STATEMENT_2"
+
+echo "Waiting for CSV results..."
+# poll up to 10 times, 5 seconds apart, until the result file holds the 4 expected lines
+for i in {1..10}; do
+  if [[ -e "$RESULT" ]]; then
+    CSV_LINE_COUNT=$(wc -l < "$RESULT")
+    (( CSV_LINE_COUNT == 4 )) && break
+  fi
+  sleep 5
+done
+
+# hand the result file and its expected hash to the shared checker, whether or not the poll succeeded
+check_result_hash "SQLClient" "$RESULT" "dca08a82cc09f6b19950291dbbef16bb"