You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/06/22 09:21:56 UTC

[3/6] flink git commit: [FLINK-9394] [e2e] Test rescaling when resuming from externalized checkpoints

[FLINK-9394] [e2e] Test rescaling when resuming from externalized checkpoints

This closes #6038.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d7d68253
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d7d68253
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d7d68253

Branch: refs/heads/release-1.5
Commit: d7d68253f59ab906519ef92e1d463987969e6aa0
Parents: 0448d51
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri May 18 00:33:12 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jun 22 11:20:41 2018 +0200

----------------------------------------------------------------------
 flink-end-to-end-tests/run-nightly-tests.sh     | 24 ++++++---
 .../test_resume_externalized_checkpoints.sh     | 56 +++++++++++++++-----
 2 files changed, 60 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d7d68253/flink-end-to-end-tests/run-nightly-tests.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index da029f8..5399893 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -58,13 +58,23 @@ run_test "Resuming Savepoint (rocks, no parallelism change) end-to-end test" "$E
 run_test "Resuming Savepoint (rocks, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 rocks"
 run_test "Resuming Savepoint (rocks, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 rocks"
 
-run_test "Resuming Externalized Checkpoint (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file true"
-run_test "Resuming Externalized Checkpoint (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file false"
-run_test "Resuming Externalized Checkpoint (rocks) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh rocks"
-
-run_test "Resuming Externalized Checkpoint after terminal failure (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file true true"
-run_test "Resuming Externalized Checkpoint after terminal failure (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file false true"
-run_test "Resuming Externalized Checkpoint after terminal failure (rocks) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh rocks true"
+run_test "Resuming Externalized Checkpoint (file, async, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 file true true"
+run_test "Resuming Externalized Checkpoint (file, sync, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 file false true"
+run_test "Resuming Externalized Checkpoint (file, async, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 4 file true true"
+run_test "Resuming Externalized Checkpoint (file, sync, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 4 file false true"
+run_test "Resuming Externalized Checkpoint (file, async, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 4 2 file true true"
+run_test "Resuming Externalized Checkpoint (file, sync, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 4 2 file false true"
+run_test "Resuming Externalized Checkpoint (rocks, non-incremental, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 rocks true false"
+run_test "Resuming Externalized Checkpoint (rocks, incremental, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 rocks true true"
+run_test "Resuming Externalized Checkpoint (rocks, non-incremental, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 4 rocks true false"
+run_test "Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 4 rocks true true"
+run_test "Resuming Externalized Checkpoint (rocks, non-incremental, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 4 2 rocks true false"
+run_test "Resuming Externalized Checkpoint (rocks, incremental, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 4 2 rocks true true"
+
+run_test "Resuming Externalized Checkpoint after terminal failure (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 file true false true"
+run_test "Resuming Externalized Checkpoint after terminal failure (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 file false false true"
+run_test "Resuming Externalized Checkpoint after terminal failure (rocks, non-incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 rocks true false true"
+run_test "Resuming Externalized Checkpoint after terminal failure (rocks, incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 rocks true true true"
 
 run_test "DataSet allround end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_allround.sh"
 run_test "Streaming SQL end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh"

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d68253/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
index 6472b23..29c3786 100755
--- a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
+++ b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
@@ -17,13 +17,28 @@
 # limitations under the License.
 ################################################################################
 
+if [ -z "$1" ] || [ -z "$2" ]; then
+ echo "Usage: ./test_resume_externalized_checkpoints.sh <original_dop> <new_dop> <state_backend_setting> <state_backend_file_async_setting> <state_backend_rocks_incremental_setting> <simulate_failure>"
+ exit 1
+fi
+
 source "$(dirname "$0")"/common.sh
 
-STATE_BACKEND_TYPE=${1:-file}
-STATE_BACKEND_FILE_ASYNC=${2:-true}
-SIMULATE_FAILURE=${3:-false}
+ORIGINAL_DOP=$1
+NEW_DOP=$2
+STATE_BACKEND_TYPE=${3:-file}
+STATE_BACKEND_FILE_ASYNC=${4:-true}
+STATE_BACKEND_ROCKS_INCREMENTAL=${5:-false}
+SIMULATE_FAILURE=${6:-false}
+
+if (( $ORIGINAL_DOP >= $NEW_DOP )); then
+ NUM_SLOTS=$ORIGINAL_DOP
+else
+ NUM_SLOTS=$NEW_DOP
+fi
 
 backup_config
+change_conf "taskmanager.numberOfTaskSlots" "1" "${NUM_SLOTS}"
 setup_flink_slf4j_metric_reporter
 start_cluster
 
@@ -43,18 +58,30 @@ CHECKPOINT_DIR_URI="file://$CHECKPOINT_DIR"
 
 # run the DataStream allroundjob
 
-echo "Running externalized checkpoints test, with STATE_BACKEND_TYPE=$STATE_BACKEND_TYPE STATE_BACKEND_FILE_ASYNC=$STATE_BACKEND_FILE_ASYNC SIMULATE_FAILURE=$SIMULATE_FAILURE ..."
+echo "Running externalized checkpoints test, \
+with ORIGINAL_DOP=$ORIGINAL_DOP NEW_DOP=$NEW_DOP \
+and STATE_BACKEND_TYPE=$STATE_BACKEND_TYPE STATE_BACKEND_FILE_ASYNC=$STATE_BACKEND_FILE_ASYNC \
+STATE_BACKEND_ROCKS_INCREMENTAL=$STATE_BACKEND_ROCKS_INCREMENTAL SIMULATE_FAILURE=$SIMULATE_FAILURE ..."
 
 TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
-BASE_JOB_CMD="$FLINK_DIR/bin/flink run -d $TEST_PROGRAM_JAR \
-  --test.semantics exactly-once \
-  --environment.externalize_checkpoint true \
-  --environment.externalize_checkpoint.cleanup retain \
-  --state_backend $STATE_BACKEND_TYPE \
-  --state_backend.checkpoint_directory $CHECKPOINT_DIR_URI \
-  --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
-  --sequence_generator_source.sleep_time 15 \
-  --sequence_generator_source.sleep_after_elements 1"
+
+function buildBaseJobCmd {
+  local dop=$1
+
+  echo "$FLINK_DIR/bin/flink run -d -p $dop $TEST_PROGRAM_JAR \
+    --test.semantics exactly-once \
+    --environment.parallelism $dop \
+    --environment.externalize_checkpoint true \
+    --environment.externalize_checkpoint.cleanup retain \
+    --state_backend $STATE_BACKEND_TYPE \
+    --state_backend.checkpoint_directory $CHECKPOINT_DIR_URI \
+    --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
+    --state_backend.rocks.incremental $STATE_BACKEND_ROCKS_INCREMENTAL \
+    --sequence_generator_source.sleep_time 15 \
+    --sequence_generator_source.sleep_after_elements 1"
+}
+
+BASE_JOB_CMD=`buildBaseJobCmd $ORIGINAL_DOP`
 
 JOB_CMD=""
 if [[ $SIMULATE_FAILURE == "true" ]]; then
@@ -98,6 +125,9 @@ if (( $NUM_CHECKPOINTS > 1 )); then
 fi
 
 echo "Restoring job with externalized checkpoint at $CHECKPOINT_PATH ..."
+
+BASE_JOB_CMD=`buildBaseJobCmd $NEW_DOP`
+
 DATASTREAM_JOB=$($BASE_JOB_CMD | grep "Job has been submitted with JobID" | sed 's/.* //g')
 
 wait_job_running $DATASTREAM_JOB