Posted to commits@flink.apache.org by tw...@apache.org on 2018/08/02 14:58:13 UTC

[flink] branch master updated: [FLINK-9833] [e2e] Add a SQL Client end-to-end test with unified source/sink/format

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 404eda8  [FLINK-9833] [e2e] Add a SQL Client end-to-end test with unified source/sink/format
404eda8 is described below

commit 404eda81753788cda619abf3f42ded6ea5cf4f66
Author: Timo Walther <tw...@apache.org>
AuthorDate: Fri Jul 20 16:55:48 2018 +0200

    [FLINK-9833] [e2e] Add a SQL Client end-to-end test with unified source/sink/format
    
    Adds a SQL Client end-to-end test with Kafka/Filesystem and Avro/JSON/CSV components.
    It reads JSON from Kafka, uses a UDF for transformation, writes the result to Kafka
    in Avro format, reads the Avro data back from Kafka, and finally writes it to the
    filesystem as CSV. It also verifies that the available SQL jars are properly shaded
    and each provide a table factory.
    
    This closes #6422.
---
 .../flink-sql-client-test/pom.xml                  | 124 +++++++++
 .../table/toolbox/StringRegexReplaceFunction.java  |  31 +++
 flink-end-to-end-tests/pom.xml                     |   1 +
 flink-end-to-end-tests/run-nightly-tests.sh        |   2 +
 .../test-scripts/test_sql_client.sh                | 288 +++++++++++++++++++++
 5 files changed, 446 insertions(+)
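
For orientation before the diff: the core of the test is the pair of INSERT
statements submitted through the SQL Client (their full form appears in
test_sql_client.sh below). A condensed sketch:

    -- Kafka JSON -> windowed deduplication with a UDF -> Kafka Avro
    INSERT INTO AvroBothTable
      SELECT ..., RegReplace(event.message, ' is ', ' was ') AS message,
        COUNT(*) AS duplicate_count
      FROM JsonSourceTable
      WHERE user IS NOT NULL
      GROUP BY user, event.message, TUMBLE(rowtime, INTERVAL '1' HOUR);

    -- Kafka Avro -> filesystem CSV
    INSERT INTO CsvSinkTable SELECT * FROM AvroBothTable;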

diff --git a/flink-end-to-end-tests/flink-sql-client-test/pom.xml b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
new file mode 100644
index 0000000..dc8be37
--- /dev/null
+++ b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
@@ -0,0 +1,124 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.7-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-sql-client-test</artifactId>
+	<name>flink-sql-client-test</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.scala-lang</groupId>
+			<artifactId>scala-compiler</artifactId>
+			<scope>provided</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<!-- Build toolbox jar. -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<version>3.1.1</version>
+				<executions>
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<finalName>SqlToolbox</finalName>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<!-- Copy SQL jars into dedicated "sql-jars" directory. -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-dependency-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>copy</id>
+						<phase>package</phase>
+						<goals>
+							<goal>copy</goal>
+						</goals>
+						<configuration>
+							<outputDirectory>${project.build.directory}/sql-jars</outputDirectory>
+							<!-- List of currently provided SQL jars. -->
+							<artifactItems>
+								<artifactItem>
+									<groupId>org.apache.flink</groupId>
+									<artifactId>flink-avro</artifactId>
+									<version>${project.version}</version>
+									<classifier>sql-jar</classifier>
+									<type>jar</type>
+								</artifactItem>
+								<artifactItem>
+									<groupId>org.apache.flink</groupId>
+									<artifactId>flink-json</artifactId>
+									<version>${project.version}</version>
+									<classifier>sql-jar</classifier>
+									<type>jar</type>
+								</artifactItem>
+								<artifactItem>
+									<groupId>org.apache.flink</groupId>
+									<artifactId>flink-connector-kafka-0.9_${scala.binary.version}</artifactId>
+									<version>${project.version}</version>
+									<classifier>sql-jar</classifier>
+									<type>jar</type>
+								</artifactItem>
+								<artifactItem>
+									<groupId>org.apache.flink</groupId>
+									<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+									<version>${project.version}</version>
+									<classifier>sql-jar</classifier>
+									<type>jar</type>
+								</artifactItem>
+								<artifactItem>
+									<groupId>org.apache.flink</groupId>
+									<artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
+									<version>${project.version}</version>
+									<classifier>sql-jar</classifier>
+									<type>jar</type>
+								</artifactItem>
+							</artifactItems>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>
diff --git a/flink-end-to-end-tests/flink-sql-client-test/src/main/java/org/apache/flink/table/toolbox/StringRegexReplaceFunction.java b/flink-end-to-end-tests/flink-sql-client-test/src/main/java/org/apache/flink/table/toolbox/StringRegexReplaceFunction.java
new file mode 100644
index 0000000..c74e46d
--- /dev/null
+++ b/flink-end-to-end-tests/flink-sql-client-test/src/main/java/org/apache/flink/table/toolbox/StringRegexReplaceFunction.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.toolbox;
+
+import org.apache.flink.table.functions.ScalarFunction;
+
+/**
+ * Scalar function for replacing all occurrences of a regular expression with a replacement string.
+ */
+public class StringRegexReplaceFunction extends ScalarFunction {
+
+	public String eval(String input, String regex, String replacement) {
+		return input.replaceAll(regex, replacement);
+	}
+}
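
A note on the function above: once registered under the name RegReplace (see
the "functions" section of the environment file in test_sql_client.sh below),
it can be called from SQL like a built-in function, e.g. against the JSON
source table defined there:

    SELECT RegReplace(event.message, ' is ', ' was ') AS message
    FROM JsonSourceTable

Since eval delegates to java.lang.String#replaceAll, the second argument is
interpreted as a Java regular expression, not a plain substring.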
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 5c204d7..fea91d5 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -52,6 +52,7 @@ under the License.
 		<module>flink-quickstart-test</module>
 		<module>flink-confluent-schema-registry</module>
 		<module>flink-stream-state-ttl-test</module>
+		<module>flink-sql-client-test</module>
 	</modules>
 
 	<build>
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index b4f3789..8439bad 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -113,5 +113,7 @@ run_test "State TTL RocksDb backend end-to-end test" "$END_TO_END_DIR/test-scrip
 
 run_test "Running Kerberized YARN on Docker test " "$END_TO_END_DIR/test-scripts/test_yarn_kerberos_docker.sh"
 
+run_test "SQL Client end-to-end test" "$END_TO_END_DIR/test-scripts/test_sql_client.sh"
+
 printf "\n[PASS] All tests passed\n"
 exit 0
diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client.sh b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
new file mode 100755
index 0000000..934f7d4
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
@@ -0,0 +1,288 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/kafka-common.sh
+
+SQL_TOOLBOX_JAR=$END_TO_END_DIR/flink-sql-client-test/target/SqlToolbox.jar
+SQL_JARS_DIR=$END_TO_END_DIR/flink-sql-client-test/target/sql-jars
+
+################################################################################
+# Verify existing SQL jars
+################################################################################
+
+EXTRACTED_JAR=$TEST_DATA_DIR/extracted
+
+mkdir -p $EXTRACTED_JAR
+
+for SQL_JAR in $SQL_JARS_DIR/*.jar; do
+  echo "Checking SQL JAR: $SQL_JAR"
+  unzip $SQL_JAR -d $EXTRACTED_JAR > /dev/null
+
+  # check for proper shading
+  for EXTRACTED_FILE in $(find $EXTRACTED_JAR -type f); do
+
+    if ! [[ $EXTRACTED_FILE = "$EXTRACTED_JAR/org/apache/flink"* ]] && \
+        ! [[ $EXTRACTED_FILE = "$EXTRACTED_JAR/META-INF"* ]] && \
+        ! [[ $EXTRACTED_FILE = "$EXTRACTED_JAR/LICENSE"* ]] && \
+        ! [[ $EXTRACTED_FILE = "$EXTRACTED_JAR/NOTICE"* ]] ; then
+      echo "Bad file in JAR: $EXTRACTED_FILE"
+      exit 1
+    fi
+  done
+
+  # check for proper factory
+  if [ ! -f $EXTRACTED_JAR/META-INF/services/org.apache.flink.table.factories.TableFactory ]; then
+    echo "No table factory found in JAR: $SQL_JAR"
+    exit 1
+  fi
+
+  # clean up
+  rm -r $EXTRACTED_JAR/*
+done
+
+################################################################################
+# Run a SQL statement
+################################################################################
+
+echo "Testing SQL statement..."
+
+function sql_cleanup() {
+  # don't call ourselves again for another signal interruption
+  trap "exit -1" INT
+  # don't call ourselves again for normal exit
+  trap "" EXIT
+
+  stop_kafka_cluster
+}
+trap sql_cleanup INT
+trap sql_cleanup EXIT
+
+# prepare Kafka
+echo "Preparing Kafka..."
+
+setup_kafka_dist
+
+start_kafka_cluster
+
+create_kafka_topic 1 1 test-json
+create_kafka_topic 1 1 test-avro
+
+# put JSON data into Kafka
+echo "Sending messages to Kafka..."
+
+send_messages_to_kafka '{"timestamp": "2018-03-12 08:00:00", "user": "Alice", "event": { "type": "WARNING", "message": "This is a warning."}}' test-json
+# duplicate
+send_messages_to_kafka '{"timestamp": "2018-03-12 08:10:00", "user": "Alice", "event": { "type": "WARNING", "message": "This is a warning."}}' test-json
+send_messages_to_kafka '{"timestamp": "2018-03-12 09:00:00", "user": "Bob", "event": { "type": "WARNING", "message": "This is another warning."}}' test-json
+send_messages_to_kafka '{"timestamp": "2018-03-12 09:10:00", "user": "Alice", "event": { "type": "INFO", "message": "This is a info."}}' test-json
+send_messages_to_kafka '{"timestamp": "2018-03-12 09:20:00", "user": "Steve", "event": { "type": "INFO", "message": "This is another info."}}' test-json
+# duplicate
+send_messages_to_kafka '{"timestamp": "2018-03-12 09:30:00", "user": "Steve", "event": { "type": "INFO", "message": "This is another info."}}' test-json
+# filtered in results
+send_messages_to_kafka '{"timestamp": "2018-03-12 09:30:00", "user": null, "event": { "type": "WARNING", "message": "This is a bad message because the user is missing."}}' test-json
+# pending in results
+send_messages_to_kafka '{"timestamp": "2018-03-12 10:40:00", "user": "Bob", "event": { "type": "ERROR", "message": "This is an error."}}' test-json
+
+# prepare Flink
+echo "Preparing Flink..."
+
+start_cluster
+start_taskmanagers 1
+
+# create session environment file
+RESULT=$TEST_DATA_DIR/result
+SQL_CONF=$TEST_DATA_DIR/sql-client-session.conf
+
+cat > $SQL_CONF << EOF
+tables:
+  - name: JsonSourceTable
+    type: source
+    update-mode: append
+    schema:
+      - name: rowtime
+        type: TIMESTAMP
+        rowtime:
+          timestamps:
+            type: from-field
+            from: timestamp
+          watermarks:
+            type: periodic-bounded
+            delay: 2000
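+            # emit watermarks that trail the largest observed rowtime by 2000 ms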
+      - name: user
+        type: VARCHAR
+      - name: event
+        type: ROW(type VARCHAR, message VARCHAR)
+    connector:
+      type: kafka
+      version: "0.10"
+      topic: test-json
+      startup-mode: earliest-offset
+      properties:
+        - key: zookeeper.connect
+          value: localhost:2181
+        - key: bootstrap.servers
+          value: localhost:9092
+    format:
+      type: json
+      json-schema: >
+        {
+          "type": "object",
+          "properties": {
+            "timestamp": {
+              "type": "string"
+            },
+            "user": {
+              "type": ["string", "null"]
+            },
+            "event": {
+              "type": "object",
+              "properties": {
+                "type": {
+                  "type": "string"
+                },
+                "message": {
+                  "type": "string"
+                }
+              }
+            }
+          }
+        }
+  - name: AvroBothTable
+    type: both
+    update-mode: append
+    schema:
+      - name: event_timestamp
+        type: VARCHAR
+      - name: user
+        type: VARCHAR
+      - name: message
+        type: VARCHAR
+      - name: duplicate_count
+        type: BIGINT
+    connector:
+      type: kafka
+      version: "0.10"
+      topic: test-avro
+      startup-mode: earliest-offset
+      properties:
+        - key: zookeeper.connect
+          value: localhost:2181
+        - key: bootstrap.servers
+          value: localhost:9092
+    format:
+      type: avro
+      avro-schema: >
+        {
+          "namespace": "org.apache.flink.table.tests",
+          "type": "record",
+          "name": "NormalizedEvent",
+            "fields": [
+              {"name": "event_timestamp", "type": "string"},
+              {"name": "user", "type": ["string", "null"]},
+              {"name": "message", "type": "string"},
+              {"name": "duplicate_count", "type": "long"}
+            ]
+        }
+  - name: CsvSinkTable
+    type: sink
+    update-mode: append
+    schema:
+      - name: event_timestamp
+        type: VARCHAR
+      - name: user
+        type: VARCHAR
+      - name: message
+        type: VARCHAR
+      - name: duplicate_count
+        type: BIGINT
+    connector:
+      type: filesystem
+      path: $RESULT
+    format:
+      type: csv
+      fields:
+        - name: event_timestamp
+          type: VARCHAR
+        - name: user
+          type: VARCHAR
+        - name: message
+          type: VARCHAR
+        - name: duplicate_count
+          type: BIGINT
+
+functions:
+  - name: RegReplace
+    from: class
+    class: org.apache.flink.table.toolbox.StringRegexReplaceFunction
+EOF
+
+# submit SQL statements
+
+read -r -d '' SQL_STATEMENT_1 << EOF
+INSERT INTO AvroBothTable
+  SELECT
+    CAST(TUMBLE_START(rowtime, INTERVAL '1' HOUR) AS VARCHAR) AS event_timestamp,
+    user,
+    RegReplace(event.message, ' is ', ' was ') AS message,
+    COUNT(*) AS duplicate_count
+  FROM JsonSourceTable
+  WHERE user IS NOT NULL
+  GROUP BY
+    user,
+    event.message,
+    TUMBLE(rowtime, INTERVAL '1' HOUR)
+EOF
+
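+# The first statement deduplicates: identical messages from the same user that
+# fall into the same 1-hour tumbling window collapse into a single row whose
+# duplicate_count records the number of occurrences.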
+echo "Executing SQL: Kafka JSON -> Kafka Avro"
+echo "$SQL_STATEMENT_1"
+
+$FLINK_DIR/bin/sql-client.sh embedded \
+  --library $SQL_JARS_DIR \
+  --jar $SQL_TOOLBOX_JAR \
+  --environment $SQL_CONF \
+  --update "$SQL_STATEMENT_1"
+
+read -r -d '' SQL_STATEMENT_2 << EOF
+INSERT INTO CsvSinkTable
+  SELECT *
+  FROM AvroBothTable
+EOF
+
+echo "Executing SQL: Kafka Avro -> Filesystem CSV"
+echo "$SQL_STATEMENT_2"
+
+$FLINK_DIR/bin/sql-client.sh embedded \
+  --library $SQL_JARS_DIR \
+  --jar $SQL_TOOLBOX_JAR \
+  --environment $SQL_CONF \
+  --update "$SQL_STATEMENT_2"
+
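+# Expect exactly 4 result rows: the record with the null user is filtered out
+# by the WHERE clause, and the final ERROR record stays pending because no
+# later event advances the watermark far enough to close its window.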
+echo "Waiting for CSV results..."
+for i in {1..10}; do
+  if [ -e $RESULT ]; then
+    CSV_LINE_COUNT=$(wc -l < $RESULT)
+    if [ $((CSV_LINE_COUNT)) -eq 4 ]; then
+      break
+    fi
+  fi
+  sleep 5
+done
+
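+# compare the md5 hash of the produced CSV result against the expected value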
+check_result_hash "SQLClient" $RESULT "dca08a82cc09f6b19950291dbbef16bb"