You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by di...@apache.org on 2020/08/24 11:35:49 UTC

[flink] branch master updated: [FLINK-18948][python][e2e] Add E2E test for Python DataStream API

This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new fa2739d  [FLINK-18948][python][e2e] Add E2E test for Python DataStream API
fa2739d is described below

commit fa2739d691b0998ab2b83adad147f24d982d8476
Author: acqua.csq <ac...@alibaba-inc.com>
AuthorDate: Fri Aug 21 15:16:21 2020 +0800

    [FLINK-18948][python][e2e] Add E2E test for Python DataStream API
    
    This closes #13206.
---
 .../python/datastream/__init__.py                  |  17 ++
 .../python/datastream/data_stream_job.py           |  59 +++++++
 .../python/datastream/functions.py                 |  40 +++++
 flink-end-to-end-tests/run-nightly-tests.sh        |   3 +-
 .../test-scripts/test_pyflink_datastream.sh        | 179 +++++++++++++++++++++
 .../{test_pyflink.sh => test_pyflink_table.sh}     |   0
 6 files changed, 297 insertions(+), 1 deletion(-)

diff --git a/flink-end-to-end-tests/flink-python-test/python/datastream/__init__.py b/flink-end-to-end-tests/flink-python-test/python/datastream/__init__.py
new file mode 100644
index 0000000..65b48d4
--- /dev/null
+++ b/flink-end-to-end-tests/flink-python-test/python/datastream/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
diff --git a/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py b/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
new file mode 100644
index 0000000..798a77d
--- /dev/null
+++ b/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
@@ -0,0 +1,59 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from pyflink.common.serialization import JsonRowSerializationSchema, \
+    JsonRowDeserializationSchema
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer
+
+from functions import m_flat_map, add_one
+
+
+def python_data_stream_example():  # Kafka source -> map -> key_by -> flat_map -> Kafka sink
+    env = StreamExecutionEnvironment.get_execution_environment()
+    # Source: JSON rows typed ROW(STRING, INT), read from the source topic from the earliest offset.
+    source_type_info = Types.ROW([Types.STRING(), Types.INT()])
+    json_row_deserialization_schema = JsonRowDeserializationSchema.builder()\
+        .type_info(source_type_info).build()
+    source_topic = 'test-python-data-stream-source'
+    consumer_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'pyflink-e2e-source'}
+    kafka_consumer_1 = FlinkKafkaConsumer(source_topic, json_row_deserialization_schema,
+                                          consumer_props)
+    kafka_consumer_1.set_start_from_earliest()
+    source_stream_1 = env.add_source(kafka_consumer_1).name('kafka source 1')
+    mapped_type_info = Types.ROW([Types.STRING(), Types.INT(), Types.INT()])
+    # add_one turns each 2-field row into a 3-field row; the stream is then keyed by the third field.
+    keyed_stream = source_stream_1.map(add_one, output_type=mapped_type_info) \
+        .key_by(lambda x: x[2])
+    # m_flat_map fans every keyed row out into several rows; this operator runs with parallelism 3.
+    flat_mapped_stream = keyed_stream.flat_map(m_flat_map, result_type=mapped_type_info)
+    flat_mapped_stream.name("flat-map").set_parallelism(3)
+    # Sink: the 3-field rows are serialized as JSON and written to the sink topic.
+    sink_topic = 'test-python-data-stream-sink'
+    producer_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'pyflink-e2e-1'}
+    json_row_serialization_schema = JsonRowSerializationSchema.builder()\
+        .with_type_info(mapped_type_info).build()
+    kafka_producer = FlinkKafkaProducer(topic=sink_topic, producer_config=producer_props,
+                                        serialization_schema=json_row_serialization_schema)
+    flat_mapped_stream.add_sink(kafka_producer)
+    env.execute_async("test data stream to kafka")  # the e2e script cancels the job afterwards
+
+
+if __name__ == '__main__':
+    python_data_stream_example()
diff --git a/flink-end-to-end-tests/flink-python-test/python/datastream/functions.py b/flink-end-to-end-tests/flink-python-test/python/datastream/functions.py
new file mode 100644
index 0000000..228405e
--- /dev/null
+++ b/flink-end-to-end-tests/flink-python-test/python/datastream/functions.py
@@ -0,0 +1,40 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from pyflink.datastream.functions import CoMapFunction
+
+
+def str_len(value):  # (s, n) -> (s, len(s), n)
+    return value[0], len(value[0]), value[1]
+
+
+def add_one(value):  # (s, n) -> (s, n + 1, n)
+    return value[0], value[1] + 1, value[1]
+
+
+def m_flat_map(value):  # generator: one output tuple per i in range(value[1])
+    for i in range(value[1]):
+        yield value[0], i, value[2]  # first and third fields are passed through unchanged
+
+
+class MyCoMapFunction(CoMapFunction):  # map1 delegates to str_len, map2 to add_one
+    def map1(self, value):
+        return str_len(value)
+
+    def map2(self, value):
+        return add_one(value)
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index d879af3..002987f 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -217,7 +217,8 @@ run_test "Dependency shading of table modules test" "$END_TO_END_DIR/test-script
 run_test "Shaded Hadoop S3A with credentials provider end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_wordcount.sh hadoop_with_provider"
 
 if [[ `uname -i` != 'aarch64' ]]; then
-    run_test "PyFlink end-to-end test" "$END_TO_END_DIR/test-scripts/test_pyflink.sh" "skip_check_exceptions"
+    run_test "PyFlink Table end-to-end test" "$END_TO_END_DIR/test-scripts/test_pyflink_table.sh" "skip_check_exceptions"
+    run_test "PyFlink DataStream end-to-end test" "$END_TO_END_DIR/test-scripts/test_pyflink_datastream.sh" "skip_check_exceptions"
 fi
 
 ################################################################################
diff --git a/flink-end-to-end-tests/test-scripts/test_pyflink_datastream.sh b/flink-end-to-end-tests/test-scripts/test_pyflink_datastream.sh
new file mode 100755
index 0000000..3b734c3
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_pyflink_datastream.sh
@@ -0,0 +1,179 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+set -Eeuo pipefail
+# Versions handed to kafka_sql_common.sh when it is sourced further down.
+KAFKA_VERSION="2.2.0"
+CONFLUENT_VERSION="5.0.0"
+CONFLUENT_MAJOR_VERSION="5.0"
+KAFKA_SQL_VERSION="universal"
+SQL_JARS_DIR=$END_TO_END_DIR/flink-sql-client-test/target/sql-jars  # END_TO_END_DIR: set by the caller
+KAFKA_SQL_JAR=$(find "$SQL_JARS_DIR" | grep "kafka_" )  # later passed to 'flink run' via -j
+
+function create_data_stream_kafka_source {
+    topicName="test-python-data-stream-source"  # same name as source_topic in data_stream_job.py
+    create_kafka_topic 1 1 $topicName
+
+    echo "Sending messages to Kafka..."
+    # Each message yields f1 + 1 sink rows (add_one, then m_flat_map): 2+3+4+5+6 = 20 in total.
+    send_messages_to_kafka '{"f0": "a", "f1": 1}' $topicName
+    send_messages_to_kafka '{"f0": "ab", "f1": 2}' $topicName
+    send_messages_to_kafka '{"f0": "abc", "f1": 3}' $topicName
+    send_messages_to_kafka '{"f0": "abcd", "f1": 4}' $topicName
+    send_messages_to_kafka '{"f0": "abcde", "f1": 5}' $topicName
+}
+
+function sort_msg {
+    # Sorts the newline-separated messages in $1 and prints them space-joined on a single line.
+    arr=()
+    while read -r line  # -r: keep backslashes in a message literal instead of dropping them
+    do
+        arr+=("$line")
+    done <<< "$1"
+    IFS=$'\n' sorted=($(sort <<< "${arr[*]}")); unset IFS
+    echo "${sorted[*]}"
+}
+
+function test_clean_up {  # registered through on_exit further down in this script
+    stop_cluster
+    stop_kafka_cluster
+}
+
+source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/kafka_sql_common.sh \
+  $KAFKA_VERSION \
+  $CONFLUENT_VERSION \
+  $CONFLUENT_MAJOR_VERSION \
+  $KAFKA_SQL_VERSION
+
+
+echo "Preparing Flink..."
+
+CURRENT_DIR=`cd "$(dirname "$0")" && pwd -P`
+source "${CURRENT_DIR}"/common.sh  # NOTE(review): already sourced above; looks redundant — confirm
+# Work on a copy of the Flink conf; the settings below are appended to the copy only.
+cp -r "${FLINK_DIR}/conf" "${TEST_DATA_DIR}/conf"
+
+echo "taskmanager.memory.task.off-heap.size: 768m" >> "${TEST_DATA_DIR}/conf/flink-conf.yaml"
+echo "taskmanager.memory.process.size: 3172m" >> "${TEST_DATA_DIR}/conf/flink-conf.yaml"
+echo "taskmanager.numberOfTaskSlots: 5" >> "${TEST_DATA_DIR}/conf/flink-conf.yaml"
+
+export FLINK_CONF_DIR="${TEST_DATA_DIR}/conf"
+
+FLINK_PYTHON_DIR=`cd "${CURRENT_DIR}/../../flink-python" && pwd -P`
+
+CONDA_HOME="${FLINK_PYTHON_DIR}/dev/.conda"
+# NOTE(review): presumably this call creates the conda environment under dev/.conda — confirm.
+"${FLINK_PYTHON_DIR}/dev/lint-python.sh" -s basic
+
+PYTHON_EXEC="${CONDA_HOME}/bin/python"
+
+source "${CONDA_HOME}/bin/activate"
+# With the environment activated: build the PyFlink sdist and pip-install it.
+cd "${FLINK_PYTHON_DIR}"
+
+rm -rf dist
+
+python setup.py sdist
+
+pip install dist/*
+
+cd dev
+# Install zip, drop the package cache, then pack .conda into venv.zip (shipped later via -pyarch).
+conda install -y -q zip=3.0
+
+rm -rf .conda/pkgs
+
+zip -q -r "${TEST_DATA_DIR}/venv.zip" .conda
+
+deactivate
+
+cd "${CURRENT_DIR}"
+
+start_cluster
+# NOTE(review): the cleanup hook is registered only after start_cluster has returned — confirm intended.
+on_exit test_clean_up
+
+# prepare Kafka
+echo "Preparing Kafka..."
+
+setup_kafka_dist
+
+start_kafka_cluster
+
+create_data_stream_kafka_source
+# The sink topic name is the same as sink_topic in data_stream_job.py.
+create_kafka_topic 1 1 test-python-data-stream-sink
+
+FLINK_PYTHON_TEST_DIR=`cd "${CURRENT_DIR}/../flink-python-test" && pwd -P`
+REQUIREMENTS_PATH="${TEST_DATA_DIR}/requirements.txt"
+
+# Never used but added to test the '-pyreq' option.
+echo "scipy==1.4.1" > "${REQUIREMENTS_PATH}"
+
+echo "Test PyFlink DataStream job:"
+export PYFLINK_CLIENT_EXECUTABLE="${PYTHON_EXEC}"
+# Exported on purpose: a plain assignment is not visible to the 'flink run' child process below.
+JOB_ID=$("${FLINK_DIR}/bin/flink" run \
+    -pyfs "${FLINK_PYTHON_TEST_DIR}/python/datastream" \
+    -pyreq "${REQUIREMENTS_PATH}" \
+    -pyarch "${TEST_DATA_DIR}/venv.zip" \
+    -pyexec "venv.zip/.conda/bin/python" \
+    -pym "data_stream_job" \
+    -j "${KAFKA_SQL_JAR}")
+
+echo "${JOB_ID}"
+JOB_ID=`echo "${JOB_ID}" | sed 's/.* //g'`  # drop everything up to the last space of the output
+
+echo "Reading kafka messages..."
+READ_MSG=$(read_messages_from_kafka 20 test-python-data-stream-sink pyflink-e2e-test)
+
+# The job was submitted with env.execute_async(), so cancel it once the results have been fetched.
+cancel_job "${JOB_ID}"
+# One row per (input message, i in range(f1 + 1)): 20 rows, the count requested from Kafka above.
+EXPECTED_MSG='{"f0":"a","f1":0,"f2":1}
+{"f0":"a","f1":1,"f2":1}
+{"f0":"ab","f1":0,"f2":2}
+{"f0":"ab","f1":1,"f2":2}
+{"f0":"ab","f1":2,"f2":2}
+{"f0":"abc","f1":0,"f2":3}
+{"f0":"abc","f1":1,"f2":3}
+{"f0":"abc","f1":2,"f2":3}
+{"f0":"abc","f1":3,"f2":3}
+{"f0":"abcde","f1":0,"f2":5}
+{"f0":"abcde","f1":1,"f2":5}
+{"f0":"abcde","f1":2,"f2":5}
+{"f0":"abcde","f1":3,"f2":5}
+{"f0":"abcde","f1":4,"f2":5}
+{"f0":"abcde","f1":5,"f2":5}
+{"f0":"abcd","f1":0,"f2":4}
+{"f0":"abcd","f1":1,"f2":4}
+{"f0":"abcd","f1":2,"f2":4}
+{"f0":"abcd","f1":3,"f2":4}
+{"f0":"abcd","f1":4,"f2":4}'
+# Sort both sides so the comparison is order-independent.
+EXPECTED_MSG=$(sort_msg "${EXPECTED_MSG[*]}")
+SORTED_READ_MSG=$(sort_msg "${READ_MSG[*]}")
+
+if [[ "${EXPECTED_MSG[*]}" != "${SORTED_READ_MSG[*]}" ]]; then
+    echo "Output from Flink program does not match expected output."
+    echo -e "EXPECTED Output: --${EXPECTED_MSG[*]}--"
+    echo -e "ACTUAL: --${SORTED_READ_MSG[*]}--"
+    exit 1
+fi
diff --git a/flink-end-to-end-tests/test-scripts/test_pyflink.sh b/flink-end-to-end-tests/test-scripts/test_pyflink_table.sh
similarity index 100%
rename from flink-end-to-end-tests/test-scripts/test_pyflink.sh
rename to flink-end-to-end-tests/test-scripts/test_pyflink_table.sh