You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/05/15 08:47:03 UTC
[2/3] flink git commit: [FLINK-9074] [e2e] Add e2e test for resuming
jobs from retained checkpoints
[FLINK-9074] [e2e] Add e2e test for resuming jobs from retained checkpoints
This closes #5969.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4d640ba9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4d640ba9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4d640ba9
Branch: refs/heads/master
Commit: 4d640ba90402f70300e1a9e8486376f7a174c68a
Parents: 0a65f1e
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Wed May 9 12:17:25 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue May 15 16:19:08 2018 +0800
----------------------------------------------------------------------
flink-end-to-end-tests/run-nightly-tests.sh | 24 +++++
flink-end-to-end-tests/test-scripts/common.sh | 21 +++++
.../test_resume_externalized_checkpoints.sh | 97 ++++++++++++++++++++
3 files changed, 142 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4d640ba9/flink-end-to-end-tests/run-nightly-tests.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 98d75f03..d00885d 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -130,6 +130,30 @@ fi
if [ $EXIT_CODE == 0 ]; then
printf "\n==============================================================================\n"
+ printf "Running Resuming Externalized Checkpoint (file, async) end-to-end test\n"
+ printf "==============================================================================\n"
+ STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=true $END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh # STATE_BACKEND_* select the state backend the test script runs with
+ EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then # run only while no earlier test has failed (EXIT_CODE still 0)
+ printf "\n==============================================================================\n"
+ printf "Running Resuming Externalized Checkpoint (file, sync) end-to-end test\n"
+ printf "==============================================================================\n"
+ STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=false $END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh # same test, synchronous file backend snapshots
+ EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then # run only while no earlier test has failed (EXIT_CODE still 0)
+ printf "\n==============================================================================\n"
+ printf "Running Resuming Externalized Checkpoint (rocks) end-to-end test\n"
+ printf "==============================================================================\n"
+ STATE_BACKEND_TYPE=rocks $END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh # same test, RocksDB backend (async flag left at its default)
+ EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+ printf "\n==============================================================================\n"
printf "Running DataSet allround nightly end-to-end test\n"
printf "==============================================================================\n"
$END_TO_END_DIR/test-scripts/test_batch_allround.sh
http://git-wip-us.apache.org/repos/asf/flink/blob/4d640ba9/flink-end-to-end-tests/test-scripts/common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index 3fd38ed..0371f81 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -380,6 +380,27 @@ function wait_oper_metric_num_in_records {
done
}
+function wait_num_checkpoints {
+ JOB=$1
+ NUM_CHECKPOINTS=$2
+
+ echo "Waiting for job ($JOB) to have at least $NUM_CHECKPOINTS completed checkpoints ..."
+
+ while : ; do
+ N=$(grep -o "Completed checkpoint [1-9]* for job $JOB" $FLINK_DIR/log/*standalonesession*.log | awk '{print $3}' | tail -1)
+
+ if [ -z $N ]; then
+ N=0
+ fi
+
+ if (( N < NUM_CHECKPOINTS )); then
+ sleep 1
+ else
+ break
+ fi
+ done
+}
+
# make sure to clean up even in case of failures
function cleanup {
stop_cluster
http://git-wip-us.apache.org/repos/asf/flink/blob/4d640ba9/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
new file mode 100755
index 0000000..69a5851
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
@@ -0,0 +1,97 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")/common.sh"
+# defaults for the state backend under test; callers may override them via the environment
+: "${STATE_BACKEND_TYPE:=file}"
+: "${STATE_BACKEND_FILE_ASYNC:=true}"
+
+setup_flink_slf4j_metric_reporter
+start_cluster
+
+function test_cleanup {
+ # don't call ourselves again for another signal interruption
+ trap "exit -1" INT
+ # don't call ourselves again for normal exit
+ trap "" EXIT
+
+ rollback_flink_slf4j_metric_reporter
+
+ # make sure to run regular cleanup as well
+ cleanup
+}
+trap test_cleanup INT
+trap test_cleanup EXIT
+
+CHECKPOINT_DIR="$TEST_DATA_DIR/externalized-chckpt-e2e-backend-dir"
+CHECKPOINT_DIR_URI="file://$CHECKPOINT_DIR"
+
+# run the DataStream allroundjob
+TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
+DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d $TEST_PROGRAM_JAR \
+ --test.semantics exactly-once \
+ --environment.externalize_checkpoint true \
+ --environment.externalize_checkpoint.cleanup retain \
+ --state_backend $STATE_BACKEND_TYPE \
+ --state_backend.checkpoint_directory $CHECKPOINT_DIR_URI \
+ --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
+ --sequence_generator_source.sleep_time 15 \
+ --sequence_generator_source.sleep_after_elements 1 \
+ | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $DATASTREAM_JOB
+
+wait_num_checkpoints $DATASTREAM_JOB 1
+wait_oper_metric_num_in_records ArtificalKeyedStateMapper.0 200
+
+cancel_job $DATASTREAM_JOB
+
+CHECKPOINT_PATH=$(ls -d $CHECKPOINT_DIR/$DATASTREAM_JOB/chk-[1-9]*)
+
+if [ -z $CHECKPOINT_PATH ]; then
+ echo "Expected an externalized checkpoint to be present, but none exists."
+ PASS=""
+ exit 1
+fi
+
+NUM_CHECKPOINTS=$(echo $CHECKPOINT_PATH | wc -l | tr -d ' ')
+if (( $NUM_CHECKPOINTS > 1 )); then
+ echo "Expected only exactly 1 externalized checkpoint to be present, but $NUM_CHECKPOINTS exists."
+ PASS=""
+ exit 1
+fi
+
+echo "Restoring job with externalized checkpoint at $CHECKPOINT_PATH ..."
+DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $CHECKPOINT_PATH -d $TEST_PROGRAM_JAR \
+ --test.semantics exactly-once \
+ --environment.externalize_checkpoint true \
+ --environment.externalize_checkpoint.cleanup retain \
+ --state_backend $STATE_BACKEND_TYPE \
+ --state_backend.checkpoint_directory $CHECKPOINT_DIR_URI \
+ --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
+ --sequence_generator_source.sleep_time 15 \
+ --sequence_generator_source.sleep_after_elements 1 \
+ | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $DATASTREAM_JOB
+
+wait_oper_metric_num_in_records ArtificalKeyedStateMapper.0 200
+
+# if state is errorneous and the general purpose DataStream job produces alerting messages,
+# output would be non-empty and the test will not pass