You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by di...@apache.org on 2020/06/10 11:28:21 UTC
[flink] branch master updated: [FLINK-18218][python][e2e] Add PyFlink YARN per-job e2e tests
This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 70bfb61 [FLINK-18218][python][e2e] Add PyFlink YARN per-job e2e tests
70bfb61 is described below
commit 70bfb61a48b1d1f5afce0ebc0f58a786e8013e29
Author: Wei Zhong <we...@gmail.com>
AuthorDate: Tue Jun 9 21:02:46 2020 +0800
[FLINK-18218][python][e2e] Add PyFlink YARN per-job e2e tests
This closes #12554.
---
.../flink-python-test/python/python_job.py | 13 +++--
.../org/apache/flink/python/tests/util/AddOne.java | 29 +++++++++++
.../test-scripts/test_pyflink.sh | 58 +++++++++++++++++++++-
flink-python/dev/lint-python.sh | 4 +-
4 files changed, 97 insertions(+), 7 deletions(-)
diff --git a/flink-end-to-end-tests/flink-python-test/python/python_job.py b/flink-end-to-end-tests/flink-python-test/python/python_job.py
index 8df2d12..a85e633 100644
--- a/flink-end-to-end-tests/flink-python-test/python/python_job.py
+++ b/flink-end-to-end-tests/flink-python-test/python/python_job.py
@@ -38,6 +38,11 @@ def word_count():
env = ExecutionEnvironment.get_execution_environment()
t_env = BatchTableEnvironment.create(env, t_config)
+ # used to test pipeline.jars and pipeline.classpaths
+ config_key = sys.argv[1]
+ config_value = sys.argv[2]
+ t_env.get_config().get_configuration().set_string(config_key, config_value)
+
# register Results table in table environment
tmp_dir = tempfile.gettempdir()
result_path = tmp_dir + '/result'
@@ -55,7 +60,8 @@ def word_count():
sink_ddl = """
create table Results(
word VARCHAR,
- `count` BIGINT
+ `count` BIGINT,
+ `count_java` BIGINT
) with (
'connector.type' = 'filesystem',
'format.type' = 'csv',
@@ -65,12 +71,13 @@ def word_count():
t_env.sql_update(sink_ddl)
t_env.sql_update("create temporary system function add_one as 'add_one.add_one' language python")
+ t_env.register_java_function("add_one_java", "org.apache.flink.python.tests.util.AddOne")
elements = [(word, 0) for word in content.split(" ")]
t_env.from_elements(elements, ["word", "count"]) \
- .select("word, add_one(count) as count") \
+ .select("word, add_one(count) as count, add_one_java(count) as count_java") \
.group_by("word") \
- .select("word, count(count) as count") \
+ .select("word, count(count) as count, count(count_java) as count_java") \
.insert_into("Results")
t_env.execute("word_count")
diff --git a/flink-end-to-end-tests/flink-python-test/src/main/java/org/apache/flink/python/tests/util/AddOne.java b/flink-end-to-end-tests/flink-python-test/src/main/java/org/apache/flink/python/tests/util/AddOne.java
new file mode 100644
index 0000000..f0b0cc0
--- /dev/null
+++ b/flink-end-to-end-tests/flink-python-test/src/main/java/org/apache/flink/python/tests/util/AddOne.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python.tests.util;
+
+import org.apache.flink.table.functions.ScalarFunction;
+
+/**
+ * Scalar UDF for testing.
+ */
+public class AddOne extends ScalarFunction {
+ public long eval(long input) {
+ return input + 1;
+ }
+}
diff --git a/flink-end-to-end-tests/test-scripts/test_pyflink.sh b/flink-end-to-end-tests/test-scripts/test_pyflink.sh
index ac521f1..1e58921 100755
--- a/flink-end-to-end-tests/test-scripts/test_pyflink.sh
+++ b/flink-end-to-end-tests/test-scripts/test_pyflink.sh
@@ -20,6 +20,7 @@
set -Eeuo pipefail
CURRENT_DIR=`cd "$(dirname "$0")" && pwd -P`
source "${CURRENT_DIR}"/common.sh
+source "${CURRENT_DIR}"/common_yarn_docker.sh
cp -r "${FLINK_DIR}/conf" "${TEST_DATA_DIR}/conf"
@@ -65,13 +66,23 @@ REQUIREMENTS_PATH="${TEST_DATA_DIR}/requirements.txt"
echo "scipy==1.4.1" > "${REQUIREMENTS_PATH}"
-echo "Test submitting python job:\n"
+echo "Test submitting python job with 'pipeline.jars':\n"
PYFLINK_CLIENT_EXECUTABLE=${PYTHON_EXEC} "${FLINK_DIR}/bin/flink" run \
-pyfs "${FLINK_PYTHON_TEST_DIR}/python/add_one.py" \
-pyreq "${REQUIREMENTS_PATH}" \
-pyarch "${TEST_DATA_DIR}/venv.zip" \
-pyexec "venv.zip/.conda/bin/python" \
- -py "${FLINK_PYTHON_TEST_DIR}/python/python_job.py"
+ -py "${FLINK_PYTHON_TEST_DIR}/python/python_job.py" \
+ pipeline.jars "file://${FLINK_PYTHON_TEST_DIR}/target/PythonUdfSqlJobExample.jar"
+
+echo "Test submitting python job with 'pipeline.classpaths':\n"
+PYFLINK_CLIENT_EXECUTABLE=${PYTHON_EXEC} "${FLINK_DIR}/bin/flink" run \
+ -pyfs "${FLINK_PYTHON_TEST_DIR}/python/add_one.py" \
+ -pyreq "${REQUIREMENTS_PATH}" \
+ -pyarch "${TEST_DATA_DIR}/venv.zip" \
+ -pyexec "venv.zip/.conda/bin/python" \
+ -py "${FLINK_PYTHON_TEST_DIR}/python/python_job.py" \
+ pipeline.classpaths "file://${FLINK_PYTHON_TEST_DIR}/target/PythonUdfSqlJobExample.jar"
echo "Test blink stream python udf sql job:\n"
PYFLINK_CLIENT_EXECUTABLE=${PYTHON_EXEC} "${FLINK_DIR}/bin/flink" run \
@@ -150,3 +161,46 @@ JOB_ID=$($FLINK_DIR/bin/sql-client.sh embedded \
wait_job_terminal_state "$JOB_ID" "FINISHED"
stop_cluster
+
+# test submitting on yarn
+start_hadoop_cluster_and_prepare_flink
+
+# copy test files
+docker cp "${FLINK_PYTHON_DIR}/dev/lint-python.sh" master:/tmp/
+docker cp "${FLINK_PYTHON_TEST_DIR}/target/PythonUdfSqlJobExample.jar" master:/tmp/
+docker cp "${FLINK_PYTHON_TEST_DIR}/python/add_one.py" master:/tmp/
+docker cp "${REQUIREMENTS_PATH}" master:/tmp/
+docker cp "${FLINK_PYTHON_TEST_DIR}/python/python_job.py" master:/tmp/
+PYFLINK_PACKAGE_FILE=$(basename "${FLINK_PYTHON_DIR}"/dist/apache-flink-*.tar.gz)
+docker cp "${FLINK_PYTHON_DIR}/dist/${PYFLINK_PACKAGE_FILE}" master:/tmp/
+
+# prepare environment
+docker exec master bash -c "
+/tmp/lint-python.sh -s miniconda
+source /tmp/.conda/bin/activate
+pip install /tmp/${PYFLINK_PACKAGE_FILE}
+conda install -y -q zip=3.0
+rm -rf /tmp/.conda/pkgs
+cd /tmp
+zip -q -r /tmp/venv.zip .conda
+echo \"taskmanager.memory.task.off-heap.size: 100m\" >> \"/home/hadoop-user/$FLINK_DIRNAME/conf/flink-conf.yaml\"
+"
+
+docker exec master bash -c "export HADOOP_CLASSPATH=\`hadoop classpath\` && \
+ export PYFLINK_CLIENT_EXECUTABLE=/tmp/.conda/bin/python && \
+ /home/hadoop-user/$FLINK_DIRNAME/bin/flink run -m yarn-cluster -ytm 1500 -yjm 1000 \
+ -pyfs /tmp/add_one.py \
+ -pyreq /tmp/requirements.txt \
+ -pyarch /tmp/venv.zip \
+ -pyexec venv.zip/.conda/bin/python \
+ /tmp/PythonUdfSqlJobExample.jar"
+
+docker exec master bash -c "export HADOOP_CLASSPATH=\`hadoop classpath\` && \
+ export PYFLINK_CLIENT_EXECUTABLE=/tmp/.conda/bin/python && \
+ /home/hadoop-user/$FLINK_DIRNAME/bin/flink run -m yarn-cluster -ytm 1500 -yjm 1000 \
+ -pyfs /tmp/add_one.py \
+ -pyreq /tmp/requirements.txt \
+ -pyarch /tmp/venv.zip \
+ -pyexec venv.zip/.conda/bin/python \
+ -py /tmp/python_job.py \
+ pipeline.jars file:/tmp/PythonUdfSqlJobExample.jar"
diff --git a/flink-python/dev/lint-python.sh b/flink-python/dev/lint-python.sh
index 1ad2a19..cb65ff8 100755
--- a/flink-python/dev/lint-python.sh
+++ b/flink-python/dev/lint-python.sh
@@ -28,10 +28,10 @@ function download() {
local DOWNLOAD_STATUS=
if hash "wget" 2>/dev/null; then
# because of the difference of all versions of wget, so we turn of the option --show-progress
- wget "$1" -O "$2" -q
+ wget "$1" -O "$2" -q -T20 -t3
DOWNLOAD_STATUS="$?"
else
- curl "$1" -o "$2" --progress-bar
+ curl "$1" -o "$2" --progress-bar --connect-timeout 20 --retry 3
DOWNLOAD_STATUS="$?"
fi
if [ $DOWNLOAD_STATUS -ne 0 ]; then