You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/06/22 09:21:56 UTC
[3/6] flink git commit: [FLINK-9394] [e2e] Test rescaling when
resuming from externalized checkpoints
[FLINK-9394] [e2e] Test rescaling when resuming from externalized checkpoints
This closes #6038.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d7d68253
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d7d68253
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d7d68253
Branch: refs/heads/release-1.5
Commit: d7d68253f59ab906519ef92e1d463987969e6aa0
Parents: 0448d51
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri May 18 00:33:12 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jun 22 11:20:41 2018 +0200
----------------------------------------------------------------------
flink-end-to-end-tests/run-nightly-tests.sh | 24 ++++++---
.../test_resume_externalized_checkpoints.sh | 56 +++++++++++++++-----
2 files changed, 60 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d7d68253/flink-end-to-end-tests/run-nightly-tests.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index da029f8..5399893 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -58,13 +58,23 @@ run_test "Resuming Savepoint (rocks, no parallelism change) end-to-end test" "$E
run_test "Resuming Savepoint (rocks, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 rocks"
run_test "Resuming Savepoint (rocks, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 rocks"
-run_test "Resuming Externalized Checkpoint (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file true"
-run_test "Resuming Externalized Checkpoint (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file false"
-run_test "Resuming Externalized Checkpoint (rocks) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh rocks"
-
-run_test "Resuming Externalized Checkpoint after terminal failure (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file true true"
-run_test "Resuming Externalized Checkpoint after terminal failure (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file false true"
-run_test "Resuming Externalized Checkpoint after terminal failure (rocks) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh rocks true"
+run_test "Resuming Externalized Checkpoint (file, async, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 file true true"
+run_test "Resuming Externalized Checkpoint (file, sync, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 file false true"
+run_test "Resuming Externalized Checkpoint (file, async, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 4 file true true"
+run_test "Resuming Externalized Checkpoint (file, sync, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 4 file false true"
+run_test "Resuming Externalized Checkpoint (file, async, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 4 2 file true true"
+run_test "Resuming Externalized Checkpoint (file, sync, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 4 2 file false true"
+run_test "Resuming Externalized Checkpoint (rocks, non-incremental, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 rocks true false"
+run_test "Resuming Externalized Checkpoint (rocks, incremental, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 rocks true true"
+run_test "Resuming Externalized Checkpoint (rocks, non-incremental, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 4 rocks true false"
+run_test "Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 4 rocks true true"
+run_test "Resuming Externalized Checkpoint (rocks, non-incremental, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 4 2 rocks true false"
+run_test "Resuming Externalized Checkpoint (rocks, incremental, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 4 2 rocks true true"
+
+run_test "Resuming Externalized Checkpoint after terminal failure (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 file true false true"
+run_test "Resuming Externalized Checkpoint after terminal failure (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 file false false true"
+run_test "Resuming Externalized Checkpoint after terminal failure (rocks, non-incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 rocks true false true"
+run_test "Resuming Externalized Checkpoint after terminal failure (rocks, incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 rocks true true true"
run_test "DataSet allround end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_allround.sh"
run_test "Streaming SQL end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh"
http://git-wip-us.apache.org/repos/asf/flink/blob/d7d68253/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
index 6472b23..29c3786 100755
--- a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
+++ b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
@@ -17,13 +17,28 @@
# limitations under the License.
################################################################################
+if [ -z "$1" ] || [ -z "$2" ]; then
+ echo "Usage: ./test_resume_externalized_checkpoints.sh <original_dop> <new_dop> <state_backend_setting> <state_backend_file_async_setting> <state_backend_rocks_incremental_setting> <simulate_failure>"
+ exit 1
+fi
+
source "$(dirname "$0")"/common.sh
-STATE_BACKEND_TYPE=${1:-file}
-STATE_BACKEND_FILE_ASYNC=${2:-true}
-SIMULATE_FAILURE=${3:-false}
+ORIGINAL_DOP=$1
+NEW_DOP=$2
+STATE_BACKEND_TYPE=${3:-file}
+STATE_BACKEND_FILE_ASYNC=${4:-true}
+STATE_BACKEND_ROCKS_INCREMENTAL=${5:-false}
+SIMULATE_FAILURE=${6:-false}
+
+if (( $ORIGINAL_DOP >= $NEW_DOP )); then
+ NUM_SLOTS=$ORIGINAL_DOP
+else
+ NUM_SLOTS=$NEW_DOP
+fi
backup_config
+change_conf "taskmanager.numberOfTaskSlots" "1" "${NUM_SLOTS}"
setup_flink_slf4j_metric_reporter
start_cluster
@@ -43,18 +58,30 @@ CHECKPOINT_DIR_URI="file://$CHECKPOINT_DIR"
# run the DataStream allroundjob
-echo "Running externalized checkpoints test, with STATE_BACKEND_TYPE=$STATE_BACKEND_TYPE STATE_BACKEND_FILE_ASYNC=$STATE_BACKEND_FILE_ASYNC SIMULATE_FAILURE=$SIMULATE_FAILURE ..."
+echo "Running externalized checkpoints test, \
+with ORIGINAL_DOP=$ORIGINAL_DOP NEW_DOP=$NEW_DOP \
+and STATE_BACKEND_TYPE=$STATE_BACKEND_TYPE STATE_BACKEND_FILE_ASYNC=$STATE_BACKEND_FILE_ASYNC \
+STATE_BACKEND_ROCKS_INCREMENTAL=$STATE_BACKEND_ROCKS_INCREMENTAL SIMULATE_FAILURE=$SIMULATE_FAILURE ..."
TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
-BASE_JOB_CMD="$FLINK_DIR/bin/flink run -d $TEST_PROGRAM_JAR \
- --test.semantics exactly-once \
- --environment.externalize_checkpoint true \
- --environment.externalize_checkpoint.cleanup retain \
- --state_backend $STATE_BACKEND_TYPE \
- --state_backend.checkpoint_directory $CHECKPOINT_DIR_URI \
- --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
- --sequence_generator_source.sleep_time 15 \
- --sequence_generator_source.sleep_after_elements 1"
+
+function buildBaseJobCmd {
+ local dop=$1
+
+ echo "$FLINK_DIR/bin/flink run -d -p $dop $TEST_PROGRAM_JAR \
+ --test.semantics exactly-once \
+ --environment.parallelism $dop \
+ --environment.externalize_checkpoint true \
+ --environment.externalize_checkpoint.cleanup retain \
+ --state_backend $STATE_BACKEND_TYPE \
+ --state_backend.checkpoint_directory $CHECKPOINT_DIR_URI \
+ --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
+ --state_backend.rocks.incremental $STATE_BACKEND_ROCKS_INCREMENTAL \
+ --sequence_generator_source.sleep_time 15 \
+ --sequence_generator_source.sleep_after_elements 1"
+}
+
+BASE_JOB_CMD=$(buildBaseJobCmd "$ORIGINAL_DOP")
JOB_CMD=""
if [[ $SIMULATE_FAILURE == "true" ]]; then
@@ -98,6 +125,9 @@ if (( $NUM_CHECKPOINTS > 1 )); then
fi
echo "Restoring job with externalized checkpoint at $CHECKPOINT_PATH ..."
+
+BASE_JOB_CMD=$(buildBaseJobCmd "$NEW_DOP")
+
DATASTREAM_JOB=$($BASE_JOB_CMD | grep "Job has been submitted with JobID" | sed 's/.* //g')
wait_job_running $DATASTREAM_JOB