Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/08/02 15:00:05 UTC

[GitHub] asfgit closed pull request #6422: [FLINK-9833] [e2e] Add a SQL Client end-to-end test with unified source/sink/format

URL: https://github.com/apache/flink/pull/6422
 
 
   

This is a PR merged from a forked repository. As GitHub hides the
original diff once such a pull request is merged, the diff is
reproduced below for the sake of provenance:

diff --git a/flink-end-to-end-tests/flink-sql-client-test/pom.xml b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
new file mode 100644
index 00000000000..dc8be374e43
--- /dev/null
+++ b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
@@ -0,0 +1,124 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.7-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-sql-client-test</artifactId>
+	<name>flink-sql-client-test</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.scala-lang</groupId>
+			<artifactId>scala-compiler</artifactId>
+			<scope>provided</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<!-- Build toolbox jar. -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<version>3.1.1</version>
+				<executions>
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<finalName>SqlToolbox</finalName>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<!-- Copy SQL jars into dedicated "sql-jars" directory. -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-dependency-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>copy</id>
+						<phase>package</phase>
+						<goals>
+							<goal>copy</goal>
+						</goals>
+						<configuration>
+							<outputDirectory>${project.build.directory}/sql-jars</outputDirectory>
+							<!-- List of currently provided SQL jars. -->
+							<artifactItems>
+								<artifactItem>
+									<groupId>org.apache.flink</groupId>
+									<artifactId>flink-avro</artifactId>
+									<version>${project.version}</version>
+									<classifier>sql-jar</classifier>
+									<type>jar</type>
+								</artifactItem>
+								<artifactItem>
+									<groupId>org.apache.flink</groupId>
+									<artifactId>flink-json</artifactId>
+									<version>${project.version}</version>
+									<classifier>sql-jar</classifier>
+									<type>jar</type>
+								</artifactItem>
+								<artifactItem>
+									<groupId>org.apache.flink</groupId>
+									<artifactId>flink-connector-kafka-0.9_${scala.binary.version}</artifactId>
+									<version>${project.version}</version>
+									<classifier>sql-jar</classifier>
+									<type>jar</type>
+								</artifactItem>
+								<artifactItem>
+									<groupId>org.apache.flink</groupId>
+									<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+									<version>${project.version}</version>
+									<classifier>sql-jar</classifier>
+									<type>jar</type>
+								</artifactItem>
+								<artifactItem>
+									<groupId>org.apache.flink</groupId>
+									<artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
+									<version>${project.version}</version>
+									<classifier>sql-jar</classifier>
+									<type>jar</type>
+								</artifactItem>
+							</artifactItems>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>
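
For anyone reproducing this locally: a minimal build invocation, assuming it
is run from the Flink repository root (the -pl/-am flags are standard Maven
options; the module path comes from the diff header above), would be:

  mvn clean package -pl flink-end-to-end-tests/flink-sql-client-test -am

The shade plugin's finalName yields target/SqlToolbox.jar, and the dependency
plugin places the connector/format jars under target/sql-jars/, the two paths
the test script further below relies on.
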
diff --git a/flink-end-to-end-tests/flink-sql-client-test/src/main/java/org/apache/flink/table/toolbox/StringRegexReplaceFunction.java b/flink-end-to-end-tests/flink-sql-client-test/src/main/java/org/apache/flink/table/toolbox/StringRegexReplaceFunction.java
new file mode 100644
index 00000000000..c74e46da39c
--- /dev/null
+++ b/flink-end-to-end-tests/flink-sql-client-test/src/main/java/org/apache/flink/table/toolbox/StringRegexReplaceFunction.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.toolbox;
+
+import org.apache.flink.table.functions.ScalarFunction;
+
+/**
+ * Scalar function for replacing all occurrences of a regular expression with a replacement string.
+ */
+public class StringRegexReplaceFunction extends ScalarFunction {
+
+	public String eval(String input, String regex, String replacement) {
+		return input.replaceAll(regex, replacement);
+	}
+}
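
The function simply delegates to String.replaceAll, so the second argument is
interpreted as a regular expression. A minimal standalone sketch of what the
RegReplace call in the test's first SQL statement computes (illustrative only,
not part of the PR; it assumes the demo class lives in the same package as the
function):

  package org.apache.flink.table.toolbox;

  public class RegReplaceDemo {
      public static void main(String[] args) {
          StringRegexReplaceFunction fn = new StringRegexReplaceFunction();
          // Mirrors RegReplace(event.message, ' is ', ' was ') from the SQL below.
          System.out.println(fn.eval("This is a warning.", " is ", " was "));
          // Prints: This was a warning.
      }
  }
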
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index b429c0181e1..814a30d551a 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -51,6 +51,7 @@ under the License.
 		<module>flink-quickstart-test</module>
 		<module>flink-confluent-schema-registry</module>
 		<module>flink-stream-state-ttl-test</module>
+		<module>flink-sql-client-test</module>
 	</modules>
 
 	<build>
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index d06e80c7315..091435a256b 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -112,5 +112,7 @@ run_test "State TTL RocksDb backend end-to-end test" "$END_TO_END_DIR/test-scrip
 
 run_test "Running Kerberized YARN on Docker test " "$END_TO_END_DIR/test-scripts/test_yarn_kerberos_docker.sh"
 
+run_test "SQL Client end-to-end test" "$END_TO_END_DIR/test-scripts/test_sql_client.sh"
+
 printf "\n[PASS] All tests passed\n"
 exit 0
diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client.sh b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
new file mode 100755
index 00000000000..934f7d43ed4
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
@@ -0,0 +1,288 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/kafka-common.sh
+
+SQL_TOOLBOX_JAR=$END_TO_END_DIR/flink-sql-client-test/target/SqlToolbox.jar
+SQL_JARS_DIR=$END_TO_END_DIR/flink-sql-client-test/target/sql-jars
+
+################################################################################
+# Verify existing SQL jars
+################################################################################
+
+EXTRACTED_JAR=$TEST_DATA_DIR/extracted
+
+mkdir -p $EXTRACTED_JAR
+
+for SQL_JAR in $SQL_JARS_DIR/*.jar; do
+  echo "Checking SQL JAR: $SQL_JAR"
+  unzip $SQL_JAR -d $EXTRACTED_JAR > /dev/null
+
+  # check for proper shading
+  for EXTRACTED_FILE in $(find $EXTRACTED_JAR -type f); do
+
+    if ! [[ $EXTRACTED_FILE = "$EXTRACTED_JAR/org/apache/flink"* ]] && \
+        ! [[ $EXTRACTED_FILE = "$EXTRACTED_JAR/META-INF"* ]] && \
+        ! [[ $EXTRACTED_FILE = "$EXTRACTED_JAR/LICENSE"* ]] && \
+        ! [[ $EXTRACTED_FILE = "$EXTRACTED_JAR/NOTICE"* ]] ; then
+      echo "Bad file in JAR: $EXTRACTED_FILE"
+      exit 1
+    fi
+  done
+
+  # check for proper factory
+  if [ ! -f $EXTRACTED_JAR/META-INF/services/org.apache.flink.table.factories.TableFactory ]; then
+    echo "No table factory found in JAR: $SQL_JAR"
+    exit 1
+  fi
+
+  # clean up
+  rm -r $EXTRACTED_JAR/*
+done
+
+################################################################################
+# Run a SQL statement
+################################################################################
+
+echo "Testing SQL statement..."
+
+function sql_cleanup() {
+  # don't call ourselves again for another signal interruption
+  trap "exit -1" INT
+  # don't call ourselves again for normal exit
+  trap "" EXIT
+
+  stop_kafka_cluster
+}
+trap sql_cleanup INT
+trap sql_cleanup EXIT
+
+# prepare Kafka
+echo "Preparing Kafka..."
+
+setup_kafka_dist
+
+start_kafka_cluster
+
+create_kafka_topic 1 1 test-json
+create_kafka_topic 1 1 test-avro
+
+# put JSON data into Kafka
+echo "Sending messages to Kafka..."
+
+send_messages_to_kafka '{"timestamp": "2018-03-12 08:00:00", "user": "Alice", "event": { "type": "WARNING", "message": "This is a warning."}}' test-json
+# duplicate
+send_messages_to_kafka '{"timestamp": "2018-03-12 08:10:00", "user": "Alice", "event": { "type": "WARNING", "message": "This is a warning."}}' test-json
+send_messages_to_kafka '{"timestamp": "2018-03-12 09:00:00", "user": "Bob", "event": { "type": "WARNING", "message": "This is another warning."}}' test-json
+send_messages_to_kafka '{"timestamp": "2018-03-12 09:10:00", "user": "Alice", "event": { "type": "INFO", "message": "This is a info."}}' test-json
+send_messages_to_kafka '{"timestamp": "2018-03-12 09:20:00", "user": "Steve", "event": { "type": "INFO", "message": "This is another info."}}' test-json
+# duplicate
+send_messages_to_kafka '{"timestamp": "2018-03-12 09:30:00", "user": "Steve", "event": { "type": "INFO", "message": "This is another info."}}' test-json
+# filtered in results
+send_messages_to_kafka '{"timestamp": "2018-03-12 09:30:00", "user": null, "event": { "type": "WARNING", "message": "This is a bad message because the user is missing."}}' test-json
+# pending in results
+send_messages_to_kafka '{"timestamp": "2018-03-12 10:40:00", "user": "Bob", "event": { "type": "ERROR", "message": "This is an error."}}' test-json
+
+# prepare Flink
+echo "Preparing Flink..."
+
+start_cluster
+start_taskmanagers 1
+
+# create session environment file
+RESULT=$TEST_DATA_DIR/result
+SQL_CONF=$TEST_DATA_DIR/sql-client-session.conf
+
+cat > $SQL_CONF << EOF
+tables:
+  - name: JsonSourceTable
+    type: source
+    update-mode: append
+    schema:
+      - name: rowtime
+        type: TIMESTAMP
+        rowtime:
+          timestamps:
+            type: from-field
+            from: timestamp
+          watermarks:
+            type: periodic-bounded
+            delay: 2000
+      - name: user
+        type: VARCHAR
+      - name: event
+        type: ROW(type VARCHAR, message VARCHAR)
+    connector:
+      type: kafka
+      version: "0.10"
+      topic: test-json
+      startup-mode: earliest-offset
+      properties:
+        - key: zookeeper.connect
+          value: localhost:2181
+        - key: bootstrap.servers
+          value: localhost:9092
+    format:
+      type: json
+      json-schema: >
+        {
+          "type": "object",
+          "properties": {
+            "timestamp": {
+              "type": "string"
+            },
+            "user": {
+              "type": ["string", "null"]
+            },
+            "event": {
+              "type": "object",
+              "properties": {
+                "type": {
+                  "type": "string"
+                },
+                "message": {
+                  "type": "string"
+                }
+              }
+            }
+          }
+        }
+  - name: AvroBothTable
+    type: both
+    update-mode: append
+    schema:
+      - name: event_timestamp
+        type: VARCHAR
+      - name: user
+        type: VARCHAR
+      - name: message
+        type: VARCHAR
+      - name: duplicate_count
+        type: BIGINT
+    connector:
+      type: kafka
+      version: "0.10"
+      topic: test-avro
+      startup-mode: earliest-offset
+      properties:
+        - key: zookeeper.connect
+          value: localhost:2181
+        - key: bootstrap.servers
+          value: localhost:9092
+    format:
+      type: avro
+      avro-schema: >
+        {
+          "namespace": "org.apache.flink.table.tests",
+          "type": "record",
+          "name": "NormalizedEvent",
+            "fields": [
+              {"name": "event_timestamp", "type": "string"},
+              {"name": "user", "type": ["string", "null"]},
+              {"name": "message", "type": "string"},
+              {"name": "duplicate_count", "type": "long"}
+            ]
+        }
+  - name: CsvSinkTable
+    type: sink
+    update-mode: append
+    schema:
+      - name: event_timestamp
+        type: VARCHAR
+      - name: user
+        type: VARCHAR
+      - name: message
+        type: VARCHAR
+      - name: duplicate_count
+        type: BIGINT
+    connector:
+      type: filesystem
+      path: $RESULT
+    format:
+      type: csv
+      fields:
+        - name: event_timestamp
+          type: VARCHAR
+        - name: user
+          type: VARCHAR
+        - name: message
+          type: VARCHAR
+        - name: duplicate_count
+          type: BIGINT
+
+functions:
+  - name: RegReplace
+    from: class
+    class: org.apache.flink.table.toolbox.StringRegexReplaceFunction
+EOF
+
+# submit SQL statements
+
+read -r -d '' SQL_STATEMENT_1 << EOF
+INSERT INTO AvroBothTable
+  SELECT
+    CAST(TUMBLE_START(rowtime, INTERVAL '1' HOUR) AS VARCHAR) AS event_timestamp,
+    user,
+    RegReplace(event.message, ' is ', ' was ') AS message,
+    COUNT(*) AS duplicate_count
+  FROM JsonSourceTable
+  WHERE user IS NOT NULL
+  GROUP BY
+    user,
+    event.message,
+    TUMBLE(rowtime, INTERVAL '1' HOUR)
+EOF
+
+echo "Executing SQL: Kafka JSON -> Kafka Avro"
+echo "$SQL_STATEMENT_1"
+
+$FLINK_DIR/bin/sql-client.sh embedded \
+  --library $SQL_JARS_DIR \
+  --jar $SQL_TOOLBOX_JAR \
+  --environment $SQL_CONF \
+  --update "$SQL_STATEMENT_1"
+
+read -r -d '' SQL_STATEMENT_2 << EOF
+INSERT INTO CsvSinkTable
+  SELECT *
+  FROM AvroBothTable
+EOF
+
+echo "Executing SQL: Kafka Avro -> Filesystem CSV"
+echo "$SQL_STATEMENT_2"
+
+$FLINK_DIR/bin/sql-client.sh embedded \
+  --library $SQL_JARS_DIR \
+  --jar $SQL_TOOLBOX_JAR \
+  --environment $SQL_CONF \
+  --update "$SQL_STATEMENT_2"
+
+echo "Waiting for CSV results..."
+for i in {1..10}; do
+  if [ -e $RESULT ]; then
+    CSV_LINE_COUNT=`cat $RESULT | wc -l`
+    if [ $((CSV_LINE_COUNT)) -eq 4 ]; then
+      break
+    fi
+  fi
+  sleep 5
+done
+
+check_result_hash "SQLClient" $RESULT "dca08a82cc09f6b19950291dbbef16bb"
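
Working through the test data explains why the script waits for exactly 4 CSV
lines. The hashed reference output is not part of the diff, so the following
is a reconstruction from the inline "# duplicate", "# filtered in results",
and "# pending in results" comments:

- Alice's two WARNING messages (08:00, 08:10) fall into the same one-hour
  tumbling window and collapse into one row with duplicate_count 2; RegReplace
  turns the text into "This was a warning.".
- Bob's 09:00 WARNING and Alice's 09:10 INFO each produce a row with
  duplicate_count 1.
- Steve's two INFO messages (09:20, 09:30) share the 09:00-10:00 window and
  collapse into one row with duplicate_count 2.
- The message with a null user is dropped by WHERE user IS NOT NULL, and Bob's
  10:40 ERROR remains pending because no later event advances the watermark
  past its window.

That leaves 4 aggregated rows, matching the CSV_LINE_COUNT check above.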


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services