You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/05/15 08:47:03 UTC

[2/3] flink git commit: [FLINK-9074] [e2e] Add e2e test for resuming jobs from retained checkpoints

[FLINK-9074] [e2e] Add e2e test for resuming jobs from retained checkpoints

This closes #5969.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4d640ba9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4d640ba9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4d640ba9

Branch: refs/heads/master
Commit: 4d640ba90402f70300e1a9e8486376f7a174c68a
Parents: 0a65f1e
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Wed May 9 12:17:25 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue May 15 16:19:08 2018 +0800

----------------------------------------------------------------------
 flink-end-to-end-tests/run-nightly-tests.sh     | 24 +++++
 flink-end-to-end-tests/test-scripts/common.sh   | 21 +++++
 .../test_resume_externalized_checkpoints.sh     | 97 ++++++++++++++++++++
 3 files changed, 142 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4d640ba9/flink-end-to-end-tests/run-nightly-tests.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 98d75f03..d00885d 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -130,6 +130,30 @@ fi
 
 if [ $EXIT_CODE == 0 ]; then
   printf "\n==============================================================================\n"
+  printf "Running Resuming Externalized Checkpoint (file, async) end-to-end test\n"
+  printf "==============================================================================\n"
+  STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=true $END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh
+  EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+  printf "\n==============================================================================\n"
+  printf "Running Resuming Externalized Checkpoint (file, sync) end-to-end test\n"
+  printf "==============================================================================\n"
+  STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=false $END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh
+  EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+  printf "\n==============================================================================\n"
+  printf "Running Resuming Externalized Checkpoint (rocks) end-to-end test\n"
+  printf "==============================================================================\n"
+  STATE_BACKEND_TYPE=rocks $END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh
+  EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+  printf "\n==============================================================================\n"
   printf "Running DataSet allround nightly end-to-end test\n"
   printf "==============================================================================\n"
   $END_TO_END_DIR/test-scripts/test_batch_allround.sh

http://git-wip-us.apache.org/repos/asf/flink/blob/4d640ba9/flink-end-to-end-tests/test-scripts/common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index 3fd38ed..0371f81 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -380,6 +380,27 @@ function wait_oper_metric_num_in_records {
     done
 }
 
+# Polls the standalonesession log until job $1 reports a completed checkpoint with id >= $2.
+function wait_num_checkpoints {
+    JOB=$1
+    NUM_CHECKPOINTS=$2
+
+    echo "Waiting for job ($JOB) to have at least $NUM_CHECKPOINTS completed checkpoints ..."
+
+    while : ; do
+      # [0-9][0-9]* instead of [1-9]*: ids containing a zero (10, 20, 101, ...) must match as well
+      N=$(grep -oh "Completed checkpoint [0-9][0-9]* for job $JOB" "$FLINK_DIR"/log/*standalonesession*.log | awk '{print $3}' | tail -1)
+
+      # nothing logged yet counts as zero completed checkpoints
+      N=${N:-0}
+
+      if (( N >= NUM_CHECKPOINTS )); then
+        break
+      fi
+      sleep 1
+    done
+}
+
 # make sure to clean up even in case of failures
 function cleanup {
   stop_cluster

http://git-wip-us.apache.org/repos/asf/flink/blob/4d640ba9/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
new file mode 100755
index 0000000..69a5851
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
@@ -0,0 +1,97 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+# backend selection comes from the environment; defaults are "file" and async "true"
+STATE_BACKEND_TYPE=${STATE_BACKEND_TYPE:-file}
+STATE_BACKEND_FILE_ASYNC=${STATE_BACKEND_FILE_ASYNC:-true}
+
+setup_flink_slf4j_metric_reporter
+start_cluster
+
+function test_cleanup {
+  # don't call ourselves again for another signal interruption
+  trap "exit -1" INT
+  # don't call ourselves again for normal exit
+  trap "" EXIT
+
+  rollback_flink_slf4j_metric_reporter
+
+  # make sure to run regular cleanup as well
+  cleanup
+}
+trap test_cleanup INT
+trap test_cleanup EXIT
+
+CHECKPOINT_DIR="$TEST_DATA_DIR/externalized-chckpt-e2e-backend-dir"
+CHECKPOINT_DIR_URI="file://$CHECKPOINT_DIR"
+
+# run the DataStream allroundjob
+TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
+DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d $TEST_PROGRAM_JAR \
+  --test.semantics exactly-once \
+  --environment.externalize_checkpoint true \
+  --environment.externalize_checkpoint.cleanup retain \
+  --state_backend $STATE_BACKEND_TYPE \
+  --state_backend.checkpoint_directory $CHECKPOINT_DIR_URI \
+  --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
+  --sequence_generator_source.sleep_time 15 \
+  --sequence_generator_source.sleep_after_elements 1 \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $DATASTREAM_JOB
+
+wait_num_checkpoints $DATASTREAM_JOB 1
+wait_oper_metric_num_in_records ArtificalKeyedStateMapper.0 200
+
+cancel_job $DATASTREAM_JOB
+
+# retained checkpoint directories of the cancelled job, one path per line
+CHECKPOINT_PATH=$(ls -d "$CHECKPOINT_DIR/$DATASTREAM_JOB"/chk-[1-9]*)
+if [ -z "$CHECKPOINT_PATH" ]; then
+  echo "Expected an externalized checkpoint to be present, but none exists."
+  PASS=""
+  exit 1
+fi
+# the expansion must stay quoted: unquoted, echo joins all paths on one line and wc -l always yields 1
+NUM_CHECKPOINTS=$(echo "$CHECKPOINT_PATH" | wc -l | tr -d ' ')
+if (( NUM_CHECKPOINTS > 1 )); then
+  echo "Expected only exactly 1 externalized checkpoint to be present, but $NUM_CHECKPOINTS exists."
+  PASS=""
+  exit 1
+fi
+
+echo "Restoring job with externalized checkpoint at $CHECKPOINT_PATH ..."
+# resubmit the same job with -s pointing at the retained checkpoint; expansions quoted against word splitting
+DATASTREAM_JOB=$("$FLINK_DIR"/bin/flink run -s "$CHECKPOINT_PATH" -d "$TEST_PROGRAM_JAR" \
+  --test.semantics exactly-once \
+  --environment.externalize_checkpoint true \
+  --environment.externalize_checkpoint.cleanup retain \
+  --state_backend "$STATE_BACKEND_TYPE" \
+  --state_backend.checkpoint_directory "$CHECKPOINT_DIR_URI" \
+  --state_backend.file.async "$STATE_BACKEND_FILE_ASYNC" \
+  --sequence_generator_source.sleep_time 15 \
+  --sequence_generator_source.sleep_after_elements 1 \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running "$DATASTREAM_JOB"
+wait_oper_metric_num_in_records ArtificalKeyedStateMapper.0 200
+
+# if state is erroneous and the general purpose DataStream job produces alerting messages,
+# output would be non-empty and the test will not pass