You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by di...@apache.org on 2020/08/24 11:35:49 UTC
[flink] branch master updated: [FLINK-18948][python][e2e] Add E2E test for Python DataStream API
This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new fa2739d [FLINK-18948][python][e2e] Add E2E test for Python DataStream API
fa2739d is described below
commit fa2739d691b0998ab2b83adad147f24d982d8476
Author: acqua.csq <ac...@alibaba-inc.com>
AuthorDate: Fri Aug 21 15:16:21 2020 +0800
[FLINK-18948][python][e2e] Add E2E test for Python DataStream API
This closes #13206.
---
.../python/datastream/__init__.py | 17 ++
.../python/datastream/data_stream_job.py | 59 +++++++
.../python/datastream/functions.py | 40 +++++
flink-end-to-end-tests/run-nightly-tests.sh | 3 +-
.../test-scripts/test_pyflink_datastream.sh | 179 +++++++++++++++++++++
.../{test_pyflink.sh => test_pyflink_table.sh} | 0
6 files changed, 297 insertions(+), 1 deletion(-)
diff --git a/flink-end-to-end-tests/flink-python-test/python/datastream/__init__.py b/flink-end-to-end-tests/flink-python-test/python/datastream/__init__.py
new file mode 100644
index 0000000..65b48d4
--- /dev/null
+++ b/flink-end-to-end-tests/flink-python-test/python/datastream/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
diff --git a/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py b/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
new file mode 100644
index 0000000..798a77d
--- /dev/null
+++ b/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
@@ -0,0 +1,59 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from pyflink.common.serialization import JsonRowSerializationSchema, \
+ JsonRowDeserializationSchema
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer
+
+from functions import m_flat_map, add_one
+
+
+def python_data_stream_example():
+ env = StreamExecutionEnvironment.get_execution_environment()
+
+ source_type_info = Types.ROW([Types.STRING(), Types.INT()])
+ json_row_deserialization_schema = JsonRowDeserializationSchema.builder()\
+ .type_info(source_type_info).build()
+ source_topic = 'test-python-data-stream-source'
+ consumer_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'pyflink-e2e-source'}
+ kafka_consumer_1 = FlinkKafkaConsumer(source_topic, json_row_deserialization_schema,
+ consumer_props)
+ kafka_consumer_1.set_start_from_earliest()
+ source_stream_1 = env.add_source(kafka_consumer_1).name('kafka source 1')
+ mapped_type_info = Types.ROW([Types.STRING(), Types.INT(), Types.INT()])
+
+ keyed_stream = source_stream_1.map(add_one, output_type=mapped_type_info) \
+ .key_by(lambda x: x[2])
+
+ flat_mapped_stream = keyed_stream.flat_map(m_flat_map, result_type=mapped_type_info)
+ flat_mapped_stream.name("flat-map").set_parallelism(3)
+
+ sink_topic = 'test-python-data-stream-sink'
+ producer_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'pyflink-e2e-1'}
+ json_row_serialization_schema = JsonRowSerializationSchema.builder()\
+ .with_type_info(mapped_type_info).build()
+ kafka_producer = FlinkKafkaProducer(topic=sink_topic, producer_config=producer_props,
+ serialization_schema=json_row_serialization_schema)
+ flat_mapped_stream.add_sink(kafka_producer)
+ env.execute_async("test data stream to kafka")
+
+
+if __name__ == '__main__':
+ python_data_stream_example()
diff --git a/flink-end-to-end-tests/flink-python-test/python/datastream/functions.py b/flink-end-to-end-tests/flink-python-test/python/datastream/functions.py
new file mode 100644
index 0000000..228405e
--- /dev/null
+++ b/flink-end-to-end-tests/flink-python-test/python/datastream/functions.py
@@ -0,0 +1,40 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from pyflink.datastream.functions import CoMapFunction
+
+
+def str_len(value):
+ return value[0], len(value[0]), value[1]
+
+
+def add_one(value):
+ return value[0], value[1] + 1, value[1]
+
+
+def m_flat_map(value):
+ for i in range(value[1]):
+ yield value[0], i, value[2]
+
+
+class MyCoMapFunction(CoMapFunction):
+ def map1(self, value):
+ return str_len(value)
+
+ def map2(self, value):
+ return add_one(value)
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index d879af3..002987f 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -217,7 +217,8 @@ run_test "Dependency shading of table modules test" "$END_TO_END_DIR/test-script
run_test "Shaded Hadoop S3A with credentials provider end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_wordcount.sh hadoop_with_provider"
if [[ `uname -i` != 'aarch64' ]]; then
- run_test "PyFlink end-to-end test" "$END_TO_END_DIR/test-scripts/test_pyflink.sh" "skip_check_exceptions"
+ run_test "PyFlink Table end-to-end test" "$END_TO_END_DIR/test-scripts/test_pyflink_table.sh" "skip_check_exceptions"
+ run_test "PyFlink DataStream end-to-end test" "$END_TO_END_DIR/test-scripts/test_pyflink_datastream.sh" "skip_check_exceptions"
fi
################################################################################
diff --git a/flink-end-to-end-tests/test-scripts/test_pyflink_datastream.sh b/flink-end-to-end-tests/test-scripts/test_pyflink_datastream.sh
new file mode 100755
index 0000000..3b734c3
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_pyflink_datastream.sh
@@ -0,0 +1,179 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+set -Eeuo pipefail
+
+KAFKA_VERSION="2.2.0"
+CONFLUENT_VERSION="5.0.0"
+CONFLUENT_MAJOR_VERSION="5.0"
+KAFKA_SQL_VERSION="universal"
+SQL_JARS_DIR=$END_TO_END_DIR/flink-sql-client-test/target/sql-jars
+KAFKA_SQL_JAR=$(find "$SQL_JARS_DIR" | grep "kafka_" )
+
+function create_data_stream_kafka_source {
+ topicName="test-python-data-stream-source"
+ create_kafka_topic 1 1 $topicName
+
+ echo "Sending messages to Kafka..."
+
+ send_messages_to_kafka '{"f0": "a", "f1": 1}' $topicName
+ send_messages_to_kafka '{"f0": "ab", "f1": 2}' $topicName
+ send_messages_to_kafka '{"f0": "abc", "f1": 3}' $topicName
+ send_messages_to_kafka '{"f0": "abcd", "f1": 4}' $topicName
+ send_messages_to_kafka '{"f0": "abcde", "f1": 5}' $topicName
+}
+
+function sort_msg {
+    # Echo the non-empty lines of $1 in sorted order, joined by single spaces, so two
+    # message sets can be compared independently of arrival order. Leading/trailing
+    # whitespace of each line is trimmed by `read`; `-r` keeps backslashes (e.g. escaped
+    # JSON quotes) intact. All state is local, so the caller's IFS is never touched.
+    local line
+    local -a lines=()
+    while read -r line; do
+        if [[ -n "${line}" ]]; then
+            lines+=("${line}")
+        fi
+    done < <(sort <<< "$1")
+    echo "${lines[*]:-}"
+}
+
+function test_clean_up {
+ stop_cluster
+ stop_kafka_cluster
+}
+
+source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/kafka_sql_common.sh \
+    "${KAFKA_VERSION}" \
+    "${CONFLUENT_VERSION}" \
+    "${CONFLUENT_MAJOR_VERSION}" \
+    "${KAFKA_SQL_VERSION}"
+
+
+echo "Preparing Flink..."
+
+CURRENT_DIR=`cd "$(dirname "$0")" && pwd -P`
+# NOTE(review): common.sh is already sourced a few lines above; this second source looks
+# redundant — confirm nothing relies on re-sourcing it before removing.
+source "${CURRENT_DIR}"/common.sh
+
+cp -r "${FLINK_DIR}/conf" "${TEST_DATA_DIR}/conf"
+
+echo "taskmanager.memory.task.off-heap.size: 768m" >> "${TEST_DATA_DIR}/conf/flink-conf.yaml"
+echo "taskmanager.memory.process.size: 3172m" >> "${TEST_DATA_DIR}/conf/flink-conf.yaml"
+echo "taskmanager.numberOfTaskSlots: 5" >> "${TEST_DATA_DIR}/conf/flink-conf.yaml"
+
+export FLINK_CONF_DIR="${TEST_DATA_DIR}/conf"
+
+FLINK_PYTHON_DIR=`cd "${CURRENT_DIR}/../../flink-python" && pwd -P`
+
+CONDA_HOME="${FLINK_PYTHON_DIR}/dev/.conda"
+
+"${FLINK_PYTHON_DIR}/dev/lint-python.sh" -s basic
+
+PYTHON_EXEC="${CONDA_HOME}/bin/python"
+
+source "${CONDA_HOME}/bin/activate"
+
+cd "${FLINK_PYTHON_DIR}"
+
+rm -rf dist
+
+python setup.py sdist
+
+pip install dist/*
+
+cd dev
+
+conda install -y -q zip=3.0
+
+rm -rf .conda/pkgs
+
+zip -q -r "${TEST_DATA_DIR}/venv.zip" .conda
+
+deactivate
+
+cd "${CURRENT_DIR}"
+
+start_cluster
+
+on_exit test_clean_up
+
+# prepare Kafka
+echo "Preparing Kafka..."
+
+setup_kafka_dist
+
+start_kafka_cluster
+
+create_data_stream_kafka_source
+
+create_kafka_topic 1 1 test-python-data-stream-sink
+
+FLINK_PYTHON_TEST_DIR=`cd "${CURRENT_DIR}/../flink-python-test" && pwd -P`
+REQUIREMENTS_PATH="${TEST_DATA_DIR}/requirements.txt"
+
+# Never used but added to test the '-pyreq' option.
+echo "scipy==1.4.1" > "${REQUIREMENTS_PATH}"
+
+echo "Test PyFlink DataStream job:"
+
+# The client-side Python process should be the conda interpreter prepared above. A bare
+# `VAR=value` line is not exported to child processes, so pass it in the command's env.
+SUBMIT_OUTPUT=$(PYFLINK_CLIENT_EXECUTABLE="${PYTHON_EXEC}" "${FLINK_DIR}/bin/flink" run \
+    -pyfs "${FLINK_PYTHON_TEST_DIR}/python/datastream" \
+    -pyreq "${REQUIREMENTS_PATH}" \
+    -pyarch "${TEST_DATA_DIR}/venv.zip" \
+    -pyexec "venv.zip/.conda/bin/python" \
+    -pym "data_stream_job" \
+    -j "${KAFKA_SQL_JAR}")
+
+echo "${SUBMIT_OUTPUT}"
+# Pick the id out of the "... JobID <id>" submission message instead of taking the last
+# word of every output line, which yields a multi-line id if the client prints more.
+if [[ "${SUBMIT_OUTPUT}" =~ JobID\ ([0-9a-f]+) ]]; then
+    JOB_ID="${BASH_REMATCH[1]}"
+else
+    echo "Could not extract the job id from the submission output."
+    exit 1
+fi
+
+echo "Reading kafka messages..."
+READ_MSG=$(read_messages_from_kafka 20 test-python-data-stream-sink pyflink-e2e-test)
+
+# The job was submitted via env.execute_async() and is not waited on, so cancel it once
+# the results have been read.
+cancel_job "${JOB_ID}"
+
+EXPECTED_MSG='{"f0":"a","f1":0,"f2":1}
+{"f0":"a","f1":1,"f2":1}
+{"f0":"ab","f1":0,"f2":2}
+{"f0":"ab","f1":1,"f2":2}
+{"f0":"ab","f1":2,"f2":2}
+{"f0":"abc","f1":0,"f2":3}
+{"f0":"abc","f1":1,"f2":3}
+{"f0":"abc","f1":2,"f2":3}
+{"f0":"abc","f1":3,"f2":3}
+{"f0":"abcde","f1":0,"f2":5}
+{"f0":"abcde","f1":1,"f2":5}
+{"f0":"abcde","f1":2,"f2":5}
+{"f0":"abcde","f1":3,"f2":5}
+{"f0":"abcde","f1":4,"f2":5}
+{"f0":"abcde","f1":5,"f2":5}
+{"f0":"abcd","f1":0,"f2":4}
+{"f0":"abcd","f1":1,"f2":4}
+{"f0":"abcd","f1":2,"f2":4}
+{"f0":"abcd","f1":3,"f2":4}
+{"f0":"abcd","f1":4,"f2":4}'
+
+# Compare order-independently: the flat-map runs with parallelism 3.
+EXPECTED_MSG=$(sort_msg "${EXPECTED_MSG}")
+SORTED_READ_MSG=$(sort_msg "${READ_MSG}")
+
+if [[ "${EXPECTED_MSG}" != "${SORTED_READ_MSG}" ]]; then
+    echo "Output from Flink program does not match expected output."
+    echo -e "EXPECTED Output: --${EXPECTED_MSG}--"
+    echo -e "ACTUAL: --${SORTED_READ_MSG}--"
+    exit 1
+fi
diff --git a/flink-end-to-end-tests/test-scripts/test_pyflink.sh b/flink-end-to-end-tests/test-scripts/test_pyflink_table.sh
similarity index 100%
rename from flink-end-to-end-tests/test-scripts/test_pyflink.sh
rename to flink-end-to-end-tests/test-scripts/test_pyflink_table.sh