Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/11/02 08:33:00 UTC

[jira] [Commented] (FLINK-10632) Run general purpose test job with failures in per-job mode

    [ https://issues.apache.org/jira/browse/FLINK-10632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672761#comment-16672761 ] 

ASF GitHub Bot commented on FLINK-10632:
----------------------------------------

dawidwys closed pull request #6943: [FLINK-10632][e2e] Running general purpose testing job with failure in per-job mode
URL: https://github.com/apache/flink/pull/6943

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index bbb0e9e5885..6a6c8b6729b 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -61,6 +61,11 @@ run_test "Running HA (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts
 run_test "Running HA (rocks, non-incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha_datastream.sh rocks true false" "skip_check_exceptions"
 run_test "Running HA (rocks, incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha_datastream.sh rocks true true" "skip_check_exceptions"
 
+run_test "Running HA per-job cluster (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha_per_job_cluster_datastream.sh file true false" "skip_check_exceptions"
+run_test "Running HA per-job cluster (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha_per_job_cluster_datastream.sh file false false" "skip_check_exceptions"
+run_test "Running HA per-job cluster (rocks, non-incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha_per_job_cluster_datastream.sh rocks true false" "skip_check_exceptions"
+run_test "Running HA per-job cluster (rocks, incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha_per_job_cluster_datastream.sh rocks true true" "skip_check_exceptions"
+
 run_test "Resuming Savepoint (file, async, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 file true"
 run_test "Resuming Savepoint (file, sync, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 file false"
 run_test "Resuming Savepoint (file, async, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 file true"
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index e815a85afdd..09c42af8742 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -33,6 +33,7 @@ case "$(uname -s)" in
 esac
 
 export EXIT_CODE=0
+export TASK_SLOTS_PER_TM_HA=4
 
 echo "Flink dist directory: $FLINK_DIR"
 
@@ -123,7 +124,7 @@ function create_ha_config() {
     jobmanager.rpc.port: 6123
     jobmanager.heap.mb: 1024
     taskmanager.heap.mb: 1024
-    taskmanager.numberOfTaskSlots: 4
+    taskmanager.numberOfTaskSlots: ${TASK_SLOTS_PER_TM_HA}
 
     #==============================================================================
     # High Availability
@@ -238,9 +239,7 @@ function start_local_zk {
     done < <(grep "^server\." "${FLINK_DIR}/conf/zoo.cfg")
 }
 
-function start_cluster {
-  "$FLINK_DIR"/bin/start-cluster.sh
-
+function wait_dispatcher_running {
   # wait at most 10 seconds until the dispatcher is up
   local QUERY_URL
   if [ "x$USE_SSL" = "xON" ]; then
@@ -264,6 +263,11 @@ function start_cluster {
   done
 }
 
+function start_cluster {
+  "$FLINK_DIR"/bin/start-cluster.sh
+  wait_dispatcher_running
+}
+
 function start_taskmanagers {
     tmnum=$1
     echo "Start ${tmnum} more task managers"
@@ -599,6 +603,33 @@ function wait_oper_metric_num_in_records {
     done
 }
 
+function wait_num_of_occurence_in_logs {
+    local text=$1
+    local number=$2
+    local logs
+    if [ -z "$3" ]; then
+        logs="standalonesession"
+    else
+        logs="$3"
+    fi
+
+    echo "Waiting for text ${text} to appear ${number} of times in logs..."
+
+    while : ; do
+      N=$(grep -o "${text}" $FLINK_DIR/log/*${logs}*.log | wc -l)
+
+      if [ -z $N ]; then
+        N=0
+      fi
+
+      if (( N < number )); then
+        sleep 1
+      else
+        break
+      fi
+    done
+}
+
 function wait_num_checkpoints {
     JOB=$1
     NUM_CHECKPOINTS=$2
diff --git a/flink-end-to-end-tests/test-scripts/common_ha.sh b/flink-end-to-end-tests/test-scripts/common_ha.sh
index f476bac5e66..6521cb40127 100644
--- a/flink-end-to-end-tests/test-scripts/common_ha.sh
+++ b/flink-end-to-end-tests/test-scripts/common_ha.sh
@@ -24,7 +24,7 @@ CLEARED=0
 JM_WATCHDOG_PID=0
 TM_WATCHDOG_PID=0
 
-function stop_cluster_and_watchdog() {
+function stop_watchdogs() {
     if [ ${CLEARED} -eq 0 ]; then
 
         if ! [ ${JM_WATCHDOG_PID} -eq 0 ]; then
@@ -43,6 +43,15 @@ function stop_cluster_and_watchdog() {
     fi
 }
 
+function verify_num_occurences_in_logs() {
+    local log_pattern="$1"
+    local text="$2"
+    local expected_no="$3"
+
+    local actual_no=$(grep -r --include "*${log_pattern}*.log" -e "${text}" "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l)
+    [[ "${expected_no}" -eq "${actual_no}" ]]
+}
+
 function verify_logs() {
     local OUTPUT=$FLINK_DIR/log/*.out
     local JM_FAILURES=$1
@@ -56,14 +65,14 @@ function verify_logs() {
     fi
 
     # checks that all apart from the first JM recover the failed jobgraph.
-    if ! [ `grep -r --include '*standalonesession*.log' 'Recovered SubmittedJobGraph' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq ${JM_FAILURES} ]; then
+    if ! verify_num_occurences_in_logs 'standalonesession' 'Recovered SubmittedJobGraph' ${JM_FAILURES}; then
         echo "FAILURE: A JM did not take over."
         EXIT_CODE=1
     fi
 
     if [ "$VERIFY_CHECKPOINTS" = true ]; then
     # search the logs for JMs that log completed checkpoints
-        if ! [ `grep -r --include '*standalonesession*.log' 'Completed checkpoint' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq $((JM_FAILURES + 1)) ]; then
+        if ! verify_num_occurences_in_logs 'standalonesession' 'Completed checkpoint' $((JM_FAILURES + 1)); then
             echo "FAILURE: A JM did not execute the job."
             EXIT_CODE=1
         fi
@@ -77,26 +86,39 @@ function verify_logs() {
 
 function jm_watchdog() {
     local EXPECTED_JMS=$1
-    local IP_PORT=$2
+    local PROCESS_NAME=$2
 
     while true; do
-        local RUNNING_JMS=`jps | grep 'StandaloneSessionClusterEntrypoint' | wc -l`;
+        local RUNNING_JMS=`jps | grep "${PROCESS_NAME}" | wc -l`;
         local MISSING_JMS=$((EXPECTED_JMS-RUNNING_JMS))
         for (( c=0; c<MISSING_JMS; c++ )); do
-            "$FLINK_DIR"/bin/jobmanager.sh start "localhost" ${IP_PORT}
+            ${@:3}
         done
         sleep 1;
     done
 }
 
+function start_jm_cmd {
+    local IP_PORT=$1
+    "$FLINK_DIR/bin/jobmanager.sh" "start" "localhost" "${IP_PORT}"
+}
+
+#######################################
+# Start watchdog for JM process
+
+# Arguments:
+#   $1: expected number of jms to run
+#   $2: process name to monitor
+#   $3: command to start new jm
+#######################################
 function start_ha_jm_watchdog() {
-    jm_watchdog $1 $2 &
+    jm_watchdog $1 $2 ${@:3} &
     JM_WATCHDOG_PID=$!
     echo "Running JM watchdog @ ${JM_WATCHDOG_PID}"
 }
 
-function kill_jm {
-    local JM_PIDS=`jps | grep 'StandaloneSessionClusterEntrypoint' | cut -d " " -f 1`
+function kill_single {
+    local JM_PIDS=`jps | grep "$1" | cut -d " " -f 1`
     local JM_PIDS=(${JM_PIDS[@]})
     local PID=${JM_PIDS[0]}
     kill -9 ${PID}
diff --git a/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh b/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh
index 716b80cc775..979c1ecca0c 100755
--- a/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh
+++ b/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh
@@ -22,6 +22,19 @@ source "$(dirname "$0")"/common_ha.sh
 
 TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar
 
+function ha_cleanup() {
+  # don't call ourselves again for another signal interruption
+  trap "exit -1" INT
+  # don't call ourselves again for normal exit
+  trap "" EXIT
+
+  # kill the cluster and zookeeper
+  stop_watchdogs
+}
+
+trap ha_cleanup INT
+trap ha_cleanup EXIT
+
 function run_ha_test() {
     local PARALLELISM=$1
 
@@ -47,12 +60,12 @@ function run_ha_test() {
     wait_job_running ${JOB_ID}
 
     # start the watchdog that keeps the number of JMs stable
-    start_ha_jm_watchdog 1 "8081"
+    start_ha_jm_watchdog 1 "StandaloneSessionClusterEntrypoint" start_jm_cmd "8081"
 
     for (( c=0; c<${JM_KILLS}; c++ )); do
         # kill the JM and wait for watchdog to
         # create a new one which will take over
-        kill_jm
+        kill_single 'StandaloneSessionClusterEntrypoint'
         wait_job_running ${JOB_ID}
     done
 
@@ -60,12 +73,6 @@ function run_ha_test() {
 
     # do not verify checkpoints in the logs
     verify_logs ${JM_KILLS} false
-
-    # kill the cluster and zookeeper
-    stop_cluster_and_watchdog
 }
 
-trap stop_cluster_and_watchdog INT
-trap stop_cluster_and_watchdog EXIT
-
 run_ha_test 4
diff --git a/flink-end-to-end-tests/test-scripts/test_ha_datastream.sh b/flink-end-to-end-tests/test-scripts/test_ha_datastream.sh
index 862055e2173..dc386fe3316 100755
--- a/flink-end-to-end-tests/test-scripts/test_ha_datastream.sh
+++ b/flink-end-to-end-tests/test-scripts/test_ha_datastream.sh
@@ -22,6 +22,19 @@ source "$(dirname "$0")"/common_ha.sh
 
 TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
 
+function ha_cleanup() {
+  # don't call ourselves again for another signal interruption
+  trap "exit -1" INT
+  # don't call ourselves again for normal exit
+  trap "" EXIT
+
+  # kill the cluster and zookeeper
+  stop_watchdogs
+}
+
+trap ha_cleanup INT
+trap ha_cleanup EXIT
+
 function run_ha_test() {
     local PARALLELISM=$1
     local BACKEND=$2
@@ -34,7 +47,12 @@ function run_ha_test() {
     CLEARED=0
 
     # start the cluster on HA mode
-    start_ha_cluster
+    create_ha_config
+    # change the pid dir to start log files always from 0, this is important for checks in the
+    # jm killing loop
+    set_conf "env.pid.dir" "${TEST_DATA_DIR}"
+    start_local_zk
+    start_cluster
 
     echo "Running on HA mode: parallelism=${PARALLELISM}, backend=${BACKEND}, asyncSnapshots=${ASYNC}, and incremSnapshots=${INCREM}."
 
@@ -58,7 +76,7 @@ function run_ha_test() {
     wait_job_running ${JOB_ID}
 
     # start the watchdog that keeps the number of JMs stable
-    start_ha_jm_watchdog 1 "8081"
+    start_ha_jm_watchdog 1 "StandaloneSessionClusterEntrypoint" start_jm_cmd "8081"
 
     sleep 5
 
@@ -66,26 +84,20 @@ function run_ha_test() {
     start_ha_tm_watchdog ${JOB_ID} 1
 
     # let the job run for a while to take some checkpoints
-    sleep 20
+    wait_num_of_occurence_in_logs "Completed checkpoint [1-9]* for job ${JOB_ID}" 2 "standalonesession"
 
-    for (( c=0; c<${JM_KILLS}; c++ )); do
+    for (( c=1; c<=${JM_KILLS}; c++ )); do
         # kill the JM and wait for watchdog to
         # create a new one which will take over
-        kill_jm
+        kill_single 'StandaloneSessionClusterEntrypoint'
         # let the job start and take some checkpoints
-        sleep 60
+        wait_num_of_occurence_in_logs "Completed checkpoint [1-9]* for job ${JOB_ID}" 2 "standalonesession-${c}"
     done
 
     # verify checkpoints in the logs
     verify_logs ${JM_KILLS} true
-
-    # kill the cluster and zookeeper
-    stop_cluster_and_watchdog
 }
 
-trap stop_cluster_and_watchdog INT
-trap stop_cluster_and_watchdog EXIT
-
 STATE_BACKEND_TYPE=${1:-file}
 STATE_BACKEND_FILE_ASYNC=${2:-true}
 STATE_BACKEND_ROCKS_INCREMENTAL=${3:-false}
diff --git a/flink-end-to-end-tests/test-scripts/test_ha_per_job_cluster_datastream.sh b/flink-end-to-end-tests/test-scripts/test_ha_per_job_cluster_datastream.sh
new file mode 100755
index 00000000000..61dd8bcf050
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_ha_per_job_cluster_datastream.sh
@@ -0,0 +1,159 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/common_ha.sh
+
+TEST_PROGRAM_JAR_NAME=DataStreamAllroundTestProgram.jar
+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/${TEST_PROGRAM_JAR_NAME}
+FLINK_LIB_DIR=${FLINK_DIR}/lib
+JOB_ID="00000000000000000000000000000000"
+
+function ha_cleanup() {
+  # don't call ourselves again for another signal interruption
+  trap "exit -1" INT
+  # don't call ourselves again for normal exit
+  trap "" EXIT
+
+  stop_watchdogs
+  kill_all 'StandaloneJobClusterEntryPoint'
+  rm ${FLINK_LIB_DIR}/${TEST_PROGRAM_JAR_NAME}
+}
+
+trap ha_cleanup INT
+trap ha_cleanup EXIT
+
+function run_job() {
+    local PARALLELISM=$1
+    local BACKEND=$2
+    local ASYNC=$3
+    local INCREM=$4
+
+    local CHECKPOINT_DIR="${TEST_DATA_DIR}/checkpoints/"
+
+    ${FLINK_DIR}/bin/standalone-job.sh start \
+        --job-classname org.apache.flink.streaming.tests.DataStreamAllroundTestProgram \
+        --environment.parallelism ${PARALLELISM} \
+        --test.semantics exactly-once \
+        --test.simulate_failure true \
+        --test.simulate_failure.num_records 200 \
+        --test.simulate_failure.num_checkpoints 1 \
+        --test.simulate_failure.max_failures 20 \
+        --state_backend ${BACKEND} \
+        --state_backend.checkpoint_directory "file://${CHECKPOINT_DIR}" \
+        --state_backend.file.async ${ASYNC} \
+        --state_backend.rocks.incremental ${INCREM} \
+        --sequence_generator_source.sleep_time 15 \
+        --sequence_generator_source.sleep_after_elements 1
+}
+
+function verify_logs_per_job() {
+    local OUTPUT=$FLINK_DIR/log/*.out
+    local JM_FAILURES=$1
+    local EXIT_CODE=0
+
+    # verify that we have no alerts
+    if ! [ `cat ${OUTPUT} | wc -l` -eq 0 ]; then
+        echo "FAILURE: Alerts found at the general purpose job."
+        EXIT_CODE=1
+    fi
+
+    # checks that all apart from the first JM recover the failed jobgraph.
+    if ! verify_num_occurences_in_logs 'standalonejob' 'Found 0 checkpoints in ZooKeeper' 1; then
+        echo "FAILURE: A JM did not take over, but started new job."
+        EXIT_CODE=1
+    fi
+
+    if ! verify_num_occurences_in_logs 'standalonejob' 'Found [[:digit:]]\+ checkpoints in ZooKeeper' $((JM_FAILURES + 1)); then
+        echo "FAILURE: A JM did not take over."
+        EXIT_CODE=1
+    fi
+
+    # search the logs for JMs that log completed checkpoints
+    if ! verify_num_occurences_in_logs 'standalonejob' 'Completed checkpoint' $((JM_FAILURES + 1)); then
+        echo "FAILURE: A JM did not execute the job."
+        EXIT_CODE=1
+    fi
+
+    if [[ $EXIT_CODE != 0 ]]; then
+        echo "One or more tests FAILED."
+        exit $EXIT_CODE
+    fi
+}
+
+function run_ha_test() {
+    local PARALLELISM=$1
+    local BACKEND=$2
+    local ASYNC=$3
+    local INCREM=$4
+
+    local JM_KILLS=3
+
+    CLEARED=0
+
+    # add job jar to cluster classpath
+    cp ${TEST_PROGRAM_JAR} ${FLINK_LIB_DIR}
+
+    # start the cluster on HA mode
+    create_ha_config
+
+    # change the pid dir to start log files always from 0, this is important for checks in the
+    # jm killing loop
+    set_conf "env.pid.dir" "${TEST_DATA_DIR}"
+
+    start_local_zk
+
+    echo "Running on HA mode: parallelism=${PARALLELISM}, backend=${BACKEND}, asyncSnapshots=${ASYNC}, and incremSnapshots=${INCREM}."
+
+    # submit a job in detached mode and let it run
+    run_job ${PARALLELISM} ${BACKEND} ${ASYNC} ${INCREM}
+
+    # divide parallelism by slots per tm with rounding up
+    local neededTaskmanagers=$(( (${PARALLELISM} + ${TASK_SLOTS_PER_TM_HA} - 1)  / ${TASK_SLOTS_PER_TM_HA} ))
+    start_taskmanagers ${neededTaskmanagers}
+
+    wait_job_running ${JOB_ID}
+
+    # start the watchdog that keeps the number of JMs stable
+    start_ha_jm_watchdog 1 "StandaloneJobClusterEntryPoint" run_job ${PARALLELISM} ${BACKEND} ${ASYNC} ${INCREM}
+
+    # start the watchdog that keeps the number of TMs stable
+    start_ha_tm_watchdog ${JOB_ID} ${neededTaskmanagers}
+
+    # let the job run for a while to take some checkpoints
+    wait_num_of_occurence_in_logs "Completed checkpoint [1-9]* for job ${JOB_ID}" 2 "standalonejob"
+
+    for (( c=1; c<=${JM_KILLS}; c++ )); do
+        # kill the JM and wait for watchdog to
+        # create a new one which will take over
+        kill_single 'StandaloneJobClusterEntryPoint'
+        # let the job start and take some checkpoints
+        wait_num_of_occurence_in_logs "Completed checkpoint [1-9]* for job ${JOB_ID}" 2 "standalonejob-${c}"
+    done
+
+    # verify checkpoints in the logs
+    verify_logs_per_job ${JM_KILLS}
+}
+
+
+STATE_BACKEND_TYPE=${1:-file}
+STATE_BACKEND_FILE_ASYNC=${2:-true}
+STATE_BACKEND_ROCKS_INCREMENTAL=${3:-false}
+
+run_ha_test 4 ${STATE_BACKEND_TYPE} ${STATE_BACKEND_FILE_ASYNC} ${STATE_BACKEND_ROCKS_INCREMENTAL}
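
For reference, the new test can also be invoked on its own, outside run-nightly-tests.sh. A minimal sketch, assuming a built Flink distribution; the two exported paths below are placeholders, not part of the PR (common.sh reads FLINK_DIR, and the script locates the job jar via END_TO_END_DIR):

    # sketch: run the new per-job HA test standalone (adjust paths to your checkout)
    export FLINK_DIR=/path/to/flink/build-target
    export END_TO_END_DIR=/path/to/flink/flink-end-to-end-tests
    # positional args: state backend, async snapshots, incremental snapshots
    # (defaults: file true false, see the bottom of the script)
    "$END_TO_END_DIR"/test-scripts/test_ha_per_job_cluster_datastream.sh rocks true true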


 



> Run general purpose test job with failures in per-job mode
> ----------------------------------------------------------
>
>                 Key: FLINK-10632
>                 URL: https://issues.apache.org/jira/browse/FLINK-10632
>             Project: Flink
>          Issue Type: Sub-task
>          Components: E2E Tests
>    Affects Versions: 1.7.0
>            Reporter: Till Rohrmann
>            Assignee: Dawid Wysakowicz
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>
> Similar to FLINK-8973, we should add an end-to-end test which runs the general purpose datastream job with failures on a per-job cluster with HA enabled (either directly via the {{StandaloneJobClusterEntrypoint}} or via a Docker image based on this entrypoint).
> We should kill the TMs as well as the cluster entrypoint and verify that the job recovers.
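
Distilled from the merged script above, the heart of the test is the entrypoint-kill loop: a watchdog keeps one StandaloneJobClusterEntryPoint alive, the loop repeatedly kills it, and log checks confirm that each replacement recovers the job from ZooKeeper checkpoints instead of starting it from scratch. A condensed sketch, using the functions defined in the PR above:

    JM_KILLS=3
    for (( c=1; c<=JM_KILLS; c++ )); do
        # kill the per-job cluster entrypoint; the JM watchdog starts a replacement
        kill_single 'StandaloneJobClusterEntryPoint'
        # wait until the recovered JM logs completed checkpoints for the fixed job id
        wait_num_of_occurence_in_logs "Completed checkpoint [1-9]* for job ${JOB_ID}" 2 "standalonejob-${c}"
    done
    # expect one fresh start ("Found 0 checkpoints") plus JM_KILLS takeovers in the logs
    verify_logs_per_job ${JM_KILLS}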


