Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/11/15 10:21:51 UTC

[GitHub] asfgit closed pull request #6927: [FLINK-10624] Extend SQL client end-to-end to test new KafkaTableSink

URL: https://github.com/apache/flink/pull/6927

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/flink-end-to-end-tests/flink-sql-client-test/pom.xml b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
index b3a48697f6c..17a26a39cd6 100644
--- a/flink-end-to-end-tests/flink-sql-client-test/pom.xml
+++ b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
@@ -89,6 +89,14 @@ under the License.
 			<classifier>sql-jar</classifier>
 			<scope>provided</scope>
 		</dependency>
+		<dependency>
+			<!-- Used by maven-dependency-plugin -->
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<classifier>sql-jar</classifier>
+			<scope>provided</scope>
+		</dependency>
 		<dependency>
 			<!-- Used by maven-dependency-plugin -->
 			<groupId>org.apache.flink</groupId>
@@ -106,7 +114,7 @@ under the License.
 					as we neither access nor package the kafka dependencies -->
 				<groupId>org.apache.kafka</groupId>
 				<artifactId>kafka-clients</artifactId>
-				<version>0.11.0.2</version>
+				<version>2.0.0</version>
 			</dependency>
 		</dependencies>
 	</dependencyManagement>
@@ -130,19 +138,19 @@ under the License.
 				</executions>
 			</plugin>
 
-			<!-- Copy SQL jars into dedicated "sql-jars" directory. -->
+			<!-- Copy SQL jars into dedicated "sql-jars-kafka-0.10" directory. -->
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-dependency-plugin</artifactId>
 				<executions>
 					<execution>
-						<id>copy</id>
+						<id>copy-for-sql-jars-kafka-0.10</id>
 						<phase>package</phase>
 						<goals>
 							<goal>copy</goal>
 						</goals>
 						<configuration>
-							<outputDirectory>${project.build.directory}/sql-jars</outputDirectory>
+							<outputDirectory>${project.build.directory}/sql-jars-kafka-0.10</outputDirectory>
 							<!-- List of currently provided SQL jars. 
 								When extending this list please also add a dependency
 								for the respective module. -->
@@ -196,6 +204,73 @@ under the License.
 					</execution>
 				</executions>
 			</plugin>
+
+			<!-- Copy SQL jars into dedicated "sql-jars-kafka" directory. -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-dependency-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>copy-for-sql-jars-kafka</id>
+						<phase>package</phase>
+						<goals>
+							<goal>copy</goal>
+						</goals>
+						<configuration>
+							<outputDirectory>${project.build.directory}/sql-jars-kafka</outputDirectory>
+							<!-- List of currently provided SQL jars.
+								When extending this list please also add a dependency
+								for the respective module. -->
+							<artifactItems>
+								<artifactItem>
+									<groupId>org.apache.flink</groupId>
+									<artifactId>flink-avro</artifactId>
+									<version>${project.version}</version>
+									<classifier>sql-jar</classifier>
+									<type>jar</type>
+								</artifactItem>
+								<artifactItem>
+									<groupId>org.apache.flink</groupId>
+									<artifactId>flink-json</artifactId>
+									<version>${project.version}</version>
+									<classifier>sql-jar</classifier>
+									<type>jar</type>
+								</artifactItem>
+								<!-- This SQL JAR is not used for now to avoid dependency conflicts; see FLINK-10107.
+								<artifactItem>
+									<groupId>org.apache.flink</groupId>
+									<artifactId>flink-connector-kafka-0.9_${scala.binary.version}</artifactId>
+									<version>${project.version}</version>
+									<classifier>sql-jar</classifier>
+									<type>jar</type>
+								</artifactItem>-->
+								<artifactItem>
+									<groupId>org.apache.flink</groupId>
+									<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
+									<version>${project.version}</version>
+									<classifier>sql-jar</classifier>
+									<type>jar</type>
+								</artifactItem>
+								<!-- This SQL JAR is not used for now to avoid dependency conflicts; see FLINK-10107.
+								<artifactItem>
+									<groupId>org.apache.flink</groupId>
+									<artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
+									<version>${project.version}</version>
+									<classifier>sql-jar</classifier>
+									<type>jar</type>
+								</artifactItem>-->
+								<artifactItem>
+									<groupId>org.apache.flink</groupId>
+									<artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
+									<version>${project.version}</version>
+									<classifier>sql-jar</classifier>
+									<type>jar</type>
+								</artifactItem>
+							</artifactItems>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
 		</plugins>
 	</build>
 </project>
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 7e15870f1d3..181ef6105c6 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -142,6 +142,8 @@ run_test "State TTL Heap backend end-to-end test" "$END_TO_END_DIR/test-scripts/
 run_test "State TTL RocksDb backend end-to-end test" "$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh rocks"
 
 run_test "SQL Client end-to-end test" "$END_TO_END_DIR/test-scripts/test_sql_client.sh"
+run_test "SQL Client end-to-end test for kafka 0.10" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka010.sh"
+run_test "SQL Client end-to-end test for modern Kafka" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka.sh"
 
 run_test "Heavy deployment end-to-end test" "$END_TO_END_DIR/test-scripts/test_heavy_deployment.sh"
 
diff --git a/flink-end-to-end-tests/run-pre-commit-tests.sh b/flink-end-to-end-tests/run-pre-commit-tests.sh
index c9fab304916..879d29993fb 100755
--- a/flink-end-to-end-tests/run-pre-commit-tests.sh
+++ b/flink-end-to-end-tests/run-pre-commit-tests.sh
@@ -48,6 +48,9 @@ echo "Flink distribution directory: $FLINK_DIR"
 # those checks are disabled, one should take care that proper checks are performed in the tests themselves to ensure that the test finished
 # in an expected state.
 
+run_test "SQL Client end-to-end test for kafka 0.10" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka010.sh"
+run_test "SQL Client end-to-end test for modern Kafka" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka.sh"
+run_test "SQL Client end-to-end test" "$END_TO_END_DIR/test-scripts/test_sql_client.sh"
 run_test "State Migration end-to-end test from 1.6" "$END_TO_END_DIR/test-scripts/test_state_migration.sh"
 run_test "State Evolution end-to-end test" "$END_TO_END_DIR/test-scripts/test_state_evolution.sh"
 run_test "Batch Python Wordcount end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_python_wordcount.sh"
diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client.sh b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
index b9d0ecaa0c6..25d8724ce41 100755
--- a/flink-end-to-end-tests/test-scripts/test_sql_client.sh
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
@@ -19,17 +19,23 @@
 
 set -Eeuo pipefail
 
+KAFKA_CONNECTOR_VERSION="2.0"
+KAFKA_VERSION="2.0.0"
+CONFLUENT_VERSION="5.0.0"
+CONFLUENT_MAJOR_VERSION="5.0"
+LIB_DIR_NAME="sql-jars-kafka"
+
 source "$(dirname "$0")"/common.sh
-source "$(dirname "$0")"/kafka-common.sh 0.10.2.0 3.2.0 3.2
+source "$(dirname "$0")"/kafka-common.sh $KAFKA_VERSION $CONFLUENT_VERSION $CONFLUENT_MAJOR_VERSION
 source "$(dirname "$0")"/elasticsearch-common.sh
 
-SQL_TOOLBOX_JAR=$END_TO_END_DIR/flink-sql-client-test/target/SqlToolbox.jar
-SQL_JARS_DIR=$END_TO_END_DIR/flink-sql-client-test/target/sql-jars
-
 ################################################################################
 # Verify existing SQL jars
 ################################################################################
 
+SQL_TOOLBOX_JAR=$END_TO_END_DIR/flink-sql-client-test/target/SqlToolbox.jar
+SQL_JARS_DIR=$END_TO_END_DIR/flink-sql-client-test/target/$LIB_DIR_NAME
+
 EXTRACTED_JAR=$TEST_DATA_DIR/extracted
 
 mkdir -p $EXTRACTED_JAR
@@ -152,7 +158,7 @@ tables:
         type: ROW<type VARCHAR, message VARCHAR>
     connector:
       type: kafka
-      version: "0.10"
+      version: "$KAFKA_CONNECTOR_VERSION"
       topic: test-json
       startup-mode: earliest-offset
       properties:
@@ -185,72 +191,6 @@ tables:
             }
           }
         }
-  - name: AvroBothTable
-    type: source-sink-table
-    update-mode: append
-    schema:
-      - name: event_timestamp
-        type: VARCHAR
-      - name: user
-        type: VARCHAR
-      - name: message
-        type: VARCHAR
-      - name: duplicate_count
-        type: BIGINT
-    connector:
-      type: kafka
-      version: "0.10"
-      topic: test-avro
-      startup-mode: earliest-offset
-      properties:
-        - key: zookeeper.connect
-          value: localhost:2181
-        - key: bootstrap.servers
-          value: localhost:9092
-    format:
-      type: avro
-      avro-schema: >
-        {
-          "namespace": "org.apache.flink.table.tests",
-          "type": "record",
-          "name": "NormalizedEvent",
-            "fields": [
-              {"name": "event_timestamp", "type": "string"},
-              {"name": "user", "type": ["string", "null"]},
-              {"name": "message", "type": "string"},
-              {"name": "duplicate_count", "type": "long"}
-            ]
-        }
-  - name: CsvSinkTable
-    type: sink-table
-    update-mode: append
-    schema:
-      - name: event_timestamp
-        type: VARCHAR
-      - name: user
-        type: VARCHAR
-      - name: message
-        type: VARCHAR
-      - name: duplicate_count
-        type: BIGINT
-      - name: constant
-        type: VARCHAR
-    connector:
-      type: filesystem
-      path: $RESULT
-    format:
-      type: csv
-      fields:
-        - name: event_timestamp
-          type: VARCHAR
-        - name: user
-          type: VARCHAR
-        - name: message
-          type: VARCHAR
-        - name: duplicate_count
-          type: BIGINT
-        - name: constant
-          type: VARCHAR
   - name: ElasticsearchUpsertSinkTable
     type: sink-table
     update-mode: upsert
@@ -308,62 +248,6 @@ EOF
 
 # submit SQL statements
 
-echo "Executing SQL: Kafka JSON -> Kafka Avro"
-
-SQL_STATEMENT_1=$(cat << EOF
-INSERT INTO AvroBothTable
-  SELECT
-    CAST(TUMBLE_START(rowtime, INTERVAL '1' HOUR) AS VARCHAR) AS event_timestamp,
-    user,
-    RegReplace(event.message, ' is ', ' was ') AS message,
-    COUNT(*) AS duplicate_count
-  FROM JsonSourceTable
-  WHERE user IS NOT NULL
-  GROUP BY
-    user,
-    event.message,
-    TUMBLE(rowtime, INTERVAL '1' HOUR)
-EOF
-)
-
-echo "$SQL_STATEMENT_1"
-
-$FLINK_DIR/bin/sql-client.sh embedded \
-  --library $SQL_JARS_DIR \
-  --jar $SQL_TOOLBOX_JAR \
-  --environment $SQL_CONF \
-  --update "$SQL_STATEMENT_1"
-
-echo "Executing SQL: Kafka Avro -> Filesystem CSV"
-
-SQL_STATEMENT_2=$(cat << EOF
-INSERT INTO CsvSinkTable
-  SELECT AvroBothTable.*, RegReplace('Test constant folding.', 'Test', 'Success') AS constant
-  FROM AvroBothTable
-EOF
-)
-
-echo "$SQL_STATEMENT_2"
-
-$FLINK_DIR/bin/sql-client.sh embedded \
-  --library $SQL_JARS_DIR \
-  --jar $SQL_TOOLBOX_JAR \
-  --environment $SQL_CONF \
-  --update "$SQL_STATEMENT_2"
-
-echo "Waiting for CSV results..."
-for i in {1..10}; do
-  if [ -e $RESULT ]; then
-    CSV_LINE_COUNT=`cat $RESULT | wc -l`
-    if [ $((CSV_LINE_COUNT)) -eq 4 ]; then
-      break
-    fi
-  fi
-  sleep 5
-done
-
-check_result_hash "SQL Client Kafka" $RESULT "0a1bf8bf716069b7269f575f87a802c0"
-
 echo "Executing SQL: Values -> Elasticsearch (upsert)"
 
 SQL_STATEMENT_3=$(cat << EOF
diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka.sh b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka.sh
new file mode 100755
index 00000000000..e0594432be6
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka.sh
@@ -0,0 +1,22 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+set -Eeuo pipefail
+
+source "$(dirname "$0")"/test_sql_client_kafka_common.sh 2.0 2.0.0 5.0.0 5.0 sql-jars-kafka
diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka010.sh b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka010.sh
new file mode 100755
index 00000000000..c686ffbfb80
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka010.sh
@@ -0,0 +1,22 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+set -Eeuo pipefail
+
+source "$(dirname "$0")"/test_sql_client_kafka_common.sh 0.10 0.10.2.0 3.2.0 3.2 sql-jars-kafka-0.10
diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client_kafka_common.sh b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka_common.sh
new file mode 100755
index 00000000000..7156c3e5993
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client_kafka_common.sh
@@ -0,0 +1,310 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+set -Eeuo pipefail
+
+KAFKA_CONNECTOR_VERSION="$1"
+KAFKA_VERSION="$2"
+CONFLUENT_VERSION="$3"
+CONFLUENT_MAJOR_VERSION="$4"
+LIB_DIR_NAME="$5"
+
+source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/kafka-common.sh $KAFKA_VERSION $CONFLUENT_VERSION $CONFLUENT_MAJOR_VERSION
+
+################################################################################
+# Verify existing SQL jars
+################################################################################
+
+SQL_TOOLBOX_JAR=$END_TO_END_DIR/flink-sql-client-test/target/SqlToolbox.jar
+SQL_JARS_DIR=$END_TO_END_DIR/flink-sql-client-test/target/$LIB_DIR_NAME
+
+EXTRACTED_JAR=$TEST_DATA_DIR/extracted
+
+mkdir -p $EXTRACTED_JAR
+
+for SQL_JAR in $SQL_JARS_DIR/*.jar; do
+  echo "Checking SQL JAR: $SQL_JAR"
+  (cd $EXTRACTED_JAR && jar xf $SQL_JAR)
+
+  # check for proper shading
+  for EXTRACTED_FILE in $(find $EXTRACTED_JAR -type f); do
+
+    if ! [[ $EXTRACTED_FILE = "$EXTRACTED_JAR/org/apache/flink"* ]] && \
+        ! [[ $EXTRACTED_FILE = "$EXTRACTED_JAR/META-INF"* ]] && \
+        ! [[ $EXTRACTED_FILE = "$EXTRACTED_JAR/LICENSE"* ]] && \
+        ! [[ $EXTRACTED_FILE = "$EXTRACTED_JAR/NOTICE"* ]] ; then
+      echo "Bad file in JAR: $EXTRACTED_FILE"
+      exit 1
+    fi
+  done
+
+  # check for proper factory
+  if [ ! -f $EXTRACTED_JAR/META-INF/services/org.apache.flink.table.factories.TableFactory ]; then
+    echo "No table factory found in JAR: $SQL_JAR"
+    exit 1
+  fi
+
+  # clean up
+  rm -r $EXTRACTED_JAR/*
+done
+
+rm -r $EXTRACTED_JAR
+
+################################################################################
+# Prepare connectors
+################################################################################
+
+function sql_cleanup() {
+  # don't call ourselves again for another signal interruption
+  trap "exit -1" INT
+  # don't call ourselves again for normal exit
+  trap "" EXIT
+
+  stop_kafka_cluster
+}
+trap sql_cleanup INT
+trap sql_cleanup EXIT
+
+# prepare Kafka
+echo "Preparing Kafka..."
+
+setup_kafka_dist
+
+start_kafka_cluster
+
+create_kafka_topic 1 1 test-json
+create_kafka_topic 1 1 test-avro
+
+# put JSON data into Kafka
+echo "Sending messages to Kafka..."
+
+send_messages_to_kafka '{"timestamp": "2018-03-12 08:00:00", "user": "Alice", "event": { "type": "WARNING", "message": "This is a warning."}}' test-json
+# duplicate
+send_messages_to_kafka '{"timestamp": "2018-03-12 08:10:00", "user": "Alice", "event": { "type": "WARNING", "message": "This is a warning."}}' test-json
+send_messages_to_kafka '{"timestamp": "2018-03-12 09:00:00", "user": "Bob", "event": { "type": "WARNING", "message": "This is another warning."}}' test-json
+send_messages_to_kafka '{"timestamp": "2018-03-12 09:10:00", "user": "Alice", "event": { "type": "INFO", "message": "This is a info."}}' test-json
+send_messages_to_kafka '{"timestamp": "2018-03-12 09:20:00", "user": "Steve", "event": { "type": "INFO", "message": "This is another info."}}' test-json
+# duplicate
+send_messages_to_kafka '{"timestamp": "2018-03-12 09:30:00", "user": "Steve", "event": { "type": "INFO", "message": "This is another info."}}' test-json
+# filtered in results
+send_messages_to_kafka '{"timestamp": "2018-03-12 09:30:00", "user": null, "event": { "type": "WARNING", "message": "This is a bad message because the user is missing."}}' test-json
+# pending in results
+send_messages_to_kafka '{"timestamp": "2018-03-12 10:40:00", "user": "Bob", "event": { "type": "ERROR", "message": "This is an error."}}' test-json
+
+################################################################################
+# Run a SQL statement
+################################################################################
+
+echo "Testing SQL statement..."
+
+# prepare Flink
+echo "Preparing Flink..."
+
+start_cluster
+start_taskmanagers 2
+
+# create session environment file
+RESULT=$TEST_DATA_DIR/result
+SQL_CONF=$TEST_DATA_DIR/sql-client-session.conf
+
+cat > $SQL_CONF << EOF
+tables:
+  - name: JsonSourceTable
+    type: source-table
+    update-mode: append
+    schema:
+      - name: rowtime
+        type: TIMESTAMP
+        rowtime:
+          timestamps:
+            type: from-field
+            from: timestamp
+          watermarks:
+            type: periodic-bounded
+            delay: 2000
+      - name: user
+        type: VARCHAR
+      - name: event
+        type: ROW<type VARCHAR, message VARCHAR>
+    connector:
+      type: kafka
+      version: "$KAFKA_CONNECTOR_VERSION"
+      topic: test-json
+      startup-mode: earliest-offset
+      properties:
+        - key: zookeeper.connect
+          value: localhost:2181
+        - key: bootstrap.servers
+          value: localhost:9092
+    format:
+      type: json
+      json-schema: >
+        {
+          "type": "object",
+          "properties": {
+            "timestamp": {
+              "type": "string"
+            },
+            "user": {
+              "type": ["string", "null"]
+            },
+            "event": {
+              "type": "object",
+              "properties": {
+                "type": {
+                  "type": "string"
+                },
+                "message": {
+                  "type": "string"
+                }
+              }
+            }
+          }
+        }
+  - name: AvroBothTable
+    type: source-sink-table
+    update-mode: append
+    schema:
+      - name: event_timestamp
+        type: VARCHAR
+      - name: user
+        type: VARCHAR
+      - name: message
+        type: VARCHAR
+      - name: duplicate_count
+        type: BIGINT
+    connector:
+      type: kafka
+      version: "$KAFKA_CONNECTOR_VERSION"
+      topic: test-avro
+      startup-mode: earliest-offset
+      properties:
+        - key: zookeeper.connect
+          value: localhost:2181
+        - key: bootstrap.servers
+          value: localhost:9092
+    format:
+      type: avro
+      avro-schema: >
+        {
+          "namespace": "org.apache.flink.table.tests",
+          "type": "record",
+          "name": "NormalizedEvent",
+            "fields": [
+              {"name": "event_timestamp", "type": "string"},
+              {"name": "user", "type": ["string", "null"]},
+              {"name": "message", "type": "string"},
+              {"name": "duplicate_count", "type": "long"}
+            ]
+        }
+  - name: CsvSinkTable
+    type: sink-table
+    update-mode: append
+    schema:
+      - name: event_timestamp
+        type: VARCHAR
+      - name: user
+        type: VARCHAR
+      - name: message
+        type: VARCHAR
+      - name: duplicate_count
+        type: BIGINT
+      - name: constant
+        type: VARCHAR
+    connector:
+      type: filesystem
+      path: $RESULT
+    format:
+      type: csv
+      fields:
+        - name: event_timestamp
+          type: VARCHAR
+        - name: user
+          type: VARCHAR
+        - name: message
+          type: VARCHAR
+        - name: duplicate_count
+          type: BIGINT
+        - name: constant
+          type: VARCHAR
+
+functions:
+  - name: RegReplace
+    from: class
+    class: org.apache.flink.table.toolbox.StringRegexReplaceFunction
+EOF
+
+# submit SQL statements
+
+echo "Executing SQL: Kafka JSON -> Kafka Avro"
+
+SQL_STATEMENT_1=$(cat << EOF
+INSERT INTO AvroBothTable
+  SELECT
+    CAST(TUMBLE_START(rowtime, INTERVAL '1' HOUR) AS VARCHAR) AS event_timestamp,
+    user,
+    RegReplace(event.message, ' is ', ' was ') AS message,
+    COUNT(*) AS duplicate_count
+  FROM JsonSourceTable
+  WHERE user IS NOT NULL
+  GROUP BY
+    user,
+    event.message,
+    TUMBLE(rowtime, INTERVAL '1' HOUR)
+EOF
+)
+
+echo "$SQL_STATEMENT_1"
+
+$FLINK_DIR/bin/sql-client.sh embedded \
+  --library $SQL_JARS_DIR \
+  --jar $SQL_TOOLBOX_JAR \
+  --environment $SQL_CONF \
+  --update "$SQL_STATEMENT_1"
+
+echo "Executing SQL: Kafka Avro -> Filesystem CSV"
+
+SQL_STATEMENT_2=$(cat << EOF
+INSERT INTO CsvSinkTable
+  SELECT AvroBothTable.*, RegReplace('Test constant folding.', 'Test', 'Success') AS constant
+  FROM AvroBothTable
+EOF
+)
+
+echo "$SQL_STATEMENT_2"
+
+$FLINK_DIR/bin/sql-client.sh embedded \
+  --library $SQL_JARS_DIR \
+  --jar $SQL_TOOLBOX_JAR \
+  --environment $SQL_CONF \
+  --update "$SQL_STATEMENT_2"
+
+echo "Waiting for CSV results..."
+for i in {1..10}; do
+  if [ -e $RESULT ]; then
+    CSV_LINE_COUNT=`cat $RESULT | wc -l`
+    if [ $((CSV_LINE_COUNT)) -eq 4 ]; then
+      break
+    fi
+  fi
+  sleep 5
+done
+
+check_result_hash "SQL Client Kafka" $RESULT "0a1bf8bf716069b7269f575f87a802c0"
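
A note on the structure of this change: the two new entry points (test_sql_client_kafka.sh and test_sql_client_kafka010.sh) are deliberately thin wrappers that source the shared test body in test_sql_client_kafka_common.sh, passing the connector version, Kafka version, Confluent version, Confluent major version, and SQL-jar directory name as positional arguments. A minimal sketch of the same wrapper pattern, using illustrative file and variable names that are not taken from the Flink repository:

#!/usr/bin/env bash
# my_test_common.sh -- shared test body; callers pass versions as positional args
set -Eeuo pipefail

CONNECTOR_VERSION="$1"   # e.g. "2.0"
BROKER_VERSION="$2"      # e.g. "2.0.0"
JARS_DIR_NAME="$3"       # e.g. "sql-jars-kafka"

echo "Testing connector ${CONNECTOR_VERSION} against broker ${BROKER_VERSION} using ${JARS_DIR_NAME}"

#!/usr/bin/env bash
# my_test_kafka.sh -- one-line entry point for a single version combination
set -Eeuo pipefail
source "$(dirname "$0")"/my_test_common.sh 2.0 2.0.0 sql-jars-kafka

Because the wrappers use "source" rather than executing the common script, settings made in the shared body (for example "trap sql_cleanup EXIT") take effect in the wrapper's own shell, so cleanup still runs when the wrapper exits.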


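Also worth noting: the session environment file above is written via an unquoted heredoc ("cat > $SQL_CONF << EOF"), so shell variables such as $KAFKA_CONNECTOR_VERSION and $RESULT are expanded at write time, which effectively turns the YAML into a per-run template. A self-contained sketch of that technique (the file name and variable here are illustrative):

#!/usr/bin/env bash
set -Eeuo pipefail

CONNECTOR_VERSION="2.0"
CONF_FILE=/tmp/session.conf

# The unquoted EOF delimiter makes the shell substitute $CONNECTOR_VERSION
# while writing the file; quoting it ('EOF') would write the literal text instead.
cat > "$CONF_FILE" << EOF
connector:
  type: kafka
  version: "$CONNECTOR_VERSION"
EOF

cat "$CONF_FILE"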
 

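Finally, the "Verify existing SQL jars" step enforces shading by extracting every jar and failing on any file outside org/apache/flink, META-INF, LICENSE, or NOTICE. The same invariant can be checked without extraction by listing the jar's table of contents; a sketch only (not the script's actual code, and it assumes SQL_JARS_DIR is set as in the script):

#!/usr/bin/env bash
set -Eeuo pipefail

for SQL_JAR in "$SQL_JARS_DIR"/*.jar; do
  # List entries and flag anything outside the allowed (shaded) prefixes;
  # directory entries (trailing slash) are ignored, matching the original
  # check's "find -type f".
  BAD=$(jar tf "$SQL_JAR" | grep -Ev '(/$|^org/apache/flink/|^META-INF/|^LICENSE|^NOTICE)' || true)
  if [ -n "$BAD" ]; then
    echo "Badly shaded entries in $SQL_JAR:"
    echo "$BAD"
    exit 1
  fi
done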
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services