You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/05/22 08:43:38 UTC

[02/17] flink git commit: [FLINK-8977] [e2e] Extend externalized checkpoint e2e to simulate job failures

[FLINK-8977] [e2e] Extend externalized checkpoint e2e to simulate job failures

This closes #6004.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/97a66388
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/97a66388
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/97a66388

Branch: refs/heads/master
Commit: 97a66388522db25ef03655c25311a05f92204d35
Parents: c8187e8
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon May 14 11:56:41 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue May 22 15:50:54 2018 +0800

----------------------------------------------------------------------
 flink-end-to-end-tests/run-nightly-tests.sh     | 21 +++++++--
 flink-end-to-end-tests/test-scripts/common.sh   | 17 +++++++
 .../test_resume_externalized_checkpoints.sh     | 48 +++++++++++++-------
 3 files changed, 66 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/97a66388/flink-end-to-end-tests/run-nightly-tests.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 4cfd778..2898682 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -113,17 +113,32 @@ if [ $EXIT_CODE == 0 ]; then
 fi
 
 if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Externalized Checkpoint (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file true"
+  run_test "Resuming Externalized Checkpoint (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file true false"
   EXIT_CODE=$?
 fi
 
 if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Externalized Checkpoint (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file false"
+  run_test "Resuming Externalized Checkpoint (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file false false"
   EXIT_CODE=$?
 fi
 
 if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Externalized Checkpoint (rocks) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh rocks"
+  run_test "Resuming Externalized Checkpoint (rocks) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh rocks false false"
+  EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+  run_test "Resuming Externalized Checkpoint after terminal failure (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file true true"
+  EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+  run_test "Resuming Externalized Checkpoint after terminal failure (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file false true"
+  EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+  run_test "Resuming Externalized Checkpoint after terminal failure (rocks) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh rocks false true"
   EXIT_CODE=$?
 fi
 

http://git-wip-us.apache.org/repos/asf/flink/blob/97a66388/flink-end-to-end-tests/test-scripts/common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index 94e179e..18833cc 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -239,6 +239,23 @@ function wait_job_running {
   done
 }
 
+# Blocks until the given job reaches the given globally terminal state
+# (e.g. FAILED), as reported in the standalonesession log(s).
+function wait_job_terminal_state {
+  local job=$1
+  local terminal_state=$2
+
+  echo "Waiting for job ($job) to reach terminal state $terminal_state ..."
+
+  # Poll once per second. A missing/unreadable log makes grep fail exactly
+  # like "no match yet", so we simply keep waiting in that case.
+  # NOTE(review): there is no timeout; a job that never reaches the state hangs.
+  until grep -q "Job $job reached globally terminal state $terminal_state" \
+      "$FLINK_DIR"/log/*standalonesession*.log; do
+    sleep 1
+  done
+}
+
 function take_savepoint {
   "$FLINK_DIR"/bin/flink savepoint $1 $2
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/97a66388/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
index 3dc9909..3994e30 100755
--- a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
+++ b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
@@ -21,6 +21,7 @@ source "$(dirname "$0")"/common.sh
 
 STATE_BACKEND_TYPE=${1:-file}
 STATE_BACKEND_FILE_ASYNC=${2:-true}
+SIMULATE_FAILURE=${3:-false}
 
 setup_flink_slf4j_metric_reporter
 start_cluster
@@ -43,8 +44,11 @@ CHECKPOINT_DIR="$TEST_DATA_DIR/externalized-chckpt-e2e-backend-dir"
 CHECKPOINT_DIR_URI="file://$CHECKPOINT_DIR"
 
 # run the DataStream allroundjob
+
+echo "Running externalized checkpoints test, with STATE_BACKEND_TYPE=$STATE_BACKEND_TYPE STATE_BACKEND_FILE_ASYNC=$STATE_BACKEND_FILE_ASYNC SIMULATE_FAILURE=$SIMULATE_FAILURE ..."
+
 TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
-DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d $TEST_PROGRAM_JAR \
+BASE_JOB_CMD="$FLINK_DIR/bin/flink run -d $TEST_PROGRAM_JAR \
   --test.semantics exactly-once \
   --environment.externalize_checkpoint true \
   --environment.externalize_checkpoint.cleanup retain \
@@ -52,15 +56,35 @@ DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d $TEST_PROGRAM_JAR \
   --state_backend.checkpoint_directory $CHECKPOINT_DIR_URI \
   --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
   --sequence_generator_source.sleep_time 15 \
-  --sequence_generator_source.sleep_after_elements 1 \
-  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+  --sequence_generator_source.sleep_after_elements 1"
+# JOB_CMD = first submission; the restore submission below is also derived from BASE_JOB_CMD.
+JOB_CMD=""
+if [[ $SIMULATE_FAILURE == "true" ]]; then
+  # The submitted job should fail after at least 1 completed checkpoint.
+  # Restarts are disabled (no_restart) so the job ends up terminally FAILED
+  # and we can resume it manually from the retained externalized checkpoint.
+  JOB_CMD="$BASE_JOB_CMD \
+    --test.simulate_failure true \
+    --test.simulate_failure.num_records 200 \
+    --test.simulate_failure.num_checkpoints 1 \
+    --test.simulate_failure.max_failures 1 \
+    --environment.restart_strategy no_restart"
+else
+  JOB_CMD=$BASE_JOB_CMD
+fi
+
+DATASTREAM_JOB=$($JOB_CMD | grep "Job has been submitted with JobID" | sed 's/.* //g')
 
 wait_job_running $DATASTREAM_JOB
 
-wait_num_checkpoints $DATASTREAM_JOB 1
-wait_oper_metric_num_in_records ArtificalKeyedStateMapper.0 200
+if [[ $SIMULATE_FAILURE == "true" ]]; then
+  wait_job_terminal_state $DATASTREAM_JOB FAILED
+else
+  wait_num_checkpoints $DATASTREAM_JOB 1
+  wait_oper_metric_num_in_records ArtificalKeyedStateMapper.0 200
 
-cancel_job $DATASTREAM_JOB
+  cancel_job $DATASTREAM_JOB
+fi
 
 CHECKPOINT_PATH=$(ls -d $CHECKPOINT_DIR/$DATASTREAM_JOB/chk-[1-9]*)
 
@@ -78,19 +102,9 @@ if (( $NUM_CHECKPOINTS > 1 )); then
 fi
 
 echo "Restoring job with externalized checkpoint at $CHECKPOINT_PATH ..."
-DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $CHECKPOINT_PATH -d $TEST_PROGRAM_JAR \
-  --test.semantics exactly-once \
-  --environment.externalize_checkpoint true \
-  --environment.externalize_checkpoint.cleanup retain \
-  --state_backend $STATE_BACKEND_TYPE \
-  --state_backend.checkpoint_directory $CHECKPOINT_DIR_URI \
-  --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
-  --sequence_generator_source.sleep_time 15 \
-  --sequence_generator_source.sleep_after_elements 1 \
-  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+DATASTREAM_JOB=$(${BASE_JOB_CMD/ run -d / run -s $CHECKPOINT_PATH -d } | grep "Job has been submitted with JobID" | sed 's/.* //g')
 
 wait_job_running $DATASTREAM_JOB
-
 wait_oper_metric_num_in_records ArtificalKeyedStateMapper.0 200
 
 # if state is errorneous and the general purpose DataStream job produces alerting messages,